7 changes: 3 additions & 4 deletions cli/internal/cache/cache.go

@@ -3,11 +3,10 @@ package cache
 import (
 	"fmt"
-	"sync"
+
+	"golang.org/x/sync/errgroup"
 
 	"github.com/vercel/turborepo/cli/internal/config"
 	"github.com/vercel/turborepo/cli/internal/ui"
-
-	"golang.org/x/sync/errgroup"
 )
 
 // Cache is abstracted way to cache/fetch previously run tasks
@@ -65,7 +64,7 @@ func (mplex cacheMultiplexer) Put(target string, key string, duration int, files []string) error {
 // but it's hard to fix that without breaking the cache abstraction.
 func (mplex cacheMultiplexer) storeUntil(target string, key string, duration int, outputGlobs []string, stopAt int) error {
 	// Attempt to store on all caches simultaneously.
-	g := new(errgroup.Group)
+	g := &errgroup.Group{}
 	for i, cache := range mplex.caches {
 		if i == stopAt {
 			break
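For context on the one-line change above: storeUntil fans a single write out to every cache before stopAt using an errgroup, and surfaces the first error from any of them. Below is a minimal, self-contained sketch of that fan-out pattern; the Store interface, memCache type, and the two-cache setup are illustrative stand-ins, not turborepo's actual types.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Store is a stand-in for turborepo's cache interface; the real
// interface has more methods (Fetch, Clean, ...).
type Store interface {
	Put(target, key string) error
}

type memCache struct{ name string }

func (m *memCache) Put(target, key string) error {
	fmt.Printf("%s: stored %s/%s\n", m.name, target, key)
	return nil
}

// storeUntil mirrors cacheMultiplexer.storeUntil: write to every cache
// before stopAt concurrently, then report the first error, if any.
func storeUntil(caches []Store, target, key string, stopAt int) error {
	g := &errgroup.Group{}
	for i, c := range caches {
		if i == stopAt {
			break
		}
		c := c // capture the loop variable for the goroutine (pre-Go 1.22)
		g.Go(func() error {
			return c.Put(target, key)
		})
	}
	// Wait blocks until every goroutine finishes and returns the first error.
	return g.Wait()
}

func main() {
	caches := []Store{&memCache{"local"}, &memCache{"remote"}}
	if err := storeUntil(caches, "web", "abc123", len(caches)); err != nil {
		fmt.Println("cache write failed:", err)
	}
}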
6 changes: 0 additions & 6 deletions cli/internal/cache/cache_http.go

@@ -41,9 +41,6 @@ func (cache *httpCache) Put(target, hash string, duration int, files []string) error {
 	// if cache.writable {
 	cache.requestLimiter.acquire()
 	defer cache.requestLimiter.release()
-	if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount {
-		return fmt.Errorf("skipping uploading artifacts to HTTP cache: too many failures")
-	}
 
 	r, w := io.Pipe()
 	go cache.write(w, hash, files)
@@ -106,9 +103,6 @@ func (cache *httpCache) storeFile(tw *tar.Writer, name string) error {
 func (cache *httpCache) Fetch(target, key string, _unusedOutputGlobs []string) (bool, []string, error) {
 	cache.requestLimiter.acquire()
 	defer cache.requestLimiter.release()
-	if cache.config.ApiClient.CurrentFailCount > cache.config.Cache.MaxRemoteFailCount {
-		return false, nil, fmt.Errorf("skipping downloading artifacts to HTTP cache: too many past failures")
-	}
 	m, files, err := cache.retrieve(key)
 	if err != nil {
 		return false, files, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
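The two guards deleted above are replaced by checks inside the API client itself (next file): PutArtifact and FetchArtifact now refuse to issue a request once the failure budget is exhausted, so the HTTP cache layer no longer reaches into the client's internals to count failures.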
100 changes: 59 additions & 41 deletions cli/internal/client/client.go

@@ -1,7 +1,10 @@
 package client
 
 import (
+	"context"
+	"crypto/x509"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -18,24 +21,30 @@ import (
 
 type ApiClient struct {
 	// The api's base URL
-	baseUrl          string
-	Token            string
-	turboVersion     string
-	CurrentFailCount uint64
+	baseUrl      string
+	Token        string
+	turboVersion string
+	// Number of failed requests before we stop trying to upload/download artifacts to the remote cache
+	maxRemoteFailCount uint64
+	// Must be used via atomic package
+	currentFailCount uint64
 	// An http client
 	HttpClient *retryablehttp.Client
 }
 
+// ErrTooManyFailures is returned from remote cache API methods after `maxRemoteFailCount` errors have occurred
+var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred")
+
 func (api *ApiClient) SetToken(token string) {
 	api.Token = token
 }
 
 // New creates a new ApiClient
-func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiClient {
+func NewClient(baseUrl string, logger hclog.Logger, turboVersion string, maxRemoteFailCount uint64) *ApiClient {
 	client := &ApiClient{
-		baseUrl:          baseUrl,
-		turboVersion:     turboVersion,
-		CurrentFailCount: 0,
+		baseUrl:            baseUrl,
+		turboVersion:       turboVersion,
+		maxRemoteFailCount: maxRemoteFailCount,
 		HttpClient: &retryablehttp.Client{
 			HTTPClient: &http.Client{
 				Timeout: time.Duration(20 * time.Second),
@@ -47,27 +56,26 @@ func NewClient(baseUrl string, logger hclog.Logger, turboVersion string) *ApiClient {
 			Logger: logger,
 		},
 	}
+	client.HttpClient.CheckRetry = client.checkRetry
 	return client
 }
 
-func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) {
+func (c *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) {
 	if err != nil {
-		if v, ok := err.(*url.Error); ok {
+		if errors.As(err, &x509.UnknownAuthorityError{}) {
 			// Don't retry if the error was due to TLS cert verification failure.
-			if _, ok := v.Err.(x509.UnknownAuthorityError); ok {
-				atomic.AddUint64(&client.CurrentFailCount, 1)
-				return false, v
-			}
+			atomic.AddUint64(&c.currentFailCount, 1)
+			return false, err
 		}
-		atomic.AddUint64(&client.CurrentFailCount, 1)
+		atomic.AddUint64(&c.currentFailCount, 1)
 		return true, nil
 	}
 
 	// 429 Too Many Requests is recoverable. Sometimes the server puts
 	// a Retry-After response header to indicate when the server is
 	// available to start processing request from client.
 	if resp.StatusCode == http.StatusTooManyRequests {
-		atomic.AddUint64(&client.CurrentFailCount, 1)
+		atomic.AddUint64(&c.currentFailCount, 1)
 		return true, nil
 	}
 
@@ -76,13 +84,41 @@ func (client *ApiClient) retryCachePolicy(resp *http.Response, err error) (bool, error) {
 	// errors and may relate to outages on the server side. This will catch
 	// invalid response codes as well, like 0 and 999.
 	if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) {
-		atomic.AddUint64(&client.CurrentFailCount, 1)
+		atomic.AddUint64(&c.currentFailCount, 1)
		return true, fmt.Errorf("unexpected HTTP status %s", resp.Status)
 	}
 
 	return false, fmt.Errorf("unexpected HTTP status %s", resp.Status)
 }
 
+func (c *ApiClient) checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
+	// do not retry on context.Canceled or context.DeadlineExceeded
+	if ctx.Err() != nil {
+		atomic.AddUint64(&c.currentFailCount, 1)
+		return false, ctx.Err()
+	}
+
+	// we're squashing the error from the request and substituting any error that might come
+	// from our retry policy.
+	shouldRetry, err := c.retryCachePolicy(resp, err)
+	if shouldRetry {
+		// Our policy says it's ok to retry, but we need to check the failure count
+		if retryErr := c.okToRequest(); retryErr != nil {
+			return false, retryErr
+		}
+	}
+	return shouldRetry, err
+}
+
+// okToRequest returns nil if it's ok to make a request, and returns the error to
+// return to the caller if a request is not allowed
+func (c *ApiClient) okToRequest() error {
+	if atomic.LoadUint64(&c.currentFailCount) < c.maxRemoteFailCount {
+		return nil
+	}
+	return ErrTooManyFailures
+}
+
 func (c *ApiClient) makeUrl(endpoint string) string {
 	return fmt.Sprintf("%v%v", c.baseUrl, endpoint)
 }
@@ -92,6 +128,9 @@ func (c *ApiClient) UserAgent() string {
 }
 
 func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
+	if err := c.okToRequest(); err != nil {
+		return err
+	}
 	params := url.Values{}
 	if teamId != "" && strings.HasPrefix(teamId, "team_") {
 		params.Add("teamId", teamId)
@@ -113,18 +152,6 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
 		return fmt.Errorf("[WARNING] Invalid cache URL: %w", err)
 	}
 
-	c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
-		// do not retry on context.Canceled or context.DeadlineExceeded
-		if ctx.Err() != nil {
-			c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1)
-			return false, ctx.Err()
-		}
-
-		// don't propagate other errors
-		shouldRetry, err := c.retryCachePolicy(resp, err)
-		return shouldRetry, err
-	}
-
 	if resp, err := c.HttpClient.Do(req); err != nil {
 		return fmt.Errorf("failed to store files in HTTP cache: %w", err)
 	} else {
@@ -134,6 +161,9 @@ func (c *ApiClient) PutArtifact(hash string, teamId string, slug string, duration int, rawBody interface{}) error {
 }
 
 func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) {
+	if err := c.okToRequest(); err != nil {
+		return nil, err
+	}
 	params := url.Values{}
 	if teamId != "" && strings.HasPrefix(teamId, "team_") {
 		params.Add("teamId", teamId)
@@ -153,18 +183,6 @@ func (c *ApiClient) FetchArtifact(hash string, teamId string, slug string, rawBody interface{}) (*http.Response, error) {
 		return nil, fmt.Errorf("invalid cache URL: %w", err)
 	}
 
-	c.HttpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
-		// do not retry on context.Canceled or context.DeadlineExceeded
-		if ctx.Err() != nil {
-			c.CurrentFailCount = atomic.AddUint64(&c.CurrentFailCount, 1)
-			return false, ctx.Err()
-		}
-
-		// don't propagate other errors
-		shouldRetry, err := c.retryCachePolicy(resp, err)
-		return shouldRetry, err
-	}
-
 	return c.HttpClient.Do(req)
 }
 
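The core of this file's change is the pair okToRequest/checkRetry: a lock-free counter that flips the client into a "stop calling the remote cache" state once maxRemoteFailCount failures accumulate. Here is a stripped-down sketch of that mechanism under assumed names — breaker, Allow, and RecordFailure are illustrative, not turborepo's API; only the atomic counter pattern is taken from the diff.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// ErrTooManyFailures mirrors the sentinel error the client returns once
// the failure budget is exhausted.
var ErrTooManyFailures = errors.New("skipping HTTP Request, too many failures have occurred")

// breaker is an illustrative stand-in for the counting logic embedded in
// ApiClient; it is not the actual turborepo type.
type breaker struct {
	max  uint64
	fail uint64 // must be accessed via the atomic package
}

// Allow reports whether another request may be attempted, mirroring okToRequest.
func (b *breaker) Allow() error {
	if atomic.LoadUint64(&b.fail) < b.max {
		return nil
	}
	return ErrTooManyFailures
}

// RecordFailure bumps the counter, as checkRetry/retryCachePolicy do on
// cancellations, TLS errors, 429s, and 5xx responses.
func (b *breaker) RecordFailure() {
	atomic.AddUint64(&b.fail, 1)
}

func main() {
	b := &breaker{max: 3}
	for i := 0; i < 5; i++ {
		if err := b.Allow(); err != nil {
			fmt.Println("request", i, "blocked:", err)
			continue
		}
		fmt.Println("request", i, "attempted (and failed)")
		b.RecordFailure()
	}
}

A design note on the diff itself: assigning checkRetry once in NewClient, instead of reassigning a fresh closure on every PutArtifact/FetchArtifact call, also avoids concurrent writes to the shared HttpClient.CheckRetry field when the cache multiplexer stores artifacts in parallel.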
11 changes: 5 additions & 6 deletions cli/internal/config/config.go

@@ -9,6 +9,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
+
 	"github.com/vercel/turborepo/cli/internal/client"
 
 	hclog "github.com/hashicorp/go-hclog"
@@ -49,8 +50,6 @@
 
 // CacheConfig
 type CacheConfig struct {
-	// Number of failed requests before we stop trying to upload/download artifacts to the remote cache
-	MaxRemoteFailCount uint64
 	// Number of async workers
 	Workers int
 	// Cache directory
@@ -162,7 +161,8 @@ func ParseAndValidate(args []string, ui cli.Ui, turboVersion string) (c *Config,
 		Output: output,
 	})
 
-	apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion)
+	maxRemoteFailCount := 3
+	apiClient := client.NewClient(partialConfig.ApiUrl, logger, turboVersion, uint64(maxRemoteFailCount))
 
 	c = &Config{
 		Logger: logger,
@@ -174,9 +174,8 @@
 		ApiClient:    apiClient,
 		TurboVersion: turboVersion,
 		Cache: &CacheConfig{
-			MaxRemoteFailCount: 3,
-			Workers:            runtime.NumCPU() + 2,
-			Dir:                filepath.Join("node_modules", ".cache", "turbo"),
+			Workers: runtime.NumCPU() + 2,
+			Dir:     filepath.Join("node_modules", ".cache", "turbo"),
 		},
 	}
 
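After this change, the failure budget rides along as a NewClient argument rather than a CacheConfig field. A sketch of the new call-site shape, mirroring ParseAndValidate above; the base URL, version string, and token are placeholder values, hclog.NewNullLogger stands in for the configured logger, and this only compiles inside the turborepo CLI module since the client package is internal.

package main

import (
	"fmt"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/vercel/turborepo/cli/internal/client"
)

func main() {
	// The cap on remote-cache failures is wired in at construction time;
	// ParseAndValidate hardcodes it to 3.
	maxRemoteFailCount := uint64(3)
	apiClient := client.NewClient("https://api.vercel.com", hclog.NewNullLogger(), "v0.0.1", maxRemoteFailCount)
	apiClient.SetToken("placeholder-token") // illustrative; the real token comes from user config
	fmt.Println(apiClient.UserAgent())
}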