diff --git a/.vscode/launch.json b/.vscode/launch.json
index 9782fd2e0f631..f0b818e1ec1b6 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -60,6 +60,15 @@
       "program": "${workspaceRoot}/cli/cmd/turbo",
       "cwd": "${workspaceRoot}",
       "args": ["run", "build", "--force"]
+    },
+    {
+      "name": "Kitchen Sink Dry Run",
+      "type": "go",
+      "request": "launch",
+      "mode": "debug",
+      "program": "${workspaceRoot}/cli/cmd/turbo",
+      "cwd": "${workspaceRoot}/examples/kitchen-sink",
+      "args": ["run", "build", "--dry"]
     }
   ]
 }
diff --git a/cli/internal/context/context.go b/cli/internal/context/context.go
index 9c9a1bb67a17e..8281ab16caff6 100644
--- a/cli/internal/context/context.go
+++ b/cli/internal/context/context.go
@@ -20,11 +20,10 @@ import (
 	"github.com/Masterminds/semver"
 	mapset "github.com/deckarep/golang-set"
 	"github.com/pyr-sh/dag"
-	gitignore "github.com/sabhiram/go-gitignore"
 	"golang.org/x/sync/errgroup"
 )
 
-const GLOBAL_CACHE_KEY = "the hero we needed"
+const GLOBAL_CACHE_KEY = "Ba weep granna weep ninny bong"
 
 // Context of the CLI
 type Context struct {
@@ -35,7 +34,6 @@ type Context struct {
 	RootNode     string
 	GlobalHash   string
 	Lockfile     *fs.YarnLockfile
-	SCC          [][]dag.Vertex
 	Backend      *api.LanguageBackend
 	// Used to arbitrate access to the graph. We parallelise most build operations
 	// and Go maps aren't natively threadsafe so this is needed.
@@ -182,76 +180,22 @@ func WithGraph(rootpath string, config *config.Config) Option {
 		if err := parseJSONWaitGroup.Wait(); err != nil {
 			return err
 		}
-		packageDepsHashGroup := new(errgroup.Group)
 		populateGraphWaitGroup := new(errgroup.Group)
 		for _, pkg := range c.PackageInfos {
 			pkg := pkg
 			populateGraphWaitGroup.Go(func() error {
 				return c.populateTopologicGraphForPackageJson(pkg, rootpath)
 			})
-			packageDepsHashGroup.Go(func() error {
-				return c.loadPackageDepsHash(pkg)
-			})
 		}
 		if err := populateGraphWaitGroup.Wait(); err != nil {
 			return err
 		}
-		if err := packageDepsHashGroup.Wait(); err != nil {
-			return err
-		}
-		// Only now can we get the SCC (i.e. topological order)
-		c.SCC = dag.StronglyConnected(&c.TopologicalGraph.Graph)
 		return nil
 	}
 }
 
-func (c *Context) loadPackageDepsHash(pkg *fs.PackageJSON) error {
-	pkg.Mu.Lock()
-	defer pkg.Mu.Unlock()
-	hashObject, pkgDepsErr := fs.GetPackageDeps(&fs.PackageDepsOptions{
-		PackagePath: pkg.Dir,
-	})
-	if pkgDepsErr != nil {
-		hashObject = make(map[string]string)
-		// Instead of implementing all gitignore properly, we hack it. We only respect .gitignore in the root and in
-		// the directory of a package.
-		ignore, err := safeCompileIgnoreFile(".gitignore")
-		if err != nil {
-			return err
-		}
-
-		ignorePkg, err := safeCompileIgnoreFile(filepath.Join(pkg.Dir, ".gitignore"))
-		if err != nil {
-			return err
-		}
-
-		fs.Walk(pkg.Dir, func(name string, isDir bool) error {
-			rootMatch := ignore.MatchesPath(name)
-			otherMatch := ignorePkg.MatchesPath(name)
-			if !rootMatch && !otherMatch {
-				if !isDir {
-					hash, err := fs.GitLikeHashFile(name)
-					if err != nil {
-						return fmt.Errorf("could not hash file %v. \n%w", name, err)
-					}
-					hashObject[strings.TrimPrefix(name, pkg.Dir+"/")] = hash
-				}
-			}
-			return nil
-		})
-
-		// ignorefile rules matched files
-	}
-	hashOfFiles, otherErr := fs.HashObject(hashObject)
-	if otherErr != nil {
-		return otherErr
-	}
-	pkg.FilesHash = hashOfFiles
-	return nil
-}
-
 func (c *Context) resolveWorkspaceRootDeps(rootPackageJSON *fs.PackageJSON) error {
 	seen := mapset.NewSet()
 	var lockfileWg sync.WaitGroup
@@ -432,14 +376,6 @@ func (c *Context) resolveDepGraph(wg *sync.WaitGroup, unresolvedDirectDeps map[s
 	}
 }
 
-func safeCompileIgnoreFile(filepath string) (*gitignore.GitIgnore, error) {
-	if fs.FileExists(filepath) {
-		return gitignore.CompileIgnoreFile(filepath)
-	}
-	// no op
-	return gitignore.CompileIgnoreLines([]string{}...), nil
-}
-
 func getWorkspaceIgnores() []string {
 	return []string{
 		"**/node_modules/",
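One note on the context.go change: bumping GLOBAL_CACHE_KEY invalidates every previously cached artifact, because the constant is one of the inputs to the global hash that is mixed into each task's hash. A minimal sketch of that relationship; the struct shape here is assumed for illustration, not taken from this diff:

```go
// Sketch only: the surrounding context.go code (not shown in this patch)
// hashes a struct of global inputs. Changing GLOBAL_CACHE_KEY changes the
// global hash, and with it every task hash, forcing universal cache misses.
globalHash, err := fs.HashObject(struct {
	globalCacheKey string
	// ...other global inputs (env pairs, root dependencies, pipeline) elided...
}{
	globalCacheKey: GLOBAL_CACHE_KEY,
})
```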
diff --git a/cli/internal/fs/package_deps_hash.go b/cli/internal/fs/package_deps_hash.go
index a8c75fa109019..b8db32ced063a 100644
--- a/cli/internal/fs/package_deps_hash.go
+++ b/cli/internal/fs/package_deps_hash.go
@@ -29,17 +29,32 @@ type PackageDepsOptions struct {
 	ExcludedPaths []string
 	// GitPath is an optional alternative path to the git installation
 	GitPath string
+
+	InputPatterns []string
 }
 
 // GetPackageDeps Builds an object containing git hashes for the files under the specified `packagePath` folder.
 func GetPackageDeps(p *PackageDepsOptions) (map[string]string, error) {
-	gitLsOutput, err := gitLsTree(p.PackagePath, p.GitPath)
-	if err != nil {
-		return nil, fmt.Errorf("could not get git hashes for files in package %s: %w", p.PackagePath, err)
-	}
 	// Add all the checked in hashes.
 	// TODO(gsoltis): are these platform-dependent paths?
-	result := parseGitLsTree(gitLsOutput)
+	var result map[string]string
+	if len(p.InputPatterns) == 0 {
+		gitLsOutput, err := gitLsTree(p.PackagePath, p.GitPath)
+		if err != nil {
+			return nil, fmt.Errorf("could not get git hashes for files in package %s: %w", p.PackagePath, err)
+		}
+		result = parseGitLsTree(gitLsOutput)
+	} else {
+		gitLsOutput, err := gitLsFiles(p.PackagePath, p.GitPath, p.InputPatterns)
+		if err != nil {
+			return nil, fmt.Errorf("could not get git hashes for file patterns %v in package %s: %w", p.InputPatterns, p.PackagePath, err)
+		}
+		parsedLines, err := parseGitLsFiles(gitLsOutput)
+		if err != nil {
+			return nil, err
+		}
+		result = parsedLines
+	}
 
 	if len(p.ExcludedPaths) > 0 {
 		for _, p := range p.ExcludedPaths {
@@ -149,6 +164,19 @@ func gitLsTree(path string, gitPath string) (string, error) {
 	return strings.TrimSpace(string(out)), nil
 }
 
+func gitLsFiles(path string, gitPath string, patterns []string) (string, error) {
+	cmd := exec.Command("git", "ls-files", "-s", "--")
+	for _, pattern := range patterns {
+		cmd.Args = append(cmd.Args, pattern)
+	}
+	cmd.Dir = path
+	out, err := cmd.CombinedOutput()
+	if err != nil {
+		return "", fmt.Errorf("failed to read `git ls-files`: %w", err)
+	}
+	return strings.TrimSpace(string(out)), nil
+}
+
 func parseGitLsTree(output string) map[string]string {
 	changes := make(map[string]string)
 	if len(output) > 0 {
@@ -178,6 +206,35 @@ func parseGitLsTree(output string) map[string]string {
 	return changes
 }
 
+func parseGitLsFiles(output string) (map[string]string, error) {
+	changes := make(map[string]string)
+	if len(output) > 0 {
+		// A line is expected to look like:
+		// 100644 3451bccdc831cb43d7a70ed8e628dcf9c7f888c8 0	src/typings/tsd.d.ts
+		// 160000 c5880bf5b0c6c1f2e2c43c95beeb8f0a808e8bac 0	rushstack
+		gitRex := regexp.MustCompile(`[0-9]{6}\s([a-f0-9]{40})\s[0-3]\s*(.+)`)
+		outputLines := strings.Split(output, "\n")
+
+		for _, line := range outputLines {
+			if len(line) > 0 {
+				match := gitRex.FindStringSubmatch(line)
+				// we found matches, and the slice has three parts:
+				// 0 - the whole string
+				// 1 - the hash
+				// 2 - the filename
+				if match != nil && len(match) == 3 {
+					hash := match[1]
+					filename := parseGitFilename(match[2])
+					changes[filename] = hash
+				} else {
+					return nil, fmt.Errorf("failed to parse git ls-files output line %v", line)
+				}
+			}
+		}
+	}
+	return changes, nil
+}
+
 // Couldn't figure out how to deal with special characters. Skipping for now.
 // @todo see https://github.com/microsoft/rushstack/blob/925ad8c9e22997c1edf5fe38c53fa618e8180f70/libraries/package-deps-hash/src/getPackageDeps.ts#L19
 func parseGitFilename(filename string) string {
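With no `inputs` configured, `GetPackageDeps` keeps the old `git ls-tree` behavior; with patterns, it shells out to `git ls-files -s -- <patterns>`, whose staged-entry output (`<mode> <object> <stage>\t<path>`) is what `parseGitLsFiles` matches. A usage sketch; the package path and globs here are illustrative, not taken from this diff:

```go
// Hash only the files in packages/a that match the given patterns.
hashes, err := fs.GetPackageDeps(&fs.PackageDepsOptions{
	PackagePath:   "packages/a",
	InputPatterns: []string{"src/**", "package.json"},
})
if err != nil {
	return err // sketch: handle as appropriate at the real call site
}
// hashes maps package-relative paths to git object IDs,
// e.g. "src/index.ts" -> "3451bccdc831cb43d7a70ed8e628dcf9c7f888c8".
```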
diff --git a/cli/internal/fs/package_json.go b/cli/internal/fs/package_json.go
index 899940c7e25f4..c563588c05dc4 100644
--- a/cli/internal/fs/package_json.go
+++ b/cli/internal/fs/package_json.go
@@ -6,6 +6,7 @@ import (
 	"os"
 	"sync"
 
+	"github.com/vercel/turborepo/cli/internal/util"
 	"github.com/yosuke-furukawa/json5/encoding/json5"
 )
 
@@ -17,7 +18,7 @@ type TurboConfigJSON struct {
 	GlobalDependencies []string `json:"globalDependencies,omitempty"`
 	// Pipeline is a map of Turbo pipeline entries which define the task graph
 	// and cache behavior on a per task or per package-task basis.
-	Pipeline map[string]Pipeline
+	Pipeline PipelineConfig
 	// Configuration options when interfacing with the remote cache
 	RemoteCacheOptions RemoteCacheOptions `json:"remoteCache,omitempty"`
 }
@@ -47,12 +48,25 @@ type PPipeline struct {
 	Outputs   *[]string `json:"outputs"`
 	Cache     *bool     `json:"cache,omitempty"`
 	DependsOn []string  `json:"dependsOn,omitempty"`
+	Inputs    []string  `json:"inputs,omitempty"`
+}
+
+type PipelineConfig map[string]Pipeline
+
+func (pc PipelineConfig) GetPipeline(taskID string) (Pipeline, bool) {
+	if entry, ok := pc[taskID]; ok {
+		return entry, true
+	}
+	_, task := util.GetPackageTaskFromId(taskID)
+	entry, ok := pc[task]
+	return entry, ok
 }
 
 type Pipeline struct {
 	Outputs   []string `json:"-"`
 	Cache     *bool    `json:"cache,omitempty"`
 	DependsOn []string `json:"dependsOn,omitempty"`
+	Inputs    []string `json:"inputs,omitempty"`
 	PPipeline
 }
 
@@ -69,6 +83,7 @@ func (c *Pipeline) UnmarshalJSON(data []byte) error {
 	}
 	c.Cache = c.PPipeline.Cache
 	c.DependsOn = c.PPipeline.DependsOn
+	c.Inputs = c.PPipeline.Inputs
 	return nil
 }
 
@@ -86,15 +101,13 @@ type PackageJSON struct {
 	Workspaces             Workspaces `json:"workspaces,omitempty"`
 	Private                bool       `json:"private,omitempty"`
 	PackageJSONPath        string
-	Hash                   string
-	Dir                    string
+	Dir                    string // relative path from repo root to the package
 	InternalDeps           []string
 	UnresolvedExternalDeps map[string]string
 	ExternalDeps           []string
 	SubLockfile            YarnLockfile
 	LegacyTurboConfig      *TurboConfigJSON `json:"turbo"`
 	Mu                     sync.Mutex
-	FilesHash              string
 	ExternalDepsHash       string
 }
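`GetPipeline` resolves a task ID by checking for an exact package-task entry before falling back to the bare task name, which is what lets a pipeline override behavior (such as `inputs`) for a single package. A sketch, assuming a config with entries `"lint"` and `"web#build"` (the entry and package names are illustrative):

```go
entry, ok := pipeline.GetPipeline("web#build") // exact "web#build" entry wins
entry, ok = pipeline.GetPipeline("docs#lint")  // no "docs#lint" entry, falls back to "lint"
entry, ok = pipeline.GetPipeline("docs#test")  // ok == false: neither "docs#test" nor "test" exists
```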
diff --git a/cli/internal/fs/package_json_test.go b/cli/internal/fs/package_json_test.go
index 86388cb6c3440..b79ae507a312e 100644
--- a/cli/internal/fs/package_json_test.go
+++ b/cli/internal/fs/package_json_test.go
@@ -20,9 +20,24 @@ func Test_ParseTurboConfigJson(t *testing.T) {
 	}
 
 	BoolFalse := false
-	build := Pipeline{[]string{"dist/**", ".next/**"}, nil, []string{"^build"}, PPipeline{&[]string{"dist/**", ".next/**"}, nil, []string{"^build"}}}
-	lint := Pipeline{[]string{}, nil, nil, PPipeline{&[]string{}, nil, nil}}
-	dev := Pipeline{nil, &BoolFalse, nil, PPipeline{nil, &BoolFalse, nil}}
+	build := Pipeline{
+		Outputs:   []string{"dist/**", ".next/**"},
+		DependsOn: []string{"^build"},
+		PPipeline: PPipeline{
+			Outputs:   &[]string{"dist/**", ".next/**"},
+			DependsOn: []string{"^build"},
+		},
+	}
+	lint := Pipeline{
+		Outputs:   []string{},
+		PPipeline: PPipeline{Outputs: &[]string{}},
+	}
+	dev := Pipeline{
+		Cache: &BoolFalse,
+		PPipeline: PPipeline{
+			Cache: &BoolFalse,
+		},
+	}
 
 	pipelineExpected := map[string]Pipeline{"build": build, "lint": lint, "dev": dev}
 
 	remoteCacheOptionsExpected := RemoteCacheOptions{"team_id", true}
diff --git a/cli/internal/run/hash.go b/cli/internal/run/hash.go
new file mode 100644
index 0000000000000..4d78e5af512de
--- /dev/null
+++ b/cli/internal/run/hash.go
@@ -0,0 +1,276 @@
+package run
+
+// TODO(gsoltis): This should eventually either be its own package or part of core
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+	"sync"
+
+	"github.com/bmatcuk/doublestar/v4"
+	"github.com/pyr-sh/dag"
+	gitignore "github.com/sabhiram/go-gitignore"
+	"github.com/vercel/turborepo/cli/internal/fs"
+	"github.com/vercel/turborepo/cli/internal/util"
+	"golang.org/x/sync/errgroup"
+)
+
+// Tracker caches package-inputs hashes, as well as package-task hashes.
+// package-inputs hashes must be calculated before package-task hashes,
+// and package-task hashes must be calculated in topological order.
+// package-task hashing is threadsafe, provided topological order is
+// respected.
+type Tracker struct {
+	rootNode            string
+	globalHash          string
+	pipeline            fs.PipelineConfig
+	packageInfos        map[interface{}]*fs.PackageJSON
+	mu                  sync.RWMutex
+	packageInputsHashes packageFileHashes
+	packageTaskHashes   map[string]string // taskID -> hash
+}
+
+// NewTracker creates a tracker for package-inputs combinations and package-task combinations.
+func NewTracker(rootNode string, globalHash string, pipeline fs.PipelineConfig, packageInfos map[interface{}]*fs.PackageJSON) *Tracker {
+	return &Tracker{
+		rootNode:          rootNode,
+		globalHash:        globalHash,
+		pipeline:          pipeline,
+		packageInfos:      packageInfos,
+		packageTaskHashes: make(map[string]string),
+	}
+}
+
+// packageFileSpec defines a combination of a package and optional set of input globs
+type packageFileSpec struct {
+	pkg    string
+	inputs []string
+}
+
+// packageFileHashKey is a hashable representation of a packageFileSpec.
+type packageFileHashKey string
+
+func (pfs *packageFileSpec) ToKey() packageFileHashKey {
+	sort.Strings(pfs.inputs)
+	return packageFileHashKey(fmt.Sprintf("%v#%v", pfs.pkg, strings.Join(pfs.inputs, "!")))
+}
+
+func safeCompileIgnoreFile(filepath string) (*gitignore.GitIgnore, error) {
+	if fs.FileExists(filepath) {
+		return gitignore.CompileIgnoreFile(filepath)
+	}
+	// no op
+	return gitignore.CompileIgnoreLines([]string{}...), nil
+}
+
+func (pfs *packageFileSpec) hash(pkg *fs.PackageJSON, rootPath string) (string, error) {
+	hashObject, pkgDepsErr := fs.GetPackageDeps(&fs.PackageDepsOptions{
+		PackagePath:   pkg.Dir,
+		InputPatterns: pfs.inputs,
+	})
+	if pkgDepsErr != nil {
+		manualHashObject, err := manuallyHashPackage(pkg, pfs.inputs, rootPath)
+		if err != nil {
+			return "", err
+		}
+		hashObject = manualHashObject
+	}
+	hashOfFiles, otherErr := fs.HashObject(hashObject)
+	if otherErr != nil {
+		return "", otherErr
+	}
+	return hashOfFiles, nil
+}
+
+func manuallyHashPackage(pkg *fs.PackageJSON, inputs []string, rootPath string) (map[string]string, error) {
+	hashObject := make(map[string]string)
+	// Instead of implementing all gitignore properly, we hack it. We only respect .gitignore in the root and in
+	// the directory of a package.
+	ignore, err := safeCompileIgnoreFile(filepath.Join(rootPath, ".gitignore"))
+	if err != nil {
+		return nil, err
+	}
+
+	ignorePkg, err := safeCompileIgnoreFile(filepath.Join(rootPath, pkg.Dir, ".gitignore"))
+	if err != nil {
+		return nil, err
+	}
+
+	includePattern := ""
+	if len(inputs) > 0 {
+		includePattern = "{" + strings.Join(inputs, ",") + "}"
+	}
+
+	pathPrefix := filepath.Join(rootPath, pkg.Dir)
+	toTrim := filepath.FromSlash(pathPrefix + "/")
+	fs.Walk(pathPrefix, func(name string, isDir bool) error {
+		rootMatch := ignore.MatchesPath(name)
+		otherMatch := ignorePkg.MatchesPath(name)
+		if !rootMatch && !otherMatch {
+			if !isDir {
+				if includePattern != "" {
+					val, err := doublestar.PathMatch(includePattern, name)
+					if err != nil {
+						return err
+					}
+					if !val {
+						return nil
+					}
+				}
+				hash, err := fs.GitLikeHashFile(name)
+				if err != nil {
+					return fmt.Errorf("could not hash file %v. \n%w", name, err)
+				}
+				hashObject[strings.TrimPrefix(name, toTrim)] = hash
+			}
+		}
+		return nil
+	})
+	return hashObject, nil
+}
+
+// packageFileHashes is a map from a package and optional input globs to the hash of
+// the matched files in the package.
+type packageFileHashes map[packageFileHashKey]string
+
+// CalculateFileHashes hashes each unique package-inputs combination that is present
+// in the task graph. Must be called before calculating task hashes.
+func (th *Tracker) CalculateFileHashes(allTasks []dag.Vertex, workerCount int, rootPath string) error {
+	hashTasks := make(util.Set)
+	for _, v := range allTasks {
+		taskID, ok := v.(string)
+		if !ok {
+			return fmt.Errorf("unknown task %v", v)
+		}
+		if taskID == th.rootNode {
+			continue
+		}
+		pkgName, _ := util.GetPackageTaskFromId(taskID)
+		if pkgName == th.rootNode {
+			continue
+		}
+		pipelineEntry, ok := th.pipeline.GetPipeline(taskID)
+		if !ok {
+			return fmt.Errorf("missing pipeline entry %v", taskID)
+		}
+		hashTasks.Add(&packageFileSpec{
+			pkg:    pkgName,
+			inputs: pipelineEntry.Inputs,
+		})
+	}
+
+	hashes := make(map[packageFileHashKey]string)
+	hashQueue := make(chan *packageFileSpec, workerCount)
+	hashErrs := &errgroup.Group{}
+	for i := 0; i < workerCount; i++ {
+		hashErrs.Go(func() error {
+			for ht := range hashQueue {
+				pkg, ok := th.packageInfos[ht.pkg]
+				if !ok {
+					return fmt.Errorf("cannot find package %v", ht.pkg)
+				}
+				hash, err := ht.hash(pkg, rootPath)
+				if err != nil {
+					return err
+				}
+				th.mu.Lock()
+				hashes[ht.ToKey()] = hash
+				th.mu.Unlock()
+			}
+			return nil
+		})
+	}
+	for ht := range hashTasks {
+		hashQueue <- ht.(*packageFileSpec)
+	}
+	close(hashQueue)
+	err := hashErrs.Wait()
+	if err != nil {
+		return err
+	}
+	th.packageInputsHashes = hashes
+	return nil
+}
+
+type taskHashInputs struct {
+	hashOfFiles          string
+	externalDepsHash     string
+	task                 string
+	outputs              []string
+	passThruArgs         []string
+	hashableEnvPairs     []string
+	globalHash           string
+	taskDependencyHashes []string
+}
+
+func (th *Tracker) calculateDependencyHashes(dependencySet dag.Set) ([]string, error) {
+	dependencyHashSet := make(util.Set)
+
+	rootPrefix := th.rootNode + util.TASK_DELIMITER
+	th.mu.RLock()
+	defer th.mu.RUnlock()
+	for _, dependency := range dependencySet {
+		if dependency == th.rootNode {
+			continue
+		}
+		dependencyTask, ok := dependency.(string)
+		if !ok {
+			return nil, fmt.Errorf("unknown task: %v", dependency)
+		}
+		if strings.HasPrefix(dependencyTask, rootPrefix) {
+			continue
+		}
+		dependencyHash, ok := th.packageTaskHashes[dependencyTask]
+		if !ok {
+			return nil, fmt.Errorf("missing hash for dependent task: %v", dependencyTask)
+		}
+		dependencyHashSet.Add(dependencyHash)
+	}
+	dependenciesHashList := dependencyHashSet.UnsafeListOfStrings()
+	sort.Strings(dependenciesHashList)
+	return dependenciesHashList, nil
+}
+
+// CalculateTaskHash calculates the hash for a package-task combination. It is threadsafe, provided
+// that it has previously been called on its task-graph dependencies. File hashes must be calculated
+// first.
+func (th *Tracker) CalculateTaskHash(pt *packageTask, dependencySet dag.Set, args []string) (string, error) {
+	pkgFileHashKey := pt.ToPackageFileHashKey()
+	hashOfFiles, ok := th.packageInputsHashes[pkgFileHashKey]
+	if !ok {
+		return "", fmt.Errorf("cannot find package-file hash for %v", pkgFileHashKey)
+	}
+	outputs := pt.HashableOutputs()
+	hashableEnvPairs := []string{}
+	for _, v := range pt.pipeline.DependsOn {
+		if strings.Contains(v, ENV_PIPELINE_DELIMITER) {
+			trimmed := strings.TrimPrefix(v, ENV_PIPELINE_DELIMITER)
+			hashableEnvPairs = append(hashableEnvPairs, fmt.Sprintf("%v=%v", trimmed, os.Getenv(trimmed)))
+		}
+	}
+	sort.Strings(hashableEnvPairs)
+	taskDependencyHashes, err := th.calculateDependencyHashes(dependencySet)
+	if err != nil {
+		return "", err
+	}
+	hash, err := fs.HashObject(&taskHashInputs{
+		hashOfFiles:          hashOfFiles,
+		externalDepsHash:     pt.pkg.ExternalDepsHash,
+		task:                 pt.task,
+		outputs:              outputs,
+		passThruArgs:         args,
+		hashableEnvPairs:     hashableEnvPairs,
+		globalHash:           th.globalHash,
+		taskDependencyHashes: taskDependencyHashes,
+	})
+	if err != nil {
+		return "", fmt.Errorf("failed to hash task %v: %v", pt.taskID, err)
+	}
+	th.mu.Lock()
+	th.packageTaskHashes[pt.taskID] = hash
+	th.mu.Unlock()
+	return hash, nil
+}
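Taken together, the intended Tracker lifecycle is: construct once, hash every package-inputs combination up front, then hash package-tasks as the scheduler visits them in topological order. A condensed sketch mirroring the run.go call sites later in this diff (`g`, `engine`, `pt`, `concurrency`, `cwd`, and `passThroughArgs` come from that surrounding code; error handling elided):

```go
tracker := NewTracker(g.RootNode, g.GlobalHash, g.Pipeline, g.PackageInfos)
// Phase 1: hash each unique package+inputs combination, in parallel.
_ = tracker.CalculateFileHashes(engine.TaskGraph.Vertices(), concurrency, cwd)
// Phase 2: per task, in topological order. Dependencies must already have
// been hashed, or calculateDependencyHashes reports a missing hash.
deps := engine.TaskGraph.DownEdges(pt.taskID)
hash, _ := tracker.CalculateTaskHash(pt, deps, passThroughArgs)
```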
diff --git a/cli/internal/run/hash_test.go b/cli/internal/run/hash_test.go
new file mode 100644
index 0000000000000..e1a2a8daf9fe5
--- /dev/null
+++ b/cli/internal/run/hash_test.go
@@ -0,0 +1,136 @@
+package run
+
+import (
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/vercel/turborepo/cli/internal/fs"
+)
+
+func Test_manuallyHashPackage(t *testing.T) {
+	rootIgnore := strings.Join([]string{
+		"ignoreme",
+		"ignorethisdir/",
+	}, "\n")
+	pkgIgnore := strings.Join([]string{
+		"pkgignoreme",
+		"pkgignorethisdir/",
+	}, "\n")
+	root, err := os.MkdirTemp("", "turbo-manual-file-hashing-")
+	if err != nil {
+		t.Fatalf("failed to create temp dir: %v", err)
+	}
+	pkgName := "libA"
+	type fileHash struct {
+		contents string
+		hash     string
+	}
+	files := map[string]fileHash{
+		filepath.FromSlash("top-level-file"):              {"top-level-file-contents", ""},
+		filepath.FromSlash("other-dir/other-dir-file"):    {"other-dir-file-contents", ""},
+		filepath.FromSlash("ignoreme"):                    {"anything", ""},
+		filepath.FromSlash("libA/some-file"):              {"some-file-contents", "7e59c6a6ea9098c6d3beb00e753e2c54ea502311"},
+		filepath.FromSlash("libA/some-dir/other-file"):    {"some-file-contents", "7e59c6a6ea9098c6d3beb00e753e2c54ea502311"},
+		filepath.FromSlash("libA/some-dir/another-one"):   {"some-file-contents", "7e59c6a6ea9098c6d3beb00e753e2c54ea502311"},
+		filepath.FromSlash("libA/ignoreme"):               {"anything", ""},
+		filepath.FromSlash("libA/ignorethisdir/anything"): {"anything", ""},
+		filepath.FromSlash("libA/pkgignoreme"):            {"anything", ""},
+		filepath.FromSlash("libA/pkgignorethisdir/file"):  {"anything", ""},
+	}
+
+	rootIgnoreFile, err := os.Create(filepath.Join(root, ".gitignore"))
+	if err != nil {
+		t.Fatalf("failed to create .gitignore: %v", err)
+	}
+	_, err = rootIgnoreFile.WriteString(rootIgnore)
+	if err != nil {
+		t.Fatalf("failed to write contents to .gitignore: %v", err)
+	}
+	rootIgnoreFile.Close()
+	pkgIgnoreFilename := filepath.Join(root, pkgName, ".gitignore")
+	err = fs.EnsureDir(pkgIgnoreFilename)
+	if err != nil {
+		t.Fatalf("failed to ensure directories for %v: %v", pkgIgnoreFilename, err)
+	}
+	pkgIgnoreFile, err := os.Create(pkgIgnoreFilename)
+	if err != nil {
+		t.Fatalf("failed to create libA/.gitignore: %v", err)
+	}
+	_, err = pkgIgnoreFile.WriteString(pkgIgnore)
+	if err != nil {
+		t.Fatalf("failed to write contents to libA/.gitignore: %v", err)
+	}
+	pkgIgnoreFile.Close()
+	for path, spec := range files {
+		filename := filepath.Join(root, path)
+		err = fs.EnsureDir(filename)
+		if err != nil {
+			t.Fatalf("failed to ensure directories for %v: %v", filename, err)
+		}
+		f, err := os.Create(filename)
+		if err != nil {
+			t.Fatalf("failed to create file: %v: %v", filename, err)
+		}
+		_, err = f.WriteString(spec.contents)
+		if err != nil {
+			t.Fatalf("failed to write contents to %v: %v", filename, err)
+		}
+		f.Close()
+	}
+	// now that we've created the repo, expect our .gitignore file too
+	files[filepath.FromSlash("libA/.gitignore")] = fileHash{contents: "", hash: "3237694bc3312ded18386964a855074af7b066af"}
+
+	pkg := &fs.PackageJSON{
+		Dir: pkgName,
+	}
+	hashes, err := manuallyHashPackage(pkg, []string{}, root)
+	if err != nil {
+		t.Fatalf("failed to calculate manual hashes: %v", err)
+	}
+	prefix := filepath.FromSlash(pkgName + "/")
+	prefixLen := len(prefix)
+	count := 0
+	for path, spec := range files {
+		if strings.HasPrefix(path, prefix) {
+			got, ok := hashes[path[prefixLen:]]
+			if !ok {
+				if spec.hash != "" {
+					t.Errorf("did not find hash for %v, but wanted one", path)
+				}
+			} else if got != spec.hash {
+				t.Errorf("hash of %v, got %v want %v", path, got, spec.hash)
+			} else {
+				count++
+			}
+		}
+	}
+	if count != len(hashes) {
+		t.Errorf("found extra hashes in %v", hashes)
+	}
+
+	count = 0
+	justFileHashes, err := manuallyHashPackage(pkg, []string{filepath.FromSlash("**/*file")}, root)
+	if err != nil {
+		t.Fatalf("failed to calculate manual hashes: %v", err)
+	}
+	for path, spec := range files {
+		if strings.HasPrefix(path, prefix) {
+			shouldInclude := strings.HasSuffix(path, "file")
+			got, ok := justFileHashes[path[prefixLen:]]
+			if !ok && shouldInclude {
+				if spec.hash != "" {
+					t.Errorf("did not find hash for %v, but wanted one", path)
+				}
+			} else if shouldInclude && got != spec.hash {
+				t.Errorf("hash of %v, got %v want %v", path, got, spec.hash)
+			} else if shouldInclude {
+				count++
+			}
+		}
+	}
+	if count != len(justFileHashes) {
+		t.Errorf("found extra hashes in %v", justFileHashes)
+	}
+}
diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go
index 72cf60a5cde28..595eaf8380b0a 100644
--- a/cli/internal/run/run.go
+++ b/cli/internal/run/run.go
@@ -55,8 +55,7 @@ type RunCommand struct {
 
 // It is not intended to include information specific to a particular run.
 type completeGraph struct {
 	TopologicalGraph dag.AcyclicGraph
-	Pipeline         map[string]fs.Pipeline
-	SCC              [][]dag.Vertex
+	Pipeline         fs.PipelineConfig
 	PackageInfos     map[interface{}]*fs.PackageJSON
 	GlobalHash       string
 	RootNode         string
@@ -214,7 +213,6 @@ func (c *RunCommand) Run(args []string) int {
 	g := &completeGraph{
 		TopologicalGraph: ctx.TopologicalGraph,
 		Pipeline:         c.Config.TurboConfigJSON.Pipeline,
-		SCC:              ctx.SCC,
 		PackageInfos:     ctx.PackageInfos,
 		GlobalHash:       ctx.GlobalHash,
 		RootNode:         ctx.RootNode,
@@ -229,63 +227,39 @@ func (c *RunCommand) Run(args []string) int {
 }
 
 func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.LanguageBackend, startAt time.Time) int {
-	var topoVisit []interface{}
-	for _, node := range g.SCC {
-		v := node[0]
-		if v == g.RootNode {
-			continue
-		}
-		topoVisit = append(topoVisit, v)
-		pack := g.PackageInfos[v]
-
-		ancestralHashes := make([]string, 0, len(pack.InternalDeps))
-		if len(pack.InternalDeps) > 0 {
-			for _, ancestor := range pack.InternalDeps {
-				if h, ok := g.PackageInfos[ancestor]; ok {
-					ancestralHashes = append(ancestralHashes, h.Hash)
-				}
-			}
-			sort.Strings(ancestralHashes)
-		}
-		var hashable = struct {
-			hashOfFiles      string
-			ancestralHashes  []string
-			externalDepsHash string
-			globalHash       string
-		}{hashOfFiles: pack.FilesHash, ancestralHashes: ancestralHashes, externalDepsHash: pack.ExternalDepsHash, globalHash: g.GlobalHash}
-
-		var err error
-		pack.Hash, err = fs.HashObject(hashable)
-		if err != nil {
-			c.logError(c.Config.Logger, "", fmt.Errorf("[ERROR] %v: error computing combined hash: %v", pack.Name, err))
-			return 1
-		}
-		c.Config.Logger.Debug(fmt.Sprintf("%v: package ancestralHash", pack.Name), "hash", ancestralHashes)
-		c.Config.Logger.Debug(fmt.Sprintf("%v: package hash", pack.Name), "hash", pack.Hash)
-	}
-
-	c.Config.Logger.Debug("topological sort order", "value", topoVisit)
-
 	vertexSet := make(util.Set)
 	for _, v := range g.TopologicalGraph.Vertices() {
 		vertexSet.Add(v)
 	}
 
+	engine, err := buildTaskGraph(&g.TopologicalGraph, g.Pipeline, rs)
+	if err != nil {
+		c.Ui.Error(fmt.Sprintf("Error preparing engine: %s", err))
+		return 1
+	}
+	hashTracker := NewTracker(g.RootNode, g.GlobalHash, g.Pipeline, g.PackageInfos)
+	err = hashTracker.CalculateFileHashes(engine.TaskGraph.Vertices(), rs.Opts.concurrency, rs.Opts.cwd)
+	if err != nil {
+		c.Ui.Error(fmt.Sprintf("Error hashing package files: %s", err))
+		return 1
+	}
+
 	// If we are running in parallel, then we remove all the edges in the graph
-	// except for the root
+	// except for the root. Rebuild the task graph for backwards compatibility.
+	// We still use dependencies specified by the pipeline configuration.
 	if rs.Opts.parallel {
 		for _, edge := range g.TopologicalGraph.Edges() {
 			if edge.Target() != g.RootNode {
 				g.TopologicalGraph.RemoveEdge(edge)
 			}
 		}
+		engine, err = buildTaskGraph(&g.TopologicalGraph, g.Pipeline, rs)
+		if err != nil {
+			c.Ui.Error(fmt.Sprintf("Error preparing engine: %s", err))
+			return 1
+		}
 	}
 
-	engine, err := buildTaskGraph(&g.TopologicalGraph, g.Pipeline, rs)
-	if err != nil {
-		c.Ui.Error(fmt.Sprintf("Error preparing engine: %s", err))
-		return 1
-	}
 	exitCode := 0
 
 	if rs.Opts.dotGraph != "" {
 		err := c.generateDotGraph(engine.TaskGraph, filepath.Join(rs.Opts.cwd, rs.Opts.dotGraph))
@@ -294,7 +268,7 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La
 			return 1
 		}
 	} else if rs.Opts.dryRun {
-		tasksRun, err := c.executeDryRun(engine, g, rs, c.Config.Logger)
+		tasksRun, err := c.executeDryRun(engine, g, hashTracker, rs)
 		if err != nil {
 			c.logError(c.Config.Logger, "", err)
 			return 1
@@ -351,7 +325,7 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La
 	if rs.Opts.stream {
 		c.Ui.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len()))))
 	}
-	exitCode = c.executeTasks(g, rs, engine, backend, startAt)
+	exitCode = c.executeTasks(g, rs, engine, backend, hashTracker, startAt)
 
 	return exitCode
@@ -369,7 +343,6 @@ func buildTaskGraph(topoGraph *dag.AcyclicGraph, pipeline map[string]fs.Pipeline
 		}
 		if util.IsPackageTask(from) {
 			engine.AddDep(from, taskName)
-			continue
 		} else if strings.Contains(from, TOPOLOGICAL_PIPELINE_DELIMITER) {
 			topoDeps.Add(from[1:])
 		} else {
@@ -666,7 +639,7 @@ func hasGraphViz() bool {
 	return err == nil
 }
 
-func (c *RunCommand) executeTasks(g *completeGraph, rs *runSpec, engine *core.Scheduler, backend *api.LanguageBackend, startAt time.Time) int {
+func (c *RunCommand) executeTasks(g *completeGraph, rs *runSpec, engine *core.Scheduler, backend *api.LanguageBackend, hashes *Tracker, startAt time.Time) int {
 	goctx := gocontext.Background()
 	var analyticsSink analytics.Sink
 	if c.Config.IsLoggedIn() {
@@ -689,10 +662,14 @@ func (c *RunCommand) executeTasks(g *completeGraph, rs *runSpec, engine *core.Sc
 		logger:     c.Config.Logger,
 		backend:    backend,
 		processes:  c.Processes,
+		taskHashes: hashes,
 	}
 
 	// run the thing
-	errs := engine.Execute(g.getPackageTaskVisitor(ec.exec), core.ExecOpts{
+	errs := engine.Execute(g.getPackageTaskVisitor(func(pt *packageTask) error {
+		deps := engine.TaskGraph.DownEdges(pt.taskID)
+		return ec.exec(pt, deps)
+	}), core.ExecOpts{
 		Parallel:    rs.Opts.parallel,
 		Concurrency: rs.Opts.concurrency,
 	})
@@ -731,20 +708,21 @@ type hashedTask struct {
 	Dependents []string `json:"dependents"`
 }
 
-func (c *RunCommand) executeDryRun(engine *core.Scheduler, g *completeGraph, rs *runSpec, logger hclog.Logger) ([]hashedTask, error) {
+func (c *RunCommand) executeDryRun(engine *core.Scheduler, g *completeGraph, taskHashes *Tracker, rs *runSpec) ([]hashedTask, error) {
 	taskIDs := []hashedTask{}
 	errs := engine.Execute(g.getPackageTaskVisitor(func(pt *packageTask) error {
-		command, ok := pt.pkg.Scripts[pt.task]
-		if !ok {
-			logger.Debug("no task in package, skipping")
-			logger.Debug("done", "status", "skipped")
-			return nil
-		}
 		passThroughArgs := rs.ArgsForTask(pt.task)
-		hash, err := pt.hash(passThroughArgs, logger)
+		deps := engine.TaskGraph.DownEdges(pt.taskID)
+		hash, err := taskHashes.CalculateTaskHash(pt, deps, passThroughArgs)
 		if err != nil {
 			return err
 		}
+		command, ok := pt.pkg.Scripts[pt.task]
+		if !ok {
+			c.Config.Logger.Debug("no task in package, skipping")
+			c.Config.Logger.Debug("done", "status", "skipped")
+			return nil
+		}
 		ancestors, err := engine.TaskGraph.Ancestors(pt.taskID)
 		if err != nil {
 			return err
 		}
@@ -848,6 +826,7 @@ type execContext struct {
 	logger     hclog.Logger
 	backend    *api.LanguageBackend
 	processes  *process.Manager
+	taskHashes *Tracker
 }
 
 func (e *execContext) logError(log hclog.Logger, prefix string, err error) {
@@ -860,19 +839,12 @@ func (e *execContext) logError(log hclog.Logger, prefix string, err error) {
 	e.ui.Error(fmt.Sprintf("%s%s%s", ui.ERROR_PREFIX, prefix, color.RedString(" %v", err)))
 }
 
-func (e *execContext) exec(pt *packageTask) error {
+func (e *execContext) exec(pt *packageTask, deps dag.Set) error {
 	cmdTime := time.Now()
 
 	targetLogger := e.logger.Named(fmt.Sprintf("%v:%v", pt.pkg.Name, pt.task))
 	targetLogger.Debug("start")
 
-	// bail if the script doesn't exist
-	if _, ok := pt.pkg.Scripts[pt.task]; !ok {
-		targetLogger.Debug("no task in package, skipping")
-		targetLogger.Debug("done", "status", "skipped", "duration", time.Since(cmdTime))
-		return nil
-	}
-
 	// Setup tracer
 	tracer := e.runState.Run(util.GetTaskId(pt.pkg.Name, pt.task))
 
@@ -891,12 +863,22 @@ func (e *execContext) exec(pt *packageTask) error {
 	targetLogger.Debug("log file", "path", filepath.Join(e.rs.Opts.cwd, logFileName))
 
 	passThroughArgs := e.rs.ArgsForTask(pt.task)
-	hash, err := pt.hash(passThroughArgs, e.logger)
+	hash, err := e.taskHashes.CalculateTaskHash(pt, deps, passThroughArgs)
 	e.logger.Debug("task hash", "value", hash)
 	if err != nil {
 		e.ui.Error(fmt.Sprintf("Hashing error: %v", err))
 		// @TODO probably should abort fatally???
 	}
+	// TODO(gsoltis): if/when we fix https://github.com/vercel/turborepo/issues/937
+	// the following block should never get hit. In the meantime, keep it after hashing
+	// so that downstream tasks can count on the hash existing
+	//
+	// bail if the script doesn't exist
+	if _, ok := pt.pkg.Scripts[pt.task]; !ok {
+		targetLogger.Debug("no task in package, skipping")
+		targetLogger.Debug("done", "status", "skipped", "duration", time.Since(cmdTime))
+		return nil
+	}
 	// Cache ---------------------------------------------
 	var hit bool
 	if !e.rs.Opts.forceExecution {
@@ -1120,39 +1102,11 @@ func (pt *packageTask) HashableOutputs() []string {
 	return outputs
 }
 
-func (pt *packageTask) hash(args []string, logger hclog.Logger) (string, error) {
-	// Hash ---------------------------------------------
-	outputs := pt.HashableOutputs()
-	logger.Debug("task output globs", "outputs", outputs)
-
-	// Hash the task-specific environment variables found in the dependsOnKey in the pipeline
-	var hashableEnvVars []string
-	var hashableEnvPairs []string
-	if len(pt.pipeline.DependsOn) > 0 {
-		for _, v := range pt.pipeline.DependsOn {
-			if strings.Contains(v, ENV_PIPELINE_DELIMITER) {
-				trimmed := strings.TrimPrefix(v, ENV_PIPELINE_DELIMITER)
-				hashableEnvPairs = append(hashableEnvPairs, fmt.Sprintf("%v=%v", trimmed, os.Getenv(trimmed)))
-				hashableEnvVars = append(hashableEnvVars, trimmed)
-			}
-		}
-		sort.Strings(hashableEnvVars) // always sort them
-	}
-	logger.Debug("hashable env vars", "vars", hashableEnvVars)
-	hashable := struct {
-		Hash             string
-		Task             string
-		Outputs          []string
-		PassThruArgs     []string
-		HashableEnvPairs []string
-	}{
-		Hash:             pt.pkg.Hash,
-		Task:             pt.task,
-		Outputs:          outputs,
-		PassThruArgs:     args,
-		HashableEnvPairs: hashableEnvPairs,
-	}
-	return fs.HashObject(hashable)
+func (pt *packageTask) ToPackageFileHashKey() packageFileHashKey {
+	return (&packageFileSpec{
+		pkg:    pt.packageName,
+		inputs: pt.pipeline.Inputs,
+	}).ToKey()
 }
 
 func (g *completeGraph) getPackageTaskVisitor(visitor func(pt *packageTask) error) func(taskID string) error {
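`ToPackageFileHashKey` is the contract between the two hashing phases: it must reproduce exactly the key that `CalculateFileHashes` stored. From `ToKey`'s definition, inputs are sorted, joined with `!`, and appended after a `#` separator:

```go
// Key-format sketch: both phases derive the same key for a package+inputs pair.
spec := &packageFileSpec{pkg: "a", inputs: []string{"lint.js", "build.js"}}
key := spec.ToKey()
// inputs are sorted first, so key == packageFileHashKey("a#build.js!lint.js")
```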
diff --git a/cli/internal/scope/scope_test.go b/cli/internal/scope/scope_test.go
index 305f01632f367..1592cd29b8b70 100644
--- a/cli/internal/scope/scope_test.go
+++ b/cli/internal/scope/scope_test.go
@@ -55,7 +55,6 @@ func TestResolvePackages(t *testing.T) {
 	graph.Connect(dag.BasicEdge("app1", "libA"))
 	graph.Connect(dag.BasicEdge("app2", "libB"))
 	graph.Connect(dag.BasicEdge("app2", "libC"))
-	scc := dag.StronglyConnected(&graph.Graph)
 	packagesInfos := map[interface{}]*fs.PackageJSON{
 		"app0": {
 			Dir: "app/app0",
@@ -219,7 +218,6 @@ func TestResolvePackages(t *testing.T) {
 		PackageInfos:     packagesInfos,
 		PackageNames:     packageNames,
 		TopologicalGraph: graph,
-		SCC:              scc,
 	}, tui, logger)
 	if err != nil {
 		t.Errorf("expected no error, got %v", err)
diff --git a/cli/scripts/e2e/e2e.ts b/cli/scripts/e2e/e2e.ts
index ec1c902e68f6d..f98320c2f3a32 100644
--- a/cli/scripts/e2e/e2e.ts
+++ b/cli/scripts/e2e/e2e.ts
@@ -11,6 +11,7 @@ const basicPipeline = {
     outputs: [],
   },
   lint: {
+    inputs: ["build.js", "lint.js"],
     outputs: [],
   },
   build: {
@@ -313,12 +314,12 @@ function runSmokeTests(
   );
   assert.ok(
     lintOutput.includes(
-      `a:lint: cache miss, executing ${getHashFromOutput(
+      `a:lint: cache hit, suppressing output ${getHashFromOutput(
        lintOutput,
        "a#lint"
      )}`
    ),
-    "Cache miss, a has changed"
+    "Cache hit, a has changed but not a file lint depends on"
  );

  // Check that hashes are different and trigger a cascade
@@ -341,12 +342,39 @@
  );
  assert.ok(
    secondLintRun.includes(
-      `a:lint: cache miss, executing ${getHashFromOutput(
+      `a:lint: cache hit, suppressing output ${getHashFromOutput(
        secondLintRun,
        "a#lint"
      )}`
    ),
-    "Cache miss, dependency changes are irrelevant for lint task but we don't know that yet"
+    "Cache hit, dependency changes are irrelevant for lint task"
+  );
+
+  repo.commitFiles({
+    [path.join("packages", "a", "lint.js")]: "console.log('lintingz a')",
+  });
+
+  const thirdLintRun = getCommandOutputAsArray(
+    repo.turbo(
+      "run",
+      ["lint", "--filter=a", "--stream", "--output-logs=hash-only"],
+      options
+    )
+  );
+
+  assert.equal(
+    thirdLintRun[0],
+    `• Packages in scope: a`,
+    "Packages in scope for lint"
+  );
+  assert.ok(
+    thirdLintRun.includes(
+      `a:lint: cache miss, executing ${getHashFromOutput(
+        thirdLintRun,
+        "a#lint"
+      )}`
+    ),
+    "Cache miss, we changed a file that lint uses as an input"
  );

  const commandOnceBHasChangedOutput = getCommandOutputAsArray(