diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go
index e38ee2952f41a..a850df075b0f6 100644
--- a/cli/internal/cache/cache.go
+++ b/cli/internal/cache/cache.go
@@ -3,9 +3,10 @@ package cache
 import (
 	"fmt"
-	"sync"
+
 	"github.com/vercel/turborepo/cli/internal/config"
 	"github.com/vercel/turborepo/cli/internal/ui"
+	"golang.org/x/sync/errgroup"
 )
 
 // Cache is abstracted way to cache/fetch previously run tasks
@@ -53,8 +54,7 @@ type cacheMultiplexer struct {
 }
 
 func (mplex cacheMultiplexer) Put(target string, key string, duration int, files []string) error {
-	mplex.storeUntil(target, key, duration, files, len(mplex.caches))
-	return nil
+	return mplex.storeUntil(target, key, duration, files, len(mplex.caches))
 }
 
 // storeUntil stores artifacts into higher priority caches than the given one.
@@ -62,30 +62,36 @@ func (mplex cacheMultiplexer) Put(target string, key string, duration int, files []string) error {
 // downloading from the RPC cache.
 // This is a little inefficient since we could write the file to plz-out then copy it to the dir cache,
 // but it's hard to fix that without breaking the cache abstraction.
-func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) {
+func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) error {
 	// Attempt to store on all caches simultaneously.
-	var wg sync.WaitGroup
+	g := &errgroup.Group{}
 	for i, cache := range mplex.caches {
 		if i == stopAt {
 			break
 		}
-		wg.Add(1)
-		go func(cache Cache) {
-			cache.Put(target, key, duration, outputGlobs)
-			wg.Done()
-		}(cache)
+		c := cache
+		g.Go(func() error {
+			return c.Put(target, key, duration, outputGlobs)
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		return err
 	}
-	wg.Wait()
+
+	return nil
 }
 
 func (mplex cacheMultiplexer) Fetch(target string, key string, files []string) (bool, []string, error) {
 	// Retrieve from caches sequentially; if we did them simultaneously we could
 	// easily write the same file from two goroutines at once.
 	for i, cache := range mplex.caches {
-		if ok, actualFiles, _ := cache.Fetch(target, key, files); ok {
-			// Store this into other caches
+		if ok, actualFiles, err := cache.Fetch(target, key, files); ok {
+			// Store this into the other caches. We can ignore errors here because the
+			// fetch itself has already succeeded, so the overall result is a successful
+			// fetch; backfilling the higher-priority caches is only an optimization.
 			mplex.storeUntil(target, key, 0, actualFiles, i)
-			return ok, actualFiles, nil
+			return ok, actualFiles, err
 		}
 	}
 	return false, files, nil
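The change above replaces a fire-and-forget `sync.WaitGroup` with `golang.org/x/sync/errgroup`, so `Put` can now report the first error from any of the parallel cache writes. Here is a minimal, self-contained sketch of that pattern; the `cache` interface and the `failing` implementation are illustrative stand-ins, not turborepo types:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// cache is a stand-in for turborepo's Cache interface.
type cache interface {
	Put(key string) error
}

type failing struct{ name string }

func (f failing) Put(key string) error {
	return fmt.Errorf("%s: put %q failed", f.name, key)
}

// storeAll fans Put calls out to goroutines, one per cache, and
// returns the first non-nil error once all of them have finished.
func storeAll(caches []cache, key string) error {
	g := &errgroup.Group{}
	for _, c := range caches {
		c := c // copy the loop variable so each closure gets its own cache
		g.Go(func() error {
			return c.Put(key)
		})
	}
	return g.Wait()
}

func main() {
	err := storeAll([]cache{failing{"fs"}, failing{"http"}}, "somehash")
	fmt.Println(err) // prints whichever Put error g.Wait() saw first
}
```

Note the `c := cache` copy in the diff: before Go 1.22, every closure launched in the loop would otherwise observe the final value of the shared loop variable.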
diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go
index 2ec1535672391..9fd835e987298 100644
--- a/cli/internal/cache/cache_http.go
+++ b/cli/internal/cache/cache_http.go
@@ -41,6 +41,7 @@ func (cache *httpCache) Put(target, hash string, duration int, files []string) error {
 	// if cache.writable {
 	cache.requestLimiter.acquire()
 	defer cache.requestLimiter.release()
+
 	r, w := io.Pipe()
 	go cache.write(w, hash, files)
 	return cache.config.ApiClient.PutArtifact(hash, cache.config.TeamId, cache.config.TeamSlug, duration, r)
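The hunk above only adds a blank line, but the surrounding context is worth pausing on: the artifact body is streamed into `PutArtifact` through an `io.Pipe`, with the producer running in its own goroutine. A small sketch of why the goroutine is required (the payload here is illustrative): `io.Pipe` is unbuffered, so every `Write` blocks until the reader consumes it, and doing both from a single goroutine would deadlock.

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	r, w := io.Pipe()

	// The writer must run concurrently with the reader: io.Pipe has no
	// internal buffer, so w.Write blocks until r is read from.
	go func() {
		defer w.Close() // the reader sees io.EOF once the writer is closed
		io.Copy(w, strings.NewReader("artifact bytes"))
	}()

	body, err := ioutil.ReadAll(r)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Println(string(body)) // "artifact bytes"
}
```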
diff --git a/cli/internal/client/client.go b/cli/internal/client/client.go
index a5097d9a501b5..704d98b90fd05 100644
--- a/cli/internal/client/client.go
+++ b/cli/internal/client/client.go
@@ -1,7 +1,10 @@
 package client
 
 import (
+	"context"
+	"crypto/x509"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -9,6 +12,7 @@ import (
 	"net/url"
 	"runtime"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/hashicorp/go-hclog"
@@ -20,31 +24,99 @@ type ApiClient struct {
 	baseUrl      string
 	Token        string
 	turboVersion string
+	// Number of failed requests before we stop trying to upload/download artifacts to the remote cache
+	maxRemoteFailCount uint64
+	// Must be used via atomic package
+	currentFailCount uint64
 	// An http client
 	HttpClient *retryablehttp.Client
 }
 
+// ErrTooManyFailures is returned from remote cache API methods after `maxRemoteFailCount` errors have occurred
+var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred")
+
 func (api *ApiClient) SetToken(token string) {
 	api.Token = token
 }
 
 // New creates a new ApiClient
-func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiClient {
-	return &ApiClient{
-		baseUrl:      baseUrl,
-		turboVersion: turboVersion,
+func NewClient(baseUrl string, logger hclog.Logger, turboVersion string, maxRemoteFailCount uint64) *ApiClient {
+	client := &ApiClient{
+		baseUrl:            baseUrl,
+		turboVersion:       turboVersion,
+		maxRemoteFailCount: maxRemoteFailCount,
 		HttpClient: &retryablehttp.Client{
 			HTTPClient: &http.Client{
-				Timeout: time.Duration(60 * time.Second),
+				Timeout: time.Duration(20 * time.Second),
 			},
-			RetryWaitMin: 10 * time.Second,
-			RetryWaitMax: 20 * time.Second,
-			RetryMax:     5,
-			CheckRetry:   retryablehttp.DefaultRetryPolicy,
+			RetryWaitMin: 2 * time.Second,
+			RetryWaitMax: 10 * time.Second,
+			RetryMax:     2,
 			Backoff:      retryablehttp.DefaultBackoff,
 			Logger:       logger,
 		},
 	}
+	client.HttpClient.CheckRetry = client.checkRetry
+	return client
+}
+
+func (c *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) {
+	if err != nil {
+		if errors.As(err, &x509.UnknownAuthorityError{}) {
+			// Don't retry if the error was due to TLS cert verification failure.
+			atomic.AddUint64(&c.currentFailCount, 1)
+			return false, err
+		}
+		atomic.AddUint64(&c.currentFailCount, 1)
+		return true, nil
+	}
+
+	// 429 Too Many Requests is recoverable. Sometimes the server puts
+	// a Retry-After response header to indicate when the server is
+	// available to start processing requests from the client.
+	if resp.StatusCode == http.StatusTooManyRequests {
+		atomic.AddUint64(&c.currentFailCount, 1)
+		return true, nil
+	}
+
+	// Check the response code. We retry on 500-range responses to allow
+	// the server time to recover, as 500's are typically not permanent
+	// errors and may relate to outages on the server side. This will catch
+	// invalid response codes as well, like 0 and 999.
+	if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) {
+		atomic.AddUint64(&c.currentFailCount, 1)
+		return true, fmt.Errorf("unexpected HTTP status %s", resp.Status)
+	}
+
+	return false, fmt.Errorf("unexpected HTTP status %s", resp.Status)
+}
+
+func (c *ApiClient) checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
+	// do not retry on context.Canceled or context.DeadlineExceeded
+	if ctx.Err() != nil {
+		atomic.AddUint64(&c.currentFailCount, 1)
+		return false, ctx.Err()
+	}
+
+	// We're squashing the error from the request and substituting any error
+	// that might come from our retry policy.
+	shouldRetry, err := c.retryCachePolicy(resp, err)
+	if shouldRetry {
+		// Our policy says it's ok to retry, but we need to check the failure count
+		if retryErr := c.okToRequest(); retryErr != nil {
+			return false, retryErr
+		}
+	}
+	return shouldRetry, err
+}
+
+// okToRequest returns nil if it's OK to make a request, or the error that should
+// be surfaced to the caller when further requests are not allowed
+func (c *ApiClient) okToRequest() error {
+	if atomic.LoadUint64(&c.currentFailCount) < c.maxRemoteFailCount {
+		return nil
+	}
+	return ErrTooManyFailures
 }
 
 func (c *ApiClient) makeUrl(endpoint string) string {
@@ -56,6 +128,9 @@ func (c *ApiClient) UserAgent() string {
 }
 
 func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
+	if err := c.okToRequest(); err != nil {
+		return err
+	}
 	params := url.Values{}
 	if teamId != "" && strings.HasPrefix(teamId, "team_") {
 		params.Add("teamId", teamId)
@@ -63,7 +138,12 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
 	if slug != "" {
 		params.Add("slug", slug)
 	}
-	req, err := retryablehttp.NewRequest(http.MethodPut, c.makeUrl("/v8/artifacts/"+hash+"?"+params.Encode()), rawBody)
+	// only add a ? if it's actually needed (makes logging cleaner)
+	encoded := params.Encode()
+	if encoded != "" {
+		encoded = "?" + encoded
+	}
+	req, err := retryablehttp.NewRequest(http.MethodPut, c.makeUrl("/v8/artifacts/"+hash+encoded), rawBody)
 	req.Header.Set("Content-Type", "application/octet-stream")
 	req.Header.Set("x-artifact-duration", fmt.Sprintf("%v", duration))
 	req.Header.Set("Authorization", "Bearer "+c.Token)
@@ -71,6 +151,7 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
 	if err != nil {
 		return fmt.Errorf("[WARNING] Invalid cache URL: %w", err)
 	}
+
 	if resp, err := c.HttpClient.Do(req); err != nil {
 		return fmt.Errorf("failed to store files in HTTP cache: %w", err)
 	} else {
@@ -80,6 +161,9 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
 }
 
 func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) {
+	if err := c.okToRequest(); err != nil {
+		return nil, err
+	}
 	params := url.Values{}
 	if teamId != "" && strings.HasPrefix(teamId, "team_") {
 		params.Add("teamId", teamId)
@@ -87,12 +171,18 @@ func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) {
 	if slug != "" {
 		params.Add("slug", slug)
 	}
-	req, err := retryablehttp.NewRequest(http.MethodGet, c.makeUrl("/v8/artifacts/"+hash+"?"+params.Encode()), nil)
+	// only add a ? if it's actually needed (makes logging cleaner)
+	encoded := params.Encode()
+	if encoded != "" {
+		encoded = "?" + encoded
+	}
+	req, err := retryablehttp.NewRequest(http.MethodGet, c.makeUrl("/v8/artifacts/"+hash+encoded), nil)
 	req.Header.Set("Authorization", "Bearer "+c.Token)
 	req.Header.Set("User-Agent", c.UserAgent())
 	if err != nil {
-		return nil, fmt.Errorf("[WARNING] Invalid cache URL: %w", err)
+		return nil, fmt.Errorf("invalid cache URL: %w", err)
 	}
+
 	return c.HttpClient.Do(req)
 }
 
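Taken together, `retryCachePolicy`, `checkRetry`, and `okToRequest` act as a crude circuit breaker: every failed attempt increments `currentFailCount` atomically, and once the count reaches `maxRemoteFailCount` the client refuses further remote-cache traffic with `ErrTooManyFailures`. A trimmed-down, runnable sketch of just that counting logic (the HTTP machinery is omitted and the `client` struct here is a stand-in):

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred")

type client struct {
	maxRemoteFailCount uint64
	currentFailCount   uint64 // read/written only via sync/atomic
}

// okToRequest mirrors the method in the diff: requests are allowed until
// the failure counter reaches the configured threshold.
func (c *client) okToRequest() error {
	if atomic.LoadUint64(&c.currentFailCount) < c.maxRemoteFailCount {
		return nil
	}
	return ErrTooManyFailures
}

// recordFailure stands in for the atomic.AddUint64 calls sprinkled
// through retryCachePolicy and checkRetry.
func (c *client) recordFailure() {
	atomic.AddUint64(&c.currentFailCount, 1)
}

func main() {
	c := &client{maxRemoteFailCount: 3}
	for i := 0; i < 5; i++ {
		if err := c.okToRequest(); err != nil {
			fmt.Println("request", i, "skipped:", err)
			continue
		}
		fmt.Println("request", i, "attempted (and failed)")
		c.recordFailure()
	}
	// requests 0-2 are attempted; 3 and 4 are skipped with ErrTooManyFailures
}
```

One consequence of this design is that retries themselves count toward the threshold, since `checkRetry` increments the counter on every failed attempt before retryablehttp re-issues the request.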
diff --git a/cli/internal/config/config.go b/cli/internal/config/config.go
index 74768dca95d9c..5241a2cee09b6 100644
--- a/cli/internal/config/config.go
+++ b/cli/internal/config/config.go
@@ -9,6 +9,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
+
 	"github.com/vercel/turborepo/cli/internal/client"
 
 	hclog "github.com/hashicorp/go-hclog"
@@ -53,8 +54,6 @@ type CacheConfig struct {
 	Workers int
 	// Cache directory
 	Dir string
-	// HTTP URI of the cache
-	Url string
 }
 
 // ParseAndValidate parses the cmd line flags / env vars, and verifies that all required
@@ -162,7 +161,8 @@
 		Output: output,
 	})
 
-	apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion)
+	maxRemoteFailCount := 3
+	apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion, uint64(maxRemoteFailCount))
 
 	c = &Config{
 		Logger: logger,
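Finally, config.go wires the threshold through `NewClient`, hard-coded here to 3 failures. Callers that want to distinguish a tripped breaker from an ordinary network error can compare against the exported sentinel; a self-contained sketch of that pattern follows (the `putArtifact` stub simulates a tripped breaker and is not turborepo code):

```go
package main

import (
	"errors"
	"fmt"
)

var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred")

// putArtifact simulates a remote-cache call after the breaker has tripped.
func putArtifact() error {
	// Wrapping with %w preserves the sentinel for errors.Is.
	return fmt.Errorf("PutArtifact: %w", ErrTooManyFailures)
}

func main() {
	if err := putArtifact(); err != nil {
		if errors.Is(err, ErrTooManyFailures) {
			fmt.Println("remote cache disabled for this run:", err)
			return
		}
		fmt.Println("unexpected error:", err)
	}
}
```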