Merged
34 changes: 20 additions & 14 deletions cli/internal/cache/cache.go
@@ -3,9 +3,10 @@ package cache
 
 import (
 	"fmt"
 	"sync"
 
 	"github.com/vercel/turborepo/cli/internal/config"
 	"github.com/vercel/turborepo/cli/internal/ui"
+	"golang.org/x/sync/errgroup"
 )
 
 // Cache is an abstracted way to cache/fetch previously run tasks
@@ -53,39 +54,44 @@ type cacheMultiplexer struct {
 }
 
 func (mplex cacheMultiplexer) Put(target string, key string, duration int, files []string) error {
-	mplex.storeUntil(target, key, duration, files, len(mplex.caches))
-	return nil
+	return mplex.storeUntil(target, key, duration, files, len(mplex.caches))
 }
 
 // storeUntil stores artifacts into higher priority caches than the given one.
 // Used after artifact retrieval to ensure we have them in eg. the directory cache after
 // downloading from the RPC cache.
 // This is a little inefficient since we could write the file to plz-out then copy it to the dir cache,
 // but it's hard to fix that without breaking the cache abstraction.
-func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) {
+func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) error {
 	// Attempt to store on all caches simultaneously.
-	var wg sync.WaitGroup
+	g := &errgroup.Group{}
 	for i, cache := range mplex.caches {
 		if i == stopAt {
 			break
 		}
-		wg.Add(1)
-		go func(cache Cache) {
-			cache.Put(target, key, duration, outputGlobs)
-			wg.Done()
-		}(cache)
+		c := cache
+		g.Go(func() error {
+			return c.Put(target, key, duration, outputGlobs)
+		})
 	}
 
-	wg.Wait()
+	if err := g.Wait(); err != nil {
+		return err
+	}
+
+	return nil
 }
 
 func (mplex cacheMultiplexer) Fetch(target string, key string, files []string) (bool, []string, error) {
 	// Retrieve from caches sequentially; if we did them simultaneously we could
 	// easily write the same file from two goroutines at once.
 	for i, cache := range mplex.caches {
-		if ok, actualFiles, _ := cache.Fetch(target, key, files); ok {
-			// Store this into other caches
+		if ok, actualFiles, err := cache.Fetch(target, key, files); ok {
+			// Store this into other caches. We can ignore errors here because we know
+			// we have previously successfully stored in a higher-priority cache, and so the overall
+			// result is a success at fetching. Storing in lower-priority caches is an optimization.
 			mplex.storeUntil(target, key, 0, actualFiles, i)
-			return ok, actualFiles, nil
+			return ok, actualFiles, err
 		}
 	}
 	return false, files, nil
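
The hunk above is the heart of the multiplexer change: storeUntil now fans out with an errgroup instead of a bare sync.WaitGroup, so Put surfaces the first storage error rather than silently dropping all of them. Below is a minimal, self-contained sketch of that fan-out pattern; the store functions are hypothetical stand-ins for Cache.Put, not turborepo code.

// errgroup_sketch.go — minimal sketch of the fan-out used by storeUntil.
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	stores := []func() error{
		func() error { return nil }, // e.g. the fs cache succeeds
		func() error { return fmt.Errorf("http cache: unexpected HTTP status 500") },
	}

	g := &errgroup.Group{}
	for _, store := range stores {
		s := store // capture the loop variable (pre-Go 1.22 semantics)
		g.Go(func() error {
			return s()
		})
	}

	// Wait blocks until every goroutine returns and yields the first non-nil error.
	if err := g.Wait(); err != nil {
		fmt.Println("storeUntil would return:", err)
	}
}

Note that an errgroup.Group without WithContext still runs every goroutine to completion; Wait merely reports the first non-nil error, which matches what storeUntil needs here.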
1 change: 1 addition & 0 deletions cli/internal/cache/cache_http.go
@@ -41,6 +41,7 @@ func (cache *httpCache) Put(target, hash string, duration int, files []string) error {
 	// if cache.writable {
 	cache.requestLimiter.acquire()
 	defer cache.requestLimiter.release()
+
 	r, w := io.Pipe()
 	go cache.write(w, hash, files)
 	return cache.config.ApiClient.PutArtifact(hash, cache.config.TeamId, cache.config.TeamSlug, duration, r)
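
For context on the surrounding code: Put streams the artifact rather than buffering it, with `go cache.write(w, hash, files)` producing into one end of an io.Pipe while PutArtifact reads from the other. A small sketch of that producer/consumer shape, with illustrative names rather than turborepo's:

// pipe_sketch.go — the io.Pipe pattern from httpCache.Put in isolation.
package main

import (
	"fmt"
	"io"
	"io/ioutil"
)

func main() {
	r, w := io.Pipe()

	// Producer: runs concurrently, like `go cache.write(w, hash, files)`.
	go func() {
		// Closing the write end (with or without an error) unblocks the reader.
		defer w.Close()
		fmt.Fprint(w, "artifact bytes...")
	}()

	// Consumer: stands in for PutArtifact handing `r` to the HTTP client.
	body, err := ioutil.ReadAll(r)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Printf("uploaded %d bytes\n", len(body))
}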
114 changes: 102 additions & 12 deletions cli/internal/client/client.go
@@ -1,14 +1,18 @@
 package client
 
 import (
+	"context"
+	"crypto/x509"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
 	"net/http"
 	"net/url"
 	"runtime"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/hashicorp/go-hclog"
@@ -20,31 +24,99 @@ type ApiClient struct {
 	baseUrl      string
 	Token        string
 	turboVersion string
+	// Number of failed requests before we stop trying to upload/download artifacts to the remote cache
+	maxRemoteFailCount uint64
+	// Must be used via the atomic package
+	currentFailCount uint64
 	// An http client
 	HttpClient *retryablehttp.Client
 }
 
+// ErrTooManyFailures is returned from remote cache API methods after `maxRemoteFailCount` errors have occurred
+var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred")
+
 func (api *ApiClient) SetToken(token string) {
 	api.Token = token
 }
 
 // NewClient creates a new ApiClient
-func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiClient {
-	return &ApiClient{
-		baseUrl:      baseUrl,
-		turboVersion: turboVersion,
+func NewClient(baseUrl string, logger hclog.Logger, turboVersion string, maxRemoteFailCount uint64) *ApiClient {
+	client := &ApiClient{
+		baseUrl:            baseUrl,
+		turboVersion:       turboVersion,
+		maxRemoteFailCount: maxRemoteFailCount,
 		HttpClient: &retryablehttp.Client{
 			HTTPClient: &http.Client{
-				Timeout: time.Duration(60 * time.Second),
+				Timeout: time.Duration(20 * time.Second),
 			},
-			RetryWaitMin: 10 * time.Second,
-			RetryWaitMax: 20 * time.Second,
-			RetryMax:     5,
-			CheckRetry:   retryablehttp.DefaultRetryPolicy,
+			RetryWaitMin: 2 * time.Second,
+			RetryWaitMax: 10 * time.Second,
+			RetryMax:     2,
 			Backoff:      retryablehttp.DefaultBackoff,
 			Logger:       logger,
 		},
 	}
+	client.HttpClient.CheckRetry = client.checkRetry
+	return client
 }
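
With the new settings, go-retryablehttp's DefaultBackoff sleeps RetryWaitMin * 2^attempt, capped at RetryWaitMax, so a failing request is retried after roughly 2s and again after 4s before the client gives up. A quick sketch that prints that schedule; it assumes the exported retryablehttp.DefaultBackoff helper, and a nil response means no Retry-After header is consulted:

// backoff_sketch.go — the wait schedule implied by RetryWaitMin=2s,
// RetryWaitMax=10s, RetryMax=2.
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/go-retryablehttp"
)

func main() {
	minWait, maxWait, retryMax := 2*time.Second, 10*time.Second, 2
	for attempt := 0; attempt < retryMax; attempt++ {
		wait := retryablehttp.DefaultBackoff(minWait, maxWait, attempt, nil)
		fmt.Printf("retry %d after %v\n", attempt+1, wait) // 2s, then 4s
	}
}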

+func (c *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) {
+	if err != nil {
+		if errors.As(err, &x509.UnknownAuthorityError{}) {
+			// Don't retry if the error was due to TLS cert verification failure.
+			atomic.AddUint64(&c.currentFailCount, 1)
+			return false, err
+		}
+		atomic.AddUint64(&c.currentFailCount, 1)
+		return true, nil
+	}
+
+	// 429 Too Many Requests is recoverable. Sometimes the server puts
+	// a Retry-After response header to indicate when the server is
+	// available to start processing requests from the client.
+	if resp.StatusCode == http.StatusTooManyRequests {
+		atomic.AddUint64(&c.currentFailCount, 1)
+		return true, nil
+	}
+
+	// Check the response code. We retry on 500-range responses to allow
+	// the server time to recover, as 500s are typically not permanent
+	// errors and may relate to outages on the server side. This will catch
+	// invalid response codes as well, like 0 and 999.
+	if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) {
+		atomic.AddUint64(&c.currentFailCount, 1)
+		return true, fmt.Errorf("unexpected HTTP status %s", resp.Status)
+	}
+
+	return false, fmt.Errorf("unexpected HTTP status %s", resp.Status)
+}
+
+func (c *ApiClient) checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
+	// do not retry on context.Canceled or context.DeadlineExceeded
+	if ctx.Err() != nil {
+		atomic.AddUint64(&c.currentFailCount, 1)
+		return false, ctx.Err()
+	}
+
+	// we're squashing the error from the request and substituting any error that might come
+	// from our retry policy.
+	shouldRetry, err := c.retryCachePolicy(resp, err)
+	if shouldRetry {
+		// Our policy says it's ok to retry, but we need to check the failure count
+		if retryErr := c.okToRequest(); retryErr != nil {
+			return false, retryErr
+		}
+	}
+	return shouldRetry, err
+}
+
+// okToRequest returns nil if it's ok to make a request, and returns the error to
+// return to the caller if a request is not allowed
+func (c *ApiClient) okToRequest() error {
+	if atomic.LoadUint64(&c.currentFailCount) < c.maxRemoteFailCount {
+		return nil
+	}
+	return ErrTooManyFailures
+}
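
Taken together, retryCachePolicy, checkRetry, and okToRequest act as a one-way circuit breaker: every retryable failure increments currentFailCount, and once it reaches maxRemoteFailCount, every later cache request is refused with ErrTooManyFailures. A stripped-down sketch of just that counting logic, using standalone types rather than turborepo's:

// breaker_sketch.go — record failures atomically, refuse requests past a threshold.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errTooManyFailures = errors.New("too many failures")

type breaker struct {
	maxFails uint64
	fails    uint64 // must be accessed via the atomic package
}

func (b *breaker) okToRequest() error {
	if atomic.LoadUint64(&b.fails) < b.maxFails {
		return nil
	}
	return errTooManyFailures
}

func (b *breaker) recordFailure() {
	atomic.AddUint64(&b.fails, 1)
}

func main() {
	b := &breaker{maxFails: 3}
	for i := 0; i < 5; i++ {
		if err := b.okToRequest(); err != nil {
			fmt.Println("request", i, "skipped:", err)
			continue
		}
		fmt.Println("request", i, "allowed (and fails)")
		b.recordFailure()
	}
}

Because the counter is never reset, a single bad stretch disables the remote cache for the remainder of the run, which appears to be the graceful degradation this PR is after.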

 func (c *ApiClient) makeUrl(endpoint string) string {

@@ -56,21 +128,30 @@ func (c *ApiClient) UserAgent() string {
 }
 
 func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
+	if err := c.okToRequest(); err != nil {
+		return err
+	}
 	params := url.Values{}
 	if teamId != "" && strings.HasPrefix(teamId, "team_") {
 		params.Add("teamId", teamId)
 	}
 	if slug != "" {
 		params.Add("slug", slug)
 	}
-	req, err := retryablehttp.NewRequest(http.MethodPut, c.makeUrl("/v8/artifacts/"+hash+"?"+params.Encode()), rawBody)
+	// only add a ? if it's actually needed (makes logging cleaner)
+	encoded := params.Encode()
+	if encoded != "" {
+		encoded = "?" + encoded
+	}
+	req, err := retryablehttp.NewRequest(http.MethodPut, c.makeUrl("/v8/artifacts/"+hash+encoded), rawBody)
 	req.Header.Set("Content-Type", "application/octet-stream")
 	req.Header.Set("x-artifact-duration", fmt.Sprintf("%v", duration))
 	req.Header.Set("Authorization", "Bearer "+c.Token)
 	req.Header.Set("User-Agent", c.UserAgent())
 	if err != nil {
 		return fmt.Errorf("[WARNING] Invalid cache URL: %w", err)
 	}
 
 	if resp, err := c.HttpClient.Do(req); err != nil {
 		return fmt.Errorf("failed to store files in HTTP cache: %w", err)
 	} else {
@@ -80,19 +161,28 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
 }
 
 func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) {
+	if err := c.okToRequest(); err != nil {
+		return nil, err
+	}
 	params := url.Values{}
 	if teamId != "" && strings.HasPrefix(teamId, "team_") {
 		params.Add("teamId", teamId)
 	}
 	if slug != "" {
 		params.Add("slug", slug)
 	}
-	req, err := retryablehttp.NewRequest(http.MethodGet, c.makeUrl("/v8/artifacts/"+hash+"?"+params.Encode()), nil)
+	// only add a ? if it's actually needed (makes logging cleaner)
+	encoded := params.Encode()
+	if encoded != "" {
+		encoded = "?" + encoded
+	}
+	req, err := retryablehttp.NewRequest(http.MethodGet, c.makeUrl("/v8/artifacts/"+hash+encoded), nil)
 	req.Header.Set("Authorization", "Bearer "+c.Token)
 	req.Header.Set("User-Agent", c.UserAgent())
 	if err != nil {
-		return nil, fmt.Errorf("[WARNING] Invalid cache URL: %w", err)
+		return nil, fmt.Errorf("invalid cache URL: %w", err)
 	}
 
 	return c.HttpClient.Do(req)
 }
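
The conditional "?" in both PutArtifact and FetchArtifact is purely cosmetic: url.Values{}.Encode() returns an empty string, so the old code always logged URLs ending in a bare "?". A tiny sketch of the difference:

// query_sketch.go — why the "?" is now added conditionally.
package main

import (
	"fmt"
	"net/url"
)

func main() {
	params := url.Values{}
	fmt.Printf("old style: %q\n", "/v8/artifacts/hash?"+params.Encode()) // "/v8/artifacts/hash?"

	encoded := params.Encode()
	if encoded != "" {
		encoded = "?" + encoded
	}
	fmt.Printf("new style: %q\n", "/v8/artifacts/hash"+encoded) // "/v8/artifacts/hash"

	params.Add("teamId", "team_123")
	fmt.Printf("with team: %q\n", "/v8/artifacts/hash?"+params.Encode()) // "...?teamId=team_123"
}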
6 changes: 3 additions & 3 deletions cli/internal/config/config.go
@@ -9,6 +9,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
+
 	"github.com/vercel/turborepo/cli/internal/client"
 
 	hclog "github.com/hashicorp/go-hclog"
@@ -53,8 +54,6 @@ type CacheConfig struct {
 	Workers int
 	// Cache directory
 	Dir string
-	// HTTP URI of the cache
-	Url string
 }
 
 // ParseAndValidate parses the cmd line flags / env vars, and verifies that all required
@@ -162,7 +161,8 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config,
 		Output: output,
 	})
 
-	apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion)
+	maxRemoteFailCount := 3
+	apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion, uint64(maxRemoteFailCount))
 
 	c = &Config{
 		Logger: logger,
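
A hedged sketch of how a caller might wire this up end to end, mirroring config.go's hardcoded maxRemoteFailCount of 3. The URL, token, hash, and team values are placeholders, and errors.Is is used since FetchArtifact can return ErrTooManyFailures either directly or wrapped by the retrying client; note too that the client package lives under internal/, so outside the turborepo module this is illustrative only.

// usage_sketch.go — handling the new failure modes as a caller.
package main

import (
	"errors"
	"fmt"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/vercel/turborepo/cli/internal/client"
)

func main() {
	api := client.NewClient("https://example.com/api", hclog.Default(), "v0.0.0", 3)
	api.SetToken("placeholder-token")

	resp, err := api.FetchArtifact("some-hash", "team_placeholder", "", nil)
	switch {
	case errors.Is(err, client.ErrTooManyFailures):
		// The breaker tripped: the remote cache is off for the rest of the run.
		fmt.Println("remote cache disabled:", err)
	case err != nil:
		fmt.Println("fetch failed:", err)
	default:
		defer resp.Body.Close()
		fmt.Println("status:", resp.Status)
	}
}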