From e7ed79bc7c113efe20c60b93cf513bcb8114ab00 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Fri, 18 Feb 2022 10:48:04 -0800 Subject: [PATCH] Apply review suggestions to remote failure PR --- cli/internal/cache/cache.go | 7 +-- cli/internal/cache/cache_http.go | 6 -- cli/internal/client/client.go | 100 ++++++++++++++++++------------- cli/internal/config/config.go | 11 ++-- 4 files changed, 67 insertions(+), 57 deletions(-) diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go index c1cabfe8aa847..ba92f860e9329 100644 --- a/cli/internal/cache/cache.go +++ b/cli/internal/cache/cache.go @@ -3,11 +3,10 @@ package cache import ( "fmt" - "sync" - "golang.org/x/sync/errgroup" + "github.com/vercel/turborepo/cli/internal/config" "github.com/vercel/turborepo/cli/internal/ui" - + "golang.org/x/sync/errgroup" ) // Cache is abstracted way to cache/fetch previously run tasks @@ -65,7 +64,7 @@ func (mplex cacheMultiplexer) Put(target string, key string, duration int, files // but it's hard to fix that without breaking the cache abstraction. func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) error { // Attempt to store on all caches simultaneously. - g := new(errgroup.Group) + g := &errgroup.Group{} for i, cache := range mplex.caches { if i == stopAt { break diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go index b3c2022953d4f..9fd835e987298 100644 --- a/cli/internal/cache/cache_http.go +++ b/cli/internal/cache/cache_http.go @@ -41,9 +41,6 @@ func (cache *httpCache) Put(target, hash string, duration int, files []string) e // if cache.writable { cache.requestLimiter.acquire() defer cache.requestLimiter.release() - if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount { - return fmt.Errorf("skipping uploading artifacts to HTTP cache: too many failures") - } r, w := io.Pipe() go cache.write(w, hash, files) @@ -106,9 +103,6 @@ func (cache *httpCache) storeFile(tw *tar.Writer, name string) error { func (cache *httpCache) Fetch(target, key string, _unusedOutputGlobs []string) (bool, []string, error) { cache.requestLimiter.acquire() defer cache.requestLimiter.release() - if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount { - return false, nil, fmt.Errorf("skipping downloading artifacts to HTTP cache: too many past failures") - } m, files, err := cache.retrieve(key) if err != nil { return false, files, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err) diff --git a/cli/internal/client/client.go b/cli/internal/client/client.go index 8f60f9accbe90..704d98b90fd05 100644 --- a/cli/internal/client/client.go +++ b/cli/internal/client/client.go @@ -1,7 +1,10 @@ package client import ( + "context" + "crypto/x509" "encoding/json" + "errors" "fmt" "io" "io/ioutil" @@ -18,24 +21,30 @@ import ( type ApiClient struct { // The api's base URL - baseUrl string - Token string - turboVersion string - CurrentFailCount uint64 + baseUrl string + Token string + turboVersion string + // Number of failed requests before we stop trying to upload/download artifacts to the remote cache + maxRemoteFailCount uint64 + // Must be used via atomic package + currentFailCount uint64 // An http client HttpClient *retryablehttp.Client } +// ErrTooManyFailures is returned from remote cache API methods after `maxRemoteFailCount` errors have occurred +var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred") + func (api *ApiClient) SetToken(token string) { api.Token = token } // New creates a new ApiClient -func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiClient { +func NewClient(baseUrl string, logger hclog.Logger, turboVersion string, maxRemoteFailCount uint64) *ApiClient { client := &ApiClient{ - baseUrl: baseUrl, - turboVersion: turboVersion, - CurrentFailCount: 0, + baseUrl: baseUrl, + turboVersion: turboVersion, + maxRemoteFailCount: maxRemoteFailCount, HttpClient: &retryablehttp.Client{ HTTPClient: &http.Client{ Timeout: time.Duration(20 * time.Second), @@ -47,19 +56,18 @@ func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiCli Logger: logger, }, } + client.HttpClient.CheckRetry = client.checkRetry return client } -func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) { +func (c *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) { if err != nil { - if v, ok := err.(*url.Error); ok { + if errors.As(err, &x509.UnknownAuthorityError{}) { // Don't retry if the error was due to TLS cert verification failure. - if _, ok := v.Err.(x509.UnknownAuthorityError); ok { - atomic.AddUint64(&client.CurrentFailCount, 1) - return false, v - } + atomic.AddUint64(&c.currentFailCount, 1) + return false, err } - atomic.AddUint64(&client.CurrentFailCount, 1) + atomic.AddUint64(&c.currentFailCount, 1) return true, nil } @@ -67,7 +75,7 @@ func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, // a Retry-After response header to indicate when the server is // available to start processing request from client. if resp.StatusCode == http.StatusTooManyRequests { - atomic.AddUint64(&client.CurrentFailCount, 1) + atomic.AddUint64(&c.currentFailCount, 1) return true, nil } @@ -76,13 +84,41 @@ func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, // errors and may relate to outages on the server side. This will catch // invalid response codes as well, like 0 and 999. if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) { - atomic.AddUint64(&client.CurrentFailCount, 1) + atomic.AddUint64(&c.currentFailCount, 1) return true, fmt.Errorf("unexpected HTTP status %s", resp.Status) } return false, fmt.Errorf("unexpected HTTP status %s", resp.Status) } +func (c *ApiClient) checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) { + // do not retry on context.Canceled or context.DeadlineExceeded + if ctx.Err() != nil { + atomic.AddUint64(&c.currentFailCount, 1) + return false, ctx.Err() + } + + // we're squashing the error from the request and substituting any error that might come + // from our retry policy. + shouldRetry, err := c.retryCachePolicy(resp, err) + if shouldRetry { + // Our policy says it's ok to retry, but we need to check the failure count + if retryErr := c.okToRequest(); retryErr != nil { + return false, retryErr + } + } + return shouldRetry, err +} + +// okToRequest returns nil if it's ok to make a request, and returns the error to +// return to the caller if a request is not allowed +func (c *ApiClient) okToRequest() error { + if atomic.LoadUint64(&c.currentFailCount) < c.maxRemoteFailCount { + return nil + } + return ErrTooManyFailures +} + func (c *ApiClient) makeUrl(endpoint string) string { return fmt.Sprintf("%v%v", c.baseUrl, endpoint) } @@ -92,6 +128,9 @@ func (c *ApiClient) UserAgent() string { } func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error { + if err := c.okToRequest(); err != nil { + return err + } params := url.Values{} if teamId != "" && strings.HasPrefix(teamId, "team_") { params.Add("teamId", teamId) @@ -113,18 +152,6 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio return fmt.Errorf("[WARNING] Invalid cache URL: %w", err) } - c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { - // do not retry on context.Canceled or context.DeadlineExceeded - if ctx.Err() != nil { - c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1) - return false, ctx.Err() - } - - // don't propagate other errors - shouldRetry, err := c.retryCachePolicy(resp, err) - return shouldRetry, err - } - if resp, err := c.HttpClient.Do(req); err != nil { return fmt.Errorf("failed to store files in HTTP cache: %w", err) } else { @@ -134,6 +161,9 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio } func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) { + if err := c.okToRequest(); err != nil { + return nil, err + } params := url.Values{} if teamId != "" && strings.HasPrefix(teamId, "team_") { params.Add("teamId", teamId) @@ -153,18 +183,6 @@ func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBo return nil, fmt.Errorf("invalid cache URL: %w", err) } - c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { - // do not retry on context.Canceled or context.DeadlineExceeded - if ctx.Err() != nil { - c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1) - return false, ctx.Err() - } - - // don't propagate other errors - shouldRetry, err := c.retryCachePolicy(resp, err) - return shouldRetry, err - } - return c.HttpClient.Do(req) } diff --git a/cli/internal/config/config.go b/cli/internal/config/config.go index 45bd5964b8ca3..5241a2cee09b6 100644 --- a/cli/internal/config/config.go +++ b/cli/internal/config/config.go @@ -9,6 +9,7 @@ import ( "path/filepath" "runtime" "strings" + "github.com/vercel/turborepo/cli/internal/client" hclog "github.com/hashicorp/go-hclog" @@ -49,8 +50,6 @@ type Config struct { // CacheConfig type CacheConfig struct { - // Number of failed requests before we stop trying to upload/download artifacts to the remote cache - MaxRemoteFailCount uint64 // Number of async workers Workers int // Cache directory @@ -162,7 +161,8 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config, Output: output, }) - apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion) + maxRemoteFailCount := 3 + apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion, uint64(maxRemoteFailCount)) c = &Config{ Logger: logger, @@ -174,9 +174,8 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config, ApiClient: apiClient, TurboVersion: turboVersion, Cache: &CacheConfig{ - MaxRemoteFailCount: 3, - Workers: runtime.NumCPU() + 2, - Dir: filepath.Join("node_modules", ".cache", "turbo"), + Workers: runtime.NumCPU() + 2, + Dir: filepath.Join("node_modules", ".cache", "turbo"), }, }