From 384b7d8c8905e5da0c4d8113added495051f332d Mon Sep 17 00:00:00 2001 From: Jared Palmer Date: Tue, 11 Jan 2022 10:58:27 -0500 Subject: [PATCH 1/6] Stop hitting remote cache after a few failures --- cli/internal/client/client.go | 65 ++++++++++++++++++++++++++++------- 1 file changed, 52 insertions(+), 13 deletions(-) diff --git a/cli/internal/client/client.go b/cli/internal/client/client.go index ad809065d6a8d..f5dc861bd1635 100644 --- a/cli/internal/client/client.go +++ b/cli/internal/client/client.go @@ -11,6 +11,7 @@ import ( "net/url" "runtime" "strings" + "sync/atomic" "time" "github.com/hashicorp/go-hclog" @@ -19,9 +20,11 @@ import ( type ApiClient struct { // The api's base URL - baseUrl string - Token string - turboVersion string + baseUrl string + Token string + turboVersion string + currentFailCount uint64 + maxFailCount uint64 // An http client HttpClient *retryablehttp.Client } @@ -32,21 +35,41 @@ func (api *ApiClient) SetToken(token string) { // New creates a new ApiClient func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiClient { - return &ApiClient{ - baseUrl: baseUrl, - turboVersion: turboVersion, + client := &ApiClient{ + baseUrl: baseUrl, + turboVersion: turboVersion, + currentFailCount: 0, + maxFailCount: 2, HttpClient: &retryablehttp.Client{ HTTPClient: &http.Client{ - Timeout: time.Duration(60 * time.Second), + Timeout: time.Duration(4 * time.Second), }, - RetryWaitMin: 10 * time.Second, - RetryWaitMax: 20 * time.Second, + RetryWaitMin: 5 * time.Second, + RetryWaitMax: 10 * time.Second, RetryMax: 5, - CheckRetry: retryablehttp.DefaultRetryPolicy, Backoff: retryablehttp.DefaultBackoff, Logger: logger, }, } + + client.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { + // do not retry on context.Canceled or context.DeadlineExceeded + if ctx.Err() != nil { + return false, ctx.Err() + } + + if err != nil { + atomic.AddUint64(&client.currentFailCount, 1) + return false, err
+ } + if resp.StatusCode == http.StatusInternalServerError { + atomic.AddUint64(&client.currentFailCount, 1) + return false, err + } + return retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err) + } + + return client } // DeviceToken is an OAuth 2.0 Device Flow token @@ -72,6 +95,9 @@ func (c *ApiClient) UserAgent() string { } func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error { + if c.currentFailCount >= c.maxFailCount { + return fmt.Errorf("too many failures, not even going to try") + } params := url.Values{} if teamId != "" && strings.HasPrefix(teamId, "team_") { params.Add("teamId", teamId) @@ -79,7 +105,12 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio if slug != "" { params.Add("slug", slug) } - req, err := retryablehttp.NewRequest(http.MethodPut, c.makeUrl("/v8/artifacts/"+hash+"?"+params.Encode()), rawBody) + // only add a ? if it's actually needed (makes logging cleaner) + encoded := params.Encode() + if encoded != "" { + encoded = "?" 
+ encoded + } + req, err := retryablehttp.NewRequest(http.MethodPut, c.makeUrl("/v8/artifacts/"+hash+encoded), rawBody) req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("x-artifact-duration", fmt.Sprintf("%v", duration)) req.Header.Set("Authorization", "Bearer "+c.Token) @@ -96,6 +127,9 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio } func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) { + if c.currentFailCount >= c.maxFailCount { + return nil, fmt.Errorf("too many failures, not even going to try") + } params := url.Values{} if teamId != "" && strings.HasPrefix(teamId, "team_") { params.Add("teamId", teamId) @@ -103,11 +137,16 @@ func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBo if slug != "" { params.Add("slug", slug) } - req, err := retryablehttp.NewRequest(http.MethodGet, c.makeUrl("/v8/artifacts/"+hash+"?"+params.Encode()), nil) + // only add a ? if it's actually needed (makes logging cleaner) + encoded := params.Encode() + if encoded != "" { + encoded = "?" 
+ encoded + } + req, err := retryablehttp.NewRequest(http.MethodGet, c.makeUrl("/v8/artifacts/"+hash+encoded), nil) req.Header.Set("Authorization", "Bearer "+c.Token) req.Header.Set("User-Agent", c.UserAgent()) if err != nil { - return nil, fmt.Errorf("[WARNING] Invalid cache URL: %w", err) + return nil, fmt.Errorf("invalid cache URL: %w", err) } return c.HttpClient.Do(req) } From e336d081de596f7d4286d17e2f6bbd581f86b23c Mon Sep 17 00:00:00 2001 From: Jared Palmer Date: Tue, 11 Jan 2022 14:04:17 -0500 Subject: [PATCH 2/6] Lift to cache --- cli/internal/cache/cache_http.go | 7 +++ cli/internal/client/client.go | 82 ++++++++++++++++++++++---------- cli/internal/config/config.go | 9 ++-- 3 files changed, 70 insertions(+), 28 deletions(-) diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go index 11252f563ab25..18dc00e81eb66 100644 --- a/cli/internal/cache/cache_http.go +++ b/cli/internal/cache/cache_http.go @@ -41,6 +41,10 @@ func (cache *httpCache) Put(target, hash string, duration int, files []string) e // if cache.writable { cache.requestLimiter.acquire() defer cache.requestLimiter.release() + if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount { + return fmt.Errorf("skipping uploading artifacts to HTTP cache: too many failures") + } + r, w := io.Pipe() go cache.write(w, hash, files) return cache.config.ApiClient.PutArtifact(hash, cache.config.TeamId, cache.config.TeamSlug, duration, r) @@ -102,6 +106,9 @@ func (cache *httpCache) storeFile(tw *tar.Writer, name string) error { func (cache *httpCache) Fetch(target, key string, _unusedOutputGlobs []string) (bool, []string, error) { cache.requestLimiter.acquire() defer cache.requestLimiter.release() + if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount { + return false, nil, fmt.Errorf("skipping downloading artifacts to HTTP cache: too many past failures") + } m, files, err := cache.retrieve(key) if err != nil { return false, 
files, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err) diff --git a/cli/internal/client/client.go b/cli/internal/client/client.go index f5dc861bd1635..10c4fc12fe5fa 100644 --- a/cli/internal/client/client.go +++ b/cli/internal/client/client.go @@ -23,8 +23,7 @@ type ApiClient struct { baseUrl string Token string turboVersion string - currentFailCount uint64 - maxFailCount uint64 + CurrentFailCount uint64 // An http client HttpClient *retryablehttp.Client } @@ -38,8 +37,7 @@ func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiCli client := &ApiClient{ baseUrl: baseUrl, turboVersion: turboVersion, - currentFailCount: 0, - maxFailCount: 2, + CurrentFailCount: 0, HttpClient: &retryablehttp.Client{ HTTPClient: &http.Client{ Timeout: time.Duration(4 * time.Second), @@ -51,25 +49,41 @@ func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiCli Logger: logger, }, } + return client +} - client.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { - // do not retry on context.Canceled or context.DeadlineExceeded - if ctx.Err() != nil { - return false, ctx.Err() +func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) { + if err != nil { + if v, ok := err.(*url.Error); ok { + // Don't retry if the error was due to TLS cert verification failure. + if _, ok := v.Err.(x509.UnknownAuthorityError); ok { + atomic.AddUint64(&client.CurrentFailCount, 1) + return false, v + } } - if err != nil { - atomic.AddUint64(&client.currentFailCount, 1) - return false, err - } - if resp.StatusCode == http.StatusInternalServerError { - atomic.AddUint64(&client.currentFailCount, 1) - return false, err - } - return retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err) + atomic.AddUint64(&client.CurrentFailCount, 1) + return true, nil } - return client + // 429 Too Many Requests is recoverable. 
Sometimes the server puts + // a Retry-After response header to indicate when the server is + // available to start processing request from client. + if resp.StatusCode == http.StatusTooManyRequests { + atomic.AddUint64(&client.CurrentFailCount, 1) + return true, nil + } + + // Check the response code. We retry on 500-range responses to allow + // the server time to recover, as 500's are typically not permanent + // errors and may relate to outages on the server side. This will catch + // invalid response codes as well, like 0 and 999. + if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) { + atomic.AddUint64(&client.CurrentFailCount, 1) + return true, fmt.Errorf("unexpected HTTP status %s", resp.Status) + } + + return false, nil } // DeviceToken is an OAuth 2.0 Device Flow token @@ -95,9 +109,6 @@ func (c *ApiClient) UserAgent() string { } func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error { - if c.currentFailCount >= c.maxFailCount { - return fmt.Errorf("too many failures, not even going to try") - } params := url.Values{} if teamId != "" && strings.HasPrefix(teamId, "team_") { params.Add("teamId", teamId) @@ -118,6 +129,19 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio if err != nil { return fmt.Errorf("[WARNING] Invalid cache URL: %w", err) } + + c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { + // do not retry on context.Canceled or context.DeadlineExceeded + if ctx.Err() != nil { + c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1) + return false, ctx.Err() + } + + // don't propagate other errors + shouldRetry, err := c.retryCachePolicy(resp, err) + return shouldRetry, err + } + if resp, err := c.HttpClient.Do(req); err != nil { return fmt.Errorf("failed to store files in HTTP cache: %w", err) } else { @@ -127,9 +151,6 @@ func (c *ApiClient) PutArtifact(hash string, teamId 
string, slug string, duratio } func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) { - if c.currentFailCount >= c.maxFailCount { - return nil, fmt.Errorf("too many failures, not even going to try") - } params := url.Values{} if teamId != "" && strings.HasPrefix(teamId, "team_") { params.Add("teamId", teamId) @@ -148,6 +169,19 @@ func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBo if err != nil { return nil, fmt.Errorf("invalid cache URL: %w", err) } + + c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { + // do not retry on context.Canceled or context.DeadlineExceeded + if ctx.Err() != nil { + c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1) + return false, ctx.Err() + } + + // don't propagate other errors + shouldRetry, err := c.retryCachePolicy(resp, err) + return shouldRetry, err + } + return c.HttpClient.Do(req) } diff --git a/cli/internal/config/config.go b/cli/internal/config/config.go index c62c6145e5886..93d108b9a358c 100644 --- a/cli/internal/config/config.go +++ b/cli/internal/config/config.go @@ -49,12 +49,12 @@ type Config struct { // CacheConfig type CacheConfig struct { + // Number of failed requests before we stop trying to upload/download artifacts to the remote cache + MaxRemoteFailCount uint64 // Number of async workers Workers int // Cache directory Dir string - // HTTP URI of the cache - Url string } // ParseAndValidate parses the cmd line flags / env vars, and verifies that all required @@ -182,8 +182,9 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config, ApiClient: apiClient, TurboVersion: turboVersion, Cache: &CacheConfig{ - Workers: runtime.NumCPU() + 2, - Dir: filepath.Join("node_modules", ".cache", "turbo"), + MaxRemoteFailCount: 3, + Workers: runtime.NumCPU() + 2, + Dir: filepath.Join("node_modules", ".cache", "turbo"), }, } From 
d51e2a00c899e1851d51ab581bdf5c5cb69df43f Mon Sep 17 00:00:00 2001 From: Jared Palmer Date: Wed, 12 Jan 2022 09:34:59 -0500 Subject: [PATCH 3/6] Better retry defaults --- cli/internal/client/client.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cli/internal/client/client.go b/cli/internal/client/client.go index 10c4fc12fe5fa..8c9884607ab2c 100644 --- a/cli/internal/client/client.go +++ b/cli/internal/client/client.go @@ -40,11 +40,11 @@ func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiCli CurrentFailCount: 0, HttpClient: &retryablehttp.Client{ HTTPClient: &http.Client{ - Timeout: time.Duration(4 * time.Second), + Timeout: time.Duration(20 * time.Second), }, - RetryWaitMin: 5 * time.Second, + RetryWaitMin: 2 * time.Second, RetryWaitMax: 10 * time.Second, - RetryMax: 5, + RetryMax: 2, Backoff: retryablehttp.DefaultBackoff, Logger: logger, }, @@ -61,7 +61,6 @@ func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, return false, v } } - atomic.AddUint64(&client.CurrentFailCount, 1) return true, nil } @@ -83,7 +82,7 @@ func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, return true, fmt.Errorf("unexpected HTTP status %s", resp.Status) } - return false, nil + return false, fmt.Errorf("unexpected HTTP status %s", resp.Status) } // DeviceToken is an OAuth 2.0 Device Flow token From 7d5ec1a031caa430f8d6a35b2ee2a18b5c0a40b9 Mon Sep 17 00:00:00 2001 From: Jared Palmer Date: Wed, 12 Jan 2022 09:35:23 -0500 Subject: [PATCH 4/6] Switch cache to errgroup --- cli/internal/cache/cache.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go index 7c56dcbf80264..245357a7f848a 100644 --- a/cli/internal/cache/cache.go +++ b/cli/internal/cache/cache.go @@ -3,9 +3,10 @@ package cache import ( "fmt" - "sync" "turbo/internal/config" "turbo/internal/ui" + + 
"golang.org/x/sync/errgroup" ) // Cache is abstracted way to cache/fetch previously run tasks @@ -53,8 +54,7 @@ type cacheMultiplexer struct { } func (mplex cacheMultiplexer) Put(target string, key string, duration int, files []string) error { - mplex.storeUntil(target, key, duration, files, len(mplex.caches)) - return nil + return mplex.storeUntil(target, key, duration, files, len(mplex.caches)) } // storeUntil stores artifacts into higher priority caches than the given one. @@ -62,30 +62,34 @@ func (mplex cacheMultiplexer) Put(target string, key string, duration int, files // downloading from the RPC cache. // This is a little inefficient since we could write the file to plz-out then copy it to the dir cache, // but it's hard to fix that without breaking the cache abstraction. -func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) { +func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) error { // Attempt to store on all caches simultaneously. - var wg sync.WaitGroup + g := new(errgroup.Group) for i, cache := range mplex.caches { if i == stopAt { break } - wg.Add(1) - go func(cache Cache) { - cache.Put(target, key, duration, outputGlobs) - wg.Done() - }(cache) + c := cache + g.Go(func() error { + return c.Put(target, key, duration, outputGlobs) + }) + } + + if err := g.Wait(); err != nil { + return err } - wg.Wait() + + return nil } func (mplex cacheMultiplexer) Fetch(target string, key string, files []string) (bool, []string, error) { // Retrieve from caches sequentially; if we did them simultaneously we could // easily write the same file from two goroutines at once. 
for i, cache := range mplex.caches { - if ok, actualFiles, _ := cache.Fetch(target, key, files); ok { + if ok, actualFiles, err := cache.Fetch(target, key, files); ok { // Store this into other caches mplex.storeUntil(target, key, 0, actualFiles, i) - return ok, actualFiles, nil + return ok, actualFiles, err } } return false, files, nil From e7ed79bc7c113efe20c60b93cf513bcb8114ab00 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Fri, 18 Feb 2022 10:48:04 -0800 Subject: [PATCH 5/6] Apply review suggestions to remote failure PR --- cli/internal/cache/cache.go | 7 +-- cli/internal/cache/cache_http.go | 6 -- cli/internal/client/client.go | 100 ++++++++++++++++++------------- cli/internal/config/config.go | 11 ++-- 4 files changed, 67 insertions(+), 57 deletions(-) diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go index c1cabfe8aa847..ba92f860e9329 100644 --- a/cli/internal/cache/cache.go +++ b/cli/internal/cache/cache.go @@ -3,11 +3,10 @@ package cache import ( "fmt" - "sync" - "golang.org/x/sync/errgroup" + "github.com/vercel/turborepo/cli/internal/config" "github.com/vercel/turborepo/cli/internal/ui" - + "golang.org/x/sync/errgroup" ) // Cache is abstracted way to cache/fetch previously run tasks @@ -65,7 +64,7 @@ func (mplex cacheMultiplexer) Put(target string, key string, duration int, files // but it's hard to fix that without breaking the cache abstraction. func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) error { // Attempt to store on all caches simultaneously. 
- g := new(errgroup.Group) + g := &errgroup.Group{} for i, cache := range mplex.caches { if i == stopAt { break diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go index b3c2022953d4f..9fd835e987298 100644 --- a/cli/internal/cache/cache_http.go +++ b/cli/internal/cache/cache_http.go @@ -41,9 +41,6 @@ func (cache *httpCache) Put(target, hash string, duration int, files []string) e // if cache.writable { cache.requestLimiter.acquire() defer cache.requestLimiter.release() - if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount { - return fmt.Errorf("skipping uploading artifacts to HTTP cache: too many failures") - } r, w := io.Pipe() go cache.write(w, hash, files) @@ -106,9 +103,6 @@ func (cache *httpCache) storeFile(tw *tar.Writer, name string) error { func (cache *httpCache) Fetch(target, key string, _unusedOutputGlobs []string) (bool, []string, error) { cache.requestLimiter.acquire() defer cache.requestLimiter.release() - if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount { - return false, nil, fmt.Errorf("skipping downloading artifacts to HTTP cache: too many past failures") - } m, files, err := cache.retrieve(key) if err != nil { return false, files, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err) diff --git a/cli/internal/client/client.go b/cli/internal/client/client.go index 8f60f9accbe90..704d98b90fd05 100644 --- a/cli/internal/client/client.go +++ b/cli/internal/client/client.go @@ -1,7 +1,10 @@ package client import ( + "context" + "crypto/x509" "encoding/json" + "errors" "fmt" "io" "io/ioutil" @@ -18,24 +21,30 @@ import ( type ApiClient struct { // The api's base URL - baseUrl string - Token string - turboVersion string - CurrentFailCount uint64 + baseUrl string + Token string + turboVersion string + // Number of failed requests before we stop trying to upload/download artifacts to the remote cache + maxRemoteFailCount uint64 + // Must be used via 
atomic package + currentFailCount uint64 // An http client HttpClient *retryablehttp.Client } +// ErrTooManyFailures is returned from remote cache API methods after `maxRemoteFailCount` errors have occurred +var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred") + func (api *ApiClient) SetToken(token string) { api.Token = token } // New creates a new ApiClient -func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiClient { +func NewClient(baseUrl string, logger hclog.Logger, turboVersion string, maxRemoteFailCount uint64) *ApiClient { client := &ApiClient{ - baseUrl: baseUrl, - turboVersion: turboVersion, - CurrentFailCount: 0, + baseUrl: baseUrl, + turboVersion: turboVersion, + maxRemoteFailCount: maxRemoteFailCount, HttpClient: &retryablehttp.Client{ HTTPClient: &http.Client{ Timeout: time.Duration(20 * time.Second), @@ -47,19 +56,18 @@ func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiCli Logger: logger, }, } + client.HttpClient.CheckRetry = client.checkRetry return client } -func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) { +func (c *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) { if err != nil { - if v, ok := err.(*url.Error); ok { + if errors.As(err, &x509.UnknownAuthorityError{}) { // Don't retry if the error was due to TLS cert verification failure. - if _, ok := v.Err.(x509.UnknownAuthorityError); ok { - atomic.AddUint64(&client.CurrentFailCount, 1) - return false, v - } + atomic.AddUint64(&c.currentFailCount, 1) + return false, err } - atomic.AddUint64(&client.CurrentFailCount, 1) + atomic.AddUint64(&c.currentFailCount, 1) return true, nil } @@ -67,7 +75,7 @@ func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, // a Retry-After response header to indicate when the server is // available to start processing request from client. 
if resp.StatusCode == http.StatusTooManyRequests { - atomic.AddUint64(&client.CurrentFailCount, 1) + atomic.AddUint64(&c.currentFailCount, 1) return true, nil } @@ -76,13 +84,41 @@ func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, // errors and may relate to outages on the server side. This will catch // invalid response codes as well, like 0 and 999. if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) { - atomic.AddUint64(&client.CurrentFailCount, 1) + atomic.AddUint64(&c.currentFailCount, 1) return true, fmt.Errorf("unexpected HTTP status %s", resp.Status) } return false, fmt.Errorf("unexpected HTTP status %s", resp.Status) } +func (c *ApiClient) checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) { + // do not retry on context.Canceled or context.DeadlineExceeded + if ctx.Err() != nil { + atomic.AddUint64(&c.currentFailCount, 1) + return false, ctx.Err() + } + + // we're squashing the error from the request and substituting any error that might come + // from our retry policy. 
+ shouldRetry, err := c.retryCachePolicy(resp, err) + if shouldRetry { + // Our policy says it's ok to retry, but we need to check the failure count + if retryErr := c.okToRequest(); retryErr != nil { + return false, retryErr + } + } + return shouldRetry, err +} + +// okToRequest returns nil if it's ok to make a request, and returns the error to +// return to the caller if a request is not allowed +func (c *ApiClient) okToRequest() error { + if atomic.LoadUint64(&c.currentFailCount) < c.maxRemoteFailCount { + return nil + } + return ErrTooManyFailures +} + func (c *ApiClient) makeUrl(endpoint string) string { return fmt.Sprintf("%v%v", c.baseUrl, endpoint) } @@ -92,6 +128,9 @@ func (c *ApiClient) UserAgent() string { } func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error { + if err := c.okToRequest(); err != nil { + return err + } params := url.Values{} if teamId != "" && strings.HasPrefix(teamId, "team_") { params.Add("teamId", teamId) @@ -113,18 +152,6 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio return fmt.Errorf("[WARNING] Invalid cache URL: %w", err) } - c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { - // do not retry on context.Canceled or context.DeadlineExceeded - if ctx.Err() != nil { - c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1) - return false, ctx.Err() - } - - // don't propagate other errors - shouldRetry, err := c.retryCachePolicy(resp, err) - return shouldRetry, err - } - if resp, err := c.HttpClient.Do(req); err != nil { return fmt.Errorf("failed to store files in HTTP cache: %w", err) } else { @@ -134,6 +161,9 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio } func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) { + if err := c.okToRequest(); err != nil { + return nil, err + } 
params := url.Values{} if teamId != "" && strings.HasPrefix(teamId, "team_") { params.Add("teamId", teamId) @@ -153,18 +183,6 @@ func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBo return nil, fmt.Errorf("invalid cache URL: %w", err) } - c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { - // do not retry on context.Canceled or context.DeadlineExceeded - if ctx.Err() != nil { - c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1) - return false, ctx.Err() - } - - // don't propagate other errors - shouldRetry, err := c.retryCachePolicy(resp, err) - return shouldRetry, err - } - return c.HttpClient.Do(req) } diff --git a/cli/internal/config/config.go b/cli/internal/config/config.go index 45bd5964b8ca3..5241a2cee09b6 100644 --- a/cli/internal/config/config.go +++ b/cli/internal/config/config.go @@ -9,6 +9,7 @@ import ( "path/filepath" "runtime" "strings" + "github.com/vercel/turborepo/cli/internal/client" hclog "github.com/hashicorp/go-hclog" @@ -49,8 +50,6 @@ type Config struct { // CacheConfig type CacheConfig struct { - // Number of failed requests before we stop trying to upload/download artifacts to the remote cache - MaxRemoteFailCount uint64 // Number of async workers Workers int // Cache directory @@ -162,7 +161,8 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config, Output: output, }) - apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion) + maxRemoteFailCount := 3 + apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion, uint64(maxRemoteFailCount)) c = &Config{ Logger: logger, @@ -174,9 +174,8 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config, ApiClient: apiClient, TurboVersion: turboVersion, Cache: &CacheConfig{ - MaxRemoteFailCount: 3, - Workers: runtime.NumCPU() + 2, - Dir: filepath.Join("node_modules", ".cache", "turbo"), + Workers: runtime.NumCPU() + 2, + Dir: 
filepath.Join("node_modules", ".cache", "turbo"), }, } From b4c4af60f0422b04d863b0185fb8566a56db15ab Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Fri, 18 Feb 2022 11:25:28 -0800 Subject: [PATCH 6/6] Add comment re: ignoring error --- cli/internal/cache/cache.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go index ba92f860e9329..a850df075b0f6 100644 --- a/cli/internal/cache/cache.go +++ b/cli/internal/cache/cache.go @@ -87,7 +87,9 @@ func (mplex cacheMultiplexer) Fetch(target string, key string, files []string) ( // easily write the same file from two goroutines at once. for i, cache := range mplex.caches { if ok, actualFiles, err := cache.Fetch(target, key, files); ok { - // Store this into other caches + // Store this into other caches. We can ignore errors here because we know + // we have previously successfully stored in a higher-priority cache, and so the overall + // result is a success at fetching. Storing in lower-priority caches is an optimization. mplex.storeUntil(target, key, 0, actualFiles, i) return ok, actualFiles, err }