这是indexloc提供的服务,不要输入任何密码
Skip to content

Stop hitting remote cache after a few failures #545

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits
Feb 18, 2022
Merged
34 changes: 20 additions & 14 deletions cli/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package cache

import (
"fmt"
"sync"

"github.com/vercel/turborepo/cli/internal/config"
"github.com/vercel/turborepo/cli/internal/ui"
"golang.org/x/sync/errgroup"
)

// Cache is abstracted way to cache/fetch previously run tasks
Expand Down Expand Up @@ -53,39 +54,44 @@ type cacheMultiplexer struct {
}

// Put stores the given artifact files under key in every cache in the
// multiplexer, returning the first error reported by any underlying cache.
func (mplex cacheMultiplexer) Put(target string, key string, duration int, files []string) error {
	return mplex.storeUntil(target, key, duration, files, len(mplex.caches))
}

// storeUntil stores artifacts into higher priority caches than the given one.
// Used after artifact retrieval to ensure we have them in eg. the directory cache after
// downloading from the RPC cache.
// This is a little inefficient since we could write the file to plz-out then copy it to the dir cache,
// but it's hard to fix that without breaking the cache abstraction.
func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) {
func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) error {
// Attempt to store on all caches simultaneously.
var wg sync.WaitGroup
g := &errgroup.Group{}
for i, cache := range mplex.caches {
if i == stopAt {
break
}
wg.Add(1)
go func(cache Cache) {
cache.Put(target, key, duration, outputGlobs)
wg.Done()
}(cache)
c := cache
g.Go(func() error {
return c.Put(target, key, duration, outputGlobs)
})
}

if err := g.Wait(); err != nil {
return err
}
wg.Wait()

return nil
}

func (mplex cacheMultiplexer) Fetch(target string, key string, files []string) (bool, []string, error) {
// Retrieve from caches sequentially; if we did them simultaneously we could
// easily write the same file from two goroutines at once.
for i, cache := range mplex.caches {
if ok, actualFiles, _ := cache.Fetch(target, key, files); ok {
// Store this into other caches
if ok, actualFiles, err := cache.Fetch(target, key, files); ok {
// Store this into other caches. We can ignore errors here because we know
// we have previously successfully stored in a higher-priority cache, and so the overall
// result is a success at fetching. Storing in lower-priority caches is an optimization.
mplex.storeUntil(target, key, 0, actualFiles, i)
return ok, actualFiles, nil
return ok, actualFiles, err
}
}
return false, files, nil
Expand Down
1 change: 1 addition & 0 deletions cli/internal/cache/cache_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (cache *httpCache) Put(target, hash string, duration int, files []string) e
// if cache.writable {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()

r, w := io.Pipe()
go cache.write(w, hash, files)
return cache.config.ApiClient.PutArtifact(hash, cache.config.TeamId, cache.config.TeamSlug, duration, r)
Expand Down
114 changes: 102 additions & 12 deletions cli/internal/client/client.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package client

import (
"context"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"runtime"
"strings"
"sync/atomic"
"time"

"github.com/hashicorp/go-hclog"
Expand All @@ -20,31 +24,99 @@ type ApiClient struct {
baseUrl string
Token string
turboVersion string
// Number of failed requests before we stop trying to upload/download artifacts to the remote cache
maxRemoteFailCount uint64
// Must be used via atomic package
currentFailCount uint64
// An http client
HttpClient *retryablehttp.Client
}

// ErrTooManyFailures is returned from remote cache API methods after `maxRemoteFailCount` errors have occurred
var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred")

// SetToken sets the bearer token used to authenticate remote cache API requests.
func (api *ApiClient) SetToken(token string) {
api.Token = token
}

// New creates a new ApiClient
func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiClient {
return &ApiClient{
baseUrl: baseUrl,
turboVersion: turboVersion,
func NewClient(baseUrl string, logger hclog.Logger, turboVersion string, maxRemoteFailCount uint64) *ApiClient {
client := &ApiClient{
baseUrl: baseUrl,
turboVersion: turboVersion,
maxRemoteFailCount: maxRemoteFailCount,
HttpClient: &retryablehttp.Client{
HTTPClient: &http.Client{
Timeout: time.Duration(60 * time.Second),
Timeout: time.Duration(20 * time.Second),
},
RetryWaitMin: 10 * time.Second,
RetryWaitMax: 20 * time.Second,
RetryMax: 5,
CheckRetry: retryablehttp.DefaultRetryPolicy,
RetryWaitMin: 2 * time.Second,
RetryWaitMax: 10 * time.Second,
RetryMax: 2,
Backoff: retryablehttp.DefaultBackoff,
Logger: logger,
},
}
client.HttpClient.CheckRetry = client.checkRetry
return client
}

// retryCachePolicy decides whether a failed remote cache request should be
// retried, bumping the client's failure counter for every observed failure.
func (c *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) {
	if err != nil {
		// Every transport-level failure counts against the budget.
		atomic.AddUint64(&c.currentFailCount, 1)
		if errors.As(err, &x509.UnknownAuthorityError{}) {
			// Don't retry if the error was due to TLS cert verification failure.
			return false, err
		}
		return true, nil
	}

	switch {
	case resp.StatusCode == http.StatusTooManyRequests:
		// 429 Too Many Requests is recoverable. Sometimes the server puts
		// a Retry-After response header to indicate when the server is
		// available to start processing request from client.
		atomic.AddUint64(&c.currentFailCount, 1)
		return true, nil
	case resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501):
		// Retry on 500-range responses to allow the server time to recover,
		// as 500's are typically not permanent errors and may relate to
		// outages on the server side. This also catches invalid response
		// codes, like 0 and 999.
		atomic.AddUint64(&c.currentFailCount, 1)
		return true, fmt.Errorf("unexpected HTTP status %s", resp.Status)
	default:
		// Anything else is a hard failure; don't retry.
		return false, fmt.Errorf("unexpected HTTP status %s", resp.Status)
	}
}

// checkRetry is the retryablehttp CheckRetry hook for this client. It refuses
// to retry once the context is done or the failure budget is exhausted.
func (c *ApiClient) checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
	// do not retry on context.Canceled or context.DeadlineExceeded
	if ctxErr := ctx.Err(); ctxErr != nil {
		atomic.AddUint64(&c.currentFailCount, 1)
		return false, ctxErr
	}

	// we're squashing the error from the request and substituting any error
	// that might come from our retry policy.
	shouldRetry, policyErr := c.retryCachePolicy(resp, err)
	if !shouldRetry {
		return false, policyErr
	}
	// Our policy says it's ok to retry, but we need to check the failure count
	if budgetErr := c.okToRequest(); budgetErr != nil {
		return false, budgetErr
	}
	return true, policyErr
}

// okToRequest reports whether another remote cache request may be issued: it
// returns ErrTooManyFailures once the failure budget has been spent, and nil
// otherwise.
func (c *ApiClient) okToRequest() error {
	if atomic.LoadUint64(&c.currentFailCount) >= c.maxRemoteFailCount {
		return ErrTooManyFailures
	}
	return nil
}

func (c *ApiClient) makeUrl(endpoint string) string {
Expand All @@ -56,21 +128,30 @@ func (c *ApiClient) UserAgent() string {
}

func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
if err := c.okToRequest(); err != nil {
return err
}
params := url.Values{}
if teamId != "" && strings.HasPrefix(teamId, "team_") {
params.Add("teamId", teamId)
}
if slug != "" {
params.Add("slug", slug)
}
req, err := retryablehttp.NewRequest(http.MethodPut, c.makeUrl("/v8/artifacts/"+hash+"?"+params.Encode()), rawBody)
// only add a ? if it's actually needed (makes logging cleaner)
encoded := params.Encode()
if encoded != "" {
encoded = "?" + encoded
}
req, err := retryablehttp.NewRequest(http.MethodPut, c.makeUrl("/v8/artifacts/"+hash+encoded), rawBody)
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("x-artifact-duration", fmt.Sprintf("%v", duration))
req.Header.Set("Authorization", "Bearer "+c.Token)
req.Header.Set("User-Agent", c.UserAgent())
if err != nil {
return fmt.Errorf("[WARNING] Invalid cache URL: %w", err)
}

if resp, err := c.HttpClient.Do(req); err != nil {
return fmt.Errorf("failed to store files in HTTP cache: %w", err)
} else {
Expand All @@ -80,19 +161,28 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duratio
}

// FetchArtifact requests the cached artifact identified by hash from the
// remote cache, scoping the request to the given team id or slug when set.
// It returns the raw HTTP response; the caller is responsible for checking
// the status code and closing the body. rawBody is currently unused.
func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) {
	if err := c.okToRequest(); err != nil {
		return nil, err
	}
	params := url.Values{}
	if teamId != "" && strings.HasPrefix(teamId, "team_") {
		params.Add("teamId", teamId)
	}
	if slug != "" {
		params.Add("slug", slug)
	}
	// only add a ? if it's actually needed (makes logging cleaner)
	encoded := params.Encode()
	if encoded != "" {
		encoded = "?" + encoded
	}
	req, err := retryablehttp.NewRequest(http.MethodGet, c.makeUrl("/v8/artifacts/"+hash+encoded), nil)
	if err != nil {
		// Check the error before touching req: NewRequest returns a nil
		// request on failure, so setting headers first would panic.
		return nil, fmt.Errorf("invalid cache URL: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+c.Token)
	req.Header.Set("User-Agent", c.UserAgent())

	return c.HttpClient.Do(req)
}

Expand Down
6 changes: 3 additions & 3 deletions cli/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"runtime"
"strings"

"github.com/vercel/turborepo/cli/internal/client"

hclog "github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -53,8 +54,6 @@ type CacheConfig struct {
Workers int
// Cache directory
Dir string
// HTTP URI of the cache
Url string
}

// ParseAndValidate parses the cmd line flags / env vars, and verifies that all required
Expand Down Expand Up @@ -162,7 +161,8 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config,
Output: output,
})

apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion)
maxRemoteFailCount := 3
apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion, uint64(maxRemoteFailCount))

c = &Config{
Logger: logger,
Expand Down