diff --git a/.vscode/launch.json b/.vscode/launch.json
index 95a9bd85f03de..55f314e399e2c 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -12,6 +12,15 @@
       "program": "${workspaceRoot}/cli/cmd/turbo",
       "cwd": "${workspaceRoot}/examples/basic",
       "args": ["run", "build"]
+    },
+    {
+      "name": "Turbo Version",
+      "type": "go",
+      "request": "launch",
+      "mode": "debug",
+      "program": "${workspaceRoot}/cli/cmd/turbo",
+      "cwd": "${workspaceRoot}/examples/basic",
+      "args": ["--version"]
     }
   ]
 }
diff --git a/cli/cmd/turbo/main.go b/cli/cmd/turbo/main.go
index 80776d5f6ee66..0169cc02d522d 100644
--- a/cli/cmd/turbo/main.go
+++ b/cli/cmd/turbo/main.go
@@ -8,12 +8,14 @@ import (
 	"time"
 	"turbo/internal/config"
 	"turbo/internal/login"
+	"turbo/internal/process"
 	prune "turbo/internal/prune"
 	"turbo/internal/run"
 	uiPkg "turbo/internal/ui"
 	"turbo/internal/util"
 
 	"github.com/fatih/color"
+	hclog "github.com/hashicorp/go-hclog"
 	"github.com/mitchellh/cli"
 )
 
@@ -38,6 +40,7 @@ func main() {
 		}
 	}
 	args = args[:argsEnd]
+
 	c := cli.NewCLI("turbo", turboVersion)
 
 	util.InitPrintf()
@@ -57,15 +60,25 @@ func main() {
 	c.HelpWriter = os.Stdout
 	c.ErrorWriter = os.Stderr
 	// Parse and validate cmd line flags and env vars
+	// Note that cf can be nil
 	cf, err := config.ParseAndValidate(c.Args, ui, turboVersion)
 	if err != nil {
 		ui.Error(fmt.Sprintf("%s %s", uiPkg.ERROR_PREFIX, color.RedString(err.Error())))
 		os.Exit(1)
 	}
+
+	var logger hclog.Logger
+	if cf != nil {
+		logger = cf.Logger
+	} else {
+		logger = hclog.Default()
+	}
+	processes := process.NewManager(logger.Named("processes"))
+	signalCh := watchSignals(func() { processes.Close() })
 	c.HiddenCommands = []string{"graph"}
 	c.Commands = map[string]cli.CommandFactory{
 		"run": func() (cli.Command, error) {
-			return &run.RunCommand{Config: cf, Ui: ui},
+			return &run.RunCommand{Config: cf, Ui: ui, Processes: processes},
 				nil
 		},
 		"prune": func() (cli.Command, error) {
@@ -87,7 +100,9 @@ func main() {
 
 	// Capture the defer statements below so the "done" message comes last
 	exitCode := 1
+	doneCh := make(chan struct{})
 	func() {
+		defer close(doneCh)
 		// To view a CPU trace, use "go tool trace [file]". Note that the trace
 		// viewer doesn't work under Windows Subsystem for Linux for some reason.
 		if traceFile != "" {
@@ -157,5 +172,12 @@ func main() {
 			}
 		}
 	}()
+	// Wait for either our command to finish, in which case we need to clean up,
+	// or to receive a signal, in which case the signal handler above does the cleanup
+	select {
+	case <-doneCh:
+		processes.Close()
+	case <-signalCh:
+	}
 	os.Exit(exitCode)
 }
diff --git a/cli/cmd/turbo/signals.go b/cli/cmd/turbo/signals.go
new file mode 100644
index 0000000000000..3329c8a7f6ede
--- /dev/null
+++ b/cli/cmd/turbo/signals.go
@@ -0,0 +1,22 @@
+package main
+
+import (
+	"os"
+	"os/signal"
+	"syscall"
+)
+
+// watchSignals calls onClose once the first SIGINT, SIGTERM, or SIGQUIT is
+// received, and returns a channel that is closed after onClose returns.
+func watchSignals(onClose func()) <-chan struct{} {
+	// TODO: platform specific signals to watch for?
+	doneCh := make(chan struct{})
+	signalCh := make(chan os.Signal, 1)
+	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
+	go func() {
+		<-signalCh
+		onClose()
+		close(doneCh)
+	}()
+	return doneCh
+}
diff --git a/cli/go.mod b/cli/go.mod
index 545599ab6672a..e55fefaf8f3d8 100644
--- a/cli/go.mod
+++ b/cli/go.mod
@@ -16,7 +16,8 @@ require (
 	github.com/google/go-cmp v0.5.5 // indirect
 	github.com/google/uuid v1.2.0 // indirect
 	github.com/hashicorp/errwrap v1.1.0 // indirect
-	github.com/hashicorp/go-hclog v0.16.2
+	github.com/hashicorp/go-gatedio v0.5.0
+	github.com/hashicorp/go-hclog v1.1.0
 	github.com/hashicorp/go-retryablehttp v0.6.8
 	github.com/karrick/godirwalk v1.16.1
 	github.com/kelseyhightower/envconfig v1.4.0
diff --git a/cli/go.sum b/cli/go.sum
index 89e075e36af7e..0e71de3b692db 100644
--- a/cli/go.sum
+++ b/cli/go.sum
@@ -43,9 +43,13 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
 github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
 github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-gatedio v0.5.0 h1:Jm1X5yP4yCqqWj5L1TgW7iZwCVPGtVc+mro5r/XX7Tg=
+github.com/hashicorp/go-gatedio v0.5.0/go.mod h1:Lr3t8L6IyxD3DAeaUxGcgl2JnRUpWMCsmBl4Omu/2t4=
 github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
 github.com/hashicorp/go-hclog v0.16.2 h1:K4ev2ib4LdQETX5cSZBG0DVLk1jwGqSPXBjdah3veNs=
 github.com/hashicorp/go-hclog v0.16.2/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ=
+github.com/hashicorp/go-hclog v1.1.0 h1:QsGcniKx5/LuX2eYoeL+Np3UKYPNaN7YKpTh29h8rbw=
+github.com/hashicorp/go-hclog v1.1.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ=
 github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
 github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
 github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
diff --git a/cli/internal/process/child.go b/cli/internal/process/child.go
new file mode 100644
index 0000000000000..c239ef4cdcfb9
--- /dev/null
+++ b/cli/internal/process/child.go
@@ -0,0 +1,414 @@
+package process
+
+/**
+ * Code in this file is based on the source code at
+ * https://github.com/hashicorp/consul-template/tree/3ea7d99ad8eff17897e0d63dac86d74770170bb8/child/child.go
+ *
+ * Major changes include removing the ability to restart a child process,
+ * requiring a fully-formed exec.Cmd to be passed in, and including cmd.Dir
+ * in the description of a child process.
+ */
+
+import (
+	"errors"
+	"fmt"
+	"math/rand"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/hashicorp/go-hclog"
+)
+
+func init() {
+	// Seed the default rand Source with current time to produce better random
+	// numbers used with splay
+	rand.Seed(time.Now().UnixNano())
+}
+
+var (
+	// ErrMissingCommand is the error returned when no command is specified
+	// to run.
+	ErrMissingCommand = errors.New("missing command")
+
+	// ExitCodeOK is the default OK exit code.
+	ExitCodeOK = 0
+
+	// ExitCodeError is the default error code returned when the child exits with
+	// an error without a more specific code.
+	ExitCodeError = 127
+)
+
+// Child is a wrapper around a child process which can be used to send signals
+// and manage the processes' lifecycle.
+type Child struct {
+	sync.RWMutex
+
+	timeout time.Duration
+
+	killSignal  os.Signal
+	killTimeout time.Duration
+
+	splay time.Duration
+
+	// cmd is the actual child process under management.
+	cmd *exec.Cmd
+
+	// exitCh is the channel where the processes exit will be returned.
+	exitCh chan int
+
+	// stopLock is the mutex to lock when stopping. stopCh is the circuit breaker
+	// to force-terminate any waiting splays to kill the process now. stopped is
+	// a boolean that tells us if we have previously been stopped.
+	stopLock sync.RWMutex
+	stopCh   chan struct{}
+	stopped  bool
+
+	// whether to set process group id or not (default on)
+	setpgid bool
+
+	Label string
+
+	logger hclog.Logger
+}
+
+// NewInput is input to the newChild function.
+type NewInput struct {
+	// Cmd is the unstarted, preconfigured command to run
+	Cmd *exec.Cmd
+
+	// Timeout is the maximum amount of time to allow the command to execute. If
+	// set to 0, the command is permitted to run infinitely.
+	Timeout time.Duration
+
+	// KillSignal is the signal to send to gracefully kill this process. This
+	// value may be nil.
+	KillSignal os.Signal
+
+	// KillTimeout is the amount of time to wait for the process to gracefully
+	// terminate before force-killing.
+	KillTimeout time.Duration
+
+	// Splay is the maximum random amount of time to wait before sending signals.
+	// This option helps reduce the thundering herd problem by effectively
+	// sleeping for a random amount of time before sending the signal. This
+	// prevents multiple processes from all signaling at the same time. This value
+	// may be zero (which disables the splay entirely).
+	Splay time.Duration
+
+	// Logger receives debug log lines about the process state and transitions
+	Logger hclog.Logger
+}
+
+// newChild creates a new, unstarted child process for management with
+// high-level APIs for sending signals to the child process and gracefully
+// terminating the child process. Call Start to begin execution.
+func newChild(i NewInput) (*Child, error) {
+	// Cmd.Args[0] holds the program name, which Cmd.Path already describes, so
+	// only the remaining arguments are included in the label.
+	args := i.Cmd.Args
+	if len(args) > 0 {
+		args = args[1:]
+	}
+	label := fmt.Sprintf("(%v) %v %v", i.Cmd.Dir, filepath.Base(i.Cmd.Path), strings.Join(args, " "))
+	child := &Child{
+		cmd:         i.Cmd,
+		timeout:     i.Timeout,
+		killSignal:  i.KillSignal,
+		killTimeout: i.KillTimeout,
+		splay:       i.Splay,
+		stopCh:      make(chan struct{}, 1),
+		setpgid:     true,
+		Label:       label,
+		logger:      i.Logger.Named(label),
+	}
+
+	return child, nil
+}
+
+// ExitCh returns the exit channel for this child process, or nil if the
+// process has not been started. The exit code is delivered on this channel
+// unless the child is stopped, in which case the channel is closed instead.
+func (c *Child) ExitCh() <-chan int {
+	c.RLock()
+	defer c.RUnlock()
+	return c.exitCh
+}
+
+// Pid returns the pid of the child process. If no child process exists, 0 is
+// returned.
+func (c *Child) Pid() int {
+	c.RLock()
+	defer c.RUnlock()
+	return c.pid()
+}
+
+// Command returns the human-formatted command with arguments.
+func (c *Child) Command() string {
+	return c.Label
+}
+
+// Start starts and begins execution of the child process. A buffered channel
+// is returned which is where the command's exit code will be returned upon
+// exit. Any errors that occur prior to starting the command will be returned
+// as the second error argument, but any errors returned by the command after
+// execution will be returned as a non-zero value over the exit code channel.
+func (c *Child) Start() error {
+	c.logger.Debug("spawning process")
+	c.Lock()
+	defer c.Unlock()
+	return c.start()
+}
+
+// Signal sends the signal to the child process, returning any errors that
+// occur.
+func (c *Child) Signal(s os.Signal) error {
+	c.logger.Debug("receiving signal", "signal", s.String())
+	c.RLock()
+	defer c.RUnlock()
+	return c.signal(s)
+}
+
+// Kill sends the kill signal to the child process and waits for successful
+// termination. If no kill signal is defined, the process is killed with the
+// most aggressive kill signal. If the process does not gracefully stop within
+// the provided KillTimeout, the process is force-killed. If a splay was
+// provided, this function will sleep for a random period of time between 0 and
+// the provided splay value to reduce the thundering herd problem. This function
+// does not return any errors because it guarantees the process will be dead by
+// the return of the function call.
+func (c *Child) Kill() {
+	c.logger.Debug("killing process")
+	c.Lock()
+	defer c.Unlock()
+	c.kill(false)
+}
+
+// Stop behaves almost identical to Kill except it suppresses future processes
+// from being started by this child and it prevents the killing of the child
+// process from sending its value back up the exit channel. This is useful
+// when doing a graceful shutdown of an application.
+func (c *Child) Stop() {
+	c.internalStop(false)
+}
+
+// StopImmediately behaves almost identical to Stop except it does not wait
+// for any random splay if configured. This is intended for performing a fast
+// shutdown of child processes when a kill signal is received.
+func (c *Child) StopImmediately() {
+	c.internalStop(true)
+}
+
+func (c *Child) internalStop(immediately bool) {
+	c.Lock()
+	defer c.Unlock()
+
+	c.stopLock.Lock()
+	defer c.stopLock.Unlock()
+	if c.stopped {
+		return
+	}
+	c.kill(immediately)
+	close(c.stopCh)
+	c.stopped = true
+}
+
+func (c *Child) start() error {
+	setSetpgid(c.cmd, c.setpgid)
+	if err := c.cmd.Start(); err != nil {
+		return err
+	}
+
+	// Create a new exitCh so that previously invoked commands (if any) don't
+	// cause us to exit, and start a goroutine to wait for that process to end.
+	exitCh := make(chan int, 1)
+	go func() {
+		var code int
+		// It's possible that kill is called before we even
+		// manage to get here. Make sure we still have a valid
+		// cmd before waiting on it.
+		c.RLock()
+		var cmd = c.cmd
+		c.RUnlock()
+		var err error
+		if cmd != nil {
+			err = cmd.Wait()
+		}
+		if err == nil {
+			code = ExitCodeOK
+		} else {
+			code = ExitCodeError
+			if exiterr, ok := err.(*exec.ExitError); ok {
+				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
+					code = status.ExitStatus()
+				}
+			}
+		}
+
+		// If the child is in the process of killing, do not send a response back
+		// down the exit channel.
+		c.stopLock.RLock()
+		defer c.stopLock.RUnlock()
+		if !c.stopped {
+			select {
+			case <-c.stopCh:
+			case exitCh <- code:
+			}
+		}
+
+		close(exitCh)
+	}()
+
+	c.exitCh = exitCh
+
+	// If a timeout was given, start the timer to wait for the child to exit
+	if c.timeout != 0 {
+		select {
+		case code := <-exitCh:
+			if code != 0 {
+				return fmt.Errorf(
+					"command exited with a non-zero exit status:\n"+
+						"\n"+
+						"    %s\n"+
+						"\n"+
+						"This is assumed to be a failure. Please ensure the command\n"+
+						"exits with a zero exit status.",
+					c.Command(),
+				)
+			}
+		case <-time.After(c.timeout):
+			// Force-kill the process
+			c.stopLock.Lock()
+			defer c.stopLock.Unlock()
+			if c.cmd != nil && c.cmd.Process != nil {
+				c.cmd.Process.Kill()
+			}
+
+			return fmt.Errorf(
+				"command did not exit within %q:\n"+
+					"\n"+
+					"    %s\n"+
+					"\n"+
+					"Commands must exit in a timely manner in order for processing to\n"+
+					"continue. Consider using a process supervisor or utilizing the\n"+
+					"built-in exec mode instead.",
+				c.timeout,
+				c.Command(),
+			)
+		}
+	}
+
+	return nil
+}
+
+func (c *Child) pid() int {
+	if !c.running() {
+		return 0
+	}
+	return c.cmd.Process.Pid
+}
+
+func (c *Child) signal(s os.Signal) error {
+	if !c.running() {
+		return nil
+	}
+
+	sig, ok := s.(syscall.Signal)
+	if !ok {
+		return fmt.Errorf("bad signal: %s", s)
+	}
+	pid := c.cmd.Process.Pid
+	if c.setpgid {
+		// kill takes negative pid to indicate that you want to use gpid
+		pid = -(pid)
+	}
+	// cross platform way to signal process/process group
+	p, err := os.FindProcess(pid)
+	if err != nil {
+		return err
+	}
+	return p.Signal(sig)
+}
+
+// kill sends the signal to kill the process using the configured signal
+// if set, else the default system signal
+func (c *Child) kill(immediately bool) {
+
+	if !c.running() {
+		c.logger.Debug("Kill() called but process dead; not waiting for splay.")
+		return
+	} else if immediately {
+		c.logger.Debug("Kill() called but performing immediate shutdown; not waiting for splay.")
+	} else {
+		c.logger.Debug("Kill() called; waiting for splay")
+		select {
+		case <-c.stopCh:
+		case <-c.randomSplay():
+		}
+	}
+
+	var exited bool
+	defer func() {
+		if !exited {
+			c.logger.Debug("PKill")
+			c.cmd.Process.Kill()
+		}
+		c.cmd = nil
+	}()
+
+	if c.killSignal == nil {
+		return
+	}
+
+	if err := c.signal(c.killSignal); err != nil {
+		c.logger.Debug("Kill failed", "error", err)
+		if processNotFoundErr(err) {
+			exited = true // checked in defer
+		}
+		return
+	}
+
+	// Capture the process up front: the deferred cleanup above sets c.cmd to
+	// nil, which would otherwise race with this goroutine's read of c.cmd.
+	proc := c.cmd.Process
+	killCh := make(chan struct{}, 1)
+	go func() {
+		defer close(killCh)
+		proc.Wait()
+	}()
+
+	select {
+	case <-c.stopCh:
+	case <-killCh:
+		exited = true
+	case <-time.After(c.killTimeout):
+		c.logger.Debug("timeout")
+	}
+}
+
+func (c *Child) running() bool {
+	select {
+	case <-c.exitCh:
+		return false
+	default:
+	}
+	return c.cmd != nil && c.cmd.Process != nil
+}
+
+func (c *Child) randomSplay() <-chan time.Time {
+	if c.splay == 0 {
+		return time.After(0)
+	}
+
+	ns := c.splay.Nanoseconds()
+	offset := rand.Int63n(ns)
+	t := time.Duration(offset)
+
+	c.logger.Debug("waiting for random splay", "duration", t)
+
+	return time.After(t)
+}
diff --git a/cli/internal/process/child_nix_test.go b/cli/internal/process/child_nix_test.go
new file mode 100644
index 0000000000000..7311d187138a3
--- /dev/null
+++ b/cli/internal/process/child_nix_test.go
@@ -0,0 +1,190 @@
+//go:build !windows
+// +build !windows
+
+package process
+
+/**
+ * Code in this file is based on the source code at
+ * https://github.com/hashicorp/consul-template/tree/3ea7d99ad8eff17897e0d63dac86d74770170bb8/child/child_test.go
+ *
+ * Tests in this file use signals or pgid features not available on windows
+ */
+
+import (
+	"os/exec"
+	"syscall"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/go-gatedio"
+)
+
+func TestSignal(t *testing.T) {
+
+	c := testChild(t)
+	cmd := exec.Command("sh", "-c", "trap 'echo one; exit' USR1; while true; do sleep 0.2; done")
+	c.cmd = cmd
+
+	out := gatedio.NewByteBuffer()
+	c.cmd.Stdout = out
+
+	if err := c.Start(); err != nil {
+		t.Fatal(err)
+	}
+	defer c.Stop()
+
+	// For some reason bash doesn't start immediately
+	time.Sleep(fileWaitSleepDelay)
+
+	if err := c.Signal(syscall.SIGUSR1); err != nil {
+		t.Fatal(err)
+	}
+
+	// Give time for the file to flush
+	time.Sleep(fileWaitSleepDelay)
+
+	expected := "one\n"
+	if out.String() != expected {
+		t.Errorf("expected %q to be %q", out.String(), expected)
+	}
+}
+
+func TestStop_childAlreadyDead(t *testing.T) {
+	c := testChild(t)
+	c.cmd = exec.Command("sh", "-c", "exit 1")
+	c.splay = 100 * time.Second
+	c.killSignal = syscall.SIGTERM
+
+	if err := c.Start(); err != nil {
+		t.Fatal(err)
+	}
+
+	// For some reason bash doesn't start immediately
+	time.Sleep(fileWaitSleepDelay)
+
+	killStartTime := time.Now()
+	c.Stop()
+	killEndTime := time.Now()
+
+	if killEndTime.Sub(killStartTime) > fileWaitSleepDelay {
+		t.Error("expected not to wait for splay")
+	}
+}
+
+func TestSignal_noProcess(t *testing.T) {
+
+	c := testChild(t)
+	if err := c.Signal(syscall.SIGUSR1); err != nil {
+		// Just assert there is no error
+		t.Fatal(err)
+	}
+}
+
+func TestKill_signal(t *testing.T) {
+
+	c := testChild(t)
+	cmd := exec.Command("sh", "-c", "trap 'echo one; exit' USR1; while true; do sleep 0.2; done")
+	c.killSignal = syscall.SIGUSR1
+
+	out := gatedio.NewByteBuffer()
+	cmd.Stdout = out
+	c.cmd = cmd
+
+	if err := c.Start(); err != nil {
+		t.Fatal(err)
+	}
+	defer c.Stop()
+
+	// For some reason bash doesn't start immediately
+	time.Sleep(fileWaitSleepDelay)
+
+	c.Kill()
+
+	// Give time for the file to flush
+	time.Sleep(fileWaitSleepDelay)
+
+	expected := "one\n"
+	if out.String() != expected {
+		t.Errorf("expected %q to be %q", out.String(), expected)
+	}
+}
+
+func TestKill_noProcess(t *testing.T) {
+	c := testChild(t)
+	c.killSignal = syscall.SIGUSR1
+	c.Kill()
+}
+
+func TestStop_noWaitForSplay(t *testing.T) {
+	c := testChild(t)
+	c.cmd = exec.Command("sh", "-c", "trap 'echo one; exit' USR1; while true; do sleep 0.2; done")
+	c.splay = 100 * time.Second
+	c.killSignal = syscall.SIGUSR1
+
+	out := gatedio.NewByteBuffer()
+	c.cmd.Stdout = out
+
+	if err := c.Start(); err != nil {
+		t.Fatal(err)
+	}
+
+	// For some reason bash doesn't start immediately
+	time.Sleep(fileWaitSleepDelay)
+
+	killStartTime := time.Now()
+	c.StopImmediately()
+	killEndTime := time.Now()
+
+	expected := "one\n"
+	if out.String() != expected {
+		t.Errorf("expected %q to be %q", out.String(), expected)
+	}
+
+	if killEndTime.Sub(killStartTime) > fileWaitSleepDelay {
+		t.Error("expected not to wait for splay")
+	}
+}
+
+func TestSetpgid(t *testing.T) {
+	t.Run("true", func(t *testing.T) {
+		c := testChild(t)
+		c.cmd = exec.Command("sh", "-c", "while true; do sleep 0.2; done")
+		// default, but to be explicit for the test
+		c.setpgid = true
+
+		if err := c.Start(); err != nil {
+			t.Fatal(err)
+		}
+		defer c.Stop()
+
+		// when setpgid is true, the pid and gpid should be the same
+		gpid, err := syscall.Getpgid(c.Pid())
+		if err != nil {
+			t.Fatal("Getpgid error:", err)
+		}
+
+		if c.Pid() != gpid {
+			t.Fatal("pid and gpid should match")
+		}
+	})
+	t.Run("false", func(t *testing.T) {
+		c := testChild(t)
+		c.cmd = exec.Command("sh", "-c", "while true; do sleep 0.2; done")
+		c.setpgid = false
+
+		if err := c.Start(); err != nil {
+			t.Fatal(err)
+		}
+		defer c.Stop()
+
+		// when setpgid is false, the child stays in our process group, so pid and gpid differ
+		gpid, err := syscall.Getpgid(c.Pid())
+		if err != nil {
+			t.Fatal("Getpgid error:", err)
+		}
+
+		if c.Pid() == gpid {
+			t.Fatal("pid and gpid should NOT match")
+		}
+	})
+}
diff --git a/cli/internal/process/child_test.go b/cli/internal/process/child_test.go
new file mode 100644
index 0000000000000..63dee22c08554
--- /dev/null
+++ b/cli/internal/process/child_test.go
@@ -0,0 +1,192 @@
+package process
+
+/**
+ * Code in this file is based on the source code at
+ * https://github.com/hashicorp/consul-template/tree/3ea7d99ad8eff17897e0d63dac86d74770170bb8/child/child_test.go
+ *
+ * Major changes include supporting api changes in child.go and removing
+ * tests for reloading, which was removed in child.go
+ */
+
+import (
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/go-gatedio"
+	"github.com/hashicorp/go-hclog"
+)
+
+const fileWaitSleepDelay = 150 * time.Millisecond
+
+func testChild(t *testing.T) *Child {
+	cmd := exec.Command("echo", "hello", "world")
+	cmd.Stdout = ioutil.Discard
+	cmd.Stderr = ioutil.Discard
+	c, err := newChild(NewInput{
+		Cmd:         cmd,
+		KillSignal:  os.Kill,
+		KillTimeout: 2 * time.Second,
+		Splay:       0 * time.Second,
+		Logger:      hclog.Default(),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	return c
+}
+
+func TestNew(t *testing.T) {
+
+	stdin := gatedio.NewByteBuffer()
+	stdout := gatedio.NewByteBuffer()
+	stderr := gatedio.NewByteBuffer()
+	command := "echo"
+	args := []string{"hello", "world"}
+	env := []string{"a=b", "c=d"}
+	killSignal := os.Kill
+	killTimeout := fileWaitSleepDelay
+	splay := fileWaitSleepDelay
+
+	cmd := exec.Command(command, args...)
+	cmd.Stdin = stdin
+	cmd.Stderr = stderr
+	cmd.Stdout = stdout
+	cmd.Env = env
+	c, err := newChild(NewInput{
+		Cmd:         cmd,
+		KillSignal:  killSignal,
+		KillTimeout: killTimeout,
+		Splay:       splay,
+		Logger:      hclog.Default(),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if c.killSignal != killSignal {
+		t.Errorf("expected %q to be %q", c.killSignal, killSignal)
+	}
+
+	if c.killTimeout != killTimeout {
+		t.Errorf("expected %q to be %q", c.killTimeout, killTimeout)
+	}
+
+	if c.splay != splay {
+		t.Errorf("expected %q to be %q", c.splay, splay)
+	}
+
+	if c.stopCh == nil {
+		t.Error("expected stopCh to be initialized")
+	}
+}
+
+func TestExitCh_noProcess(t *testing.T) {
+
+	c := testChild(t)
+	ch := c.ExitCh()
+	if ch != nil {
+		t.Errorf("expected %#v to be nil", ch)
+	}
+}
+
+func TestExitCh(t *testing.T) {
+
+	c := testChild(t)
+	if err := c.Start(); err != nil {
+		t.Fatal(err)
+	}
+	defer c.Stop()
+
+	ch := c.ExitCh()
+	if ch == nil {
+		t.Error("expected ch to exist")
+	}
+}
+
+func TestPid_noProcess(t *testing.T) {
+
+	c := testChild(t)
+	pid := c.Pid()
+	if pid != 0 {
+		t.Errorf("expected %d to be 0", pid)
+	}
+}
+
+func TestPid(t *testing.T) {
+
+	c := testChild(t)
+	if err := c.Start(); err != nil {
+		t.Fatal(err)
+	}
+	defer c.Stop()
+
+	pid := c.Pid()
+	if pid == 0 {
+		t.Error("expected pid to not be 0")
+	}
+}
+
+func TestStart(t *testing.T) {
+
+	c := testChild(t)
+
+	// Set our own reader and writer so we can verify they are wired to the child.
+	stdin := gatedio.NewByteBuffer()
+	stdout := gatedio.NewByteBuffer()
+	stderr := gatedio.NewByteBuffer()
+	// Custom env and command
+	env := []string{"a=b", "c=d"}
+	cmd := exec.Command("env")
+	cmd.Stdin = stdin
+	cmd.Stdout = stdout
+	cmd.Stderr = stderr
+	cmd.Env = env
+	c.cmd = cmd
+
+	if err := c.Start(); err != nil {
+		t.Fatal(err)
+	}
+	defer c.Stop()
+
+	select {
+	case <-c.ExitCh():
+	case <-time.After(fileWaitSleepDelay):
+		t.Fatal("process should have exited")
+	}
+
+	output := stdout.String()
+	for _, envVar := range env {
+		if !strings.Contains(output, envVar) {
+			t.Errorf("expected to find %q in %q", envVar, output)
+		}
+	}
+}
+
+func TestKill_noSignal(t *testing.T) {
+
+	c := testChild(t)
+	c.cmd = exec.Command("sh", "-c", "while true; do sleep 0.2; done")
+	c.killTimeout = 20 * time.Millisecond
+	c.killSignal = nil
+
+	if err := c.Start(); err != nil {
+		t.Fatal(err)
+	}
+	defer c.Stop()
+
+	// For some reason bash doesn't start immediately
+	time.Sleep(fileWaitSleepDelay)
+
+	c.Kill()
+
+	// Give time for the file to flush
+	time.Sleep(fileWaitSleepDelay)
+
+	if c.cmd != nil {
+		t.Errorf("expected cmd to be nil")
+	}
+}
diff --git a/cli/internal/process/manager.go b/cli/internal/process/manager.go
new file mode 100644
index 0000000000000..0488a2980a0ec
--- /dev/null
+++ b/cli/internal/process/manager.go
@@ -0,0 +1,122 @@
+package process
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"os/exec"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/go-hclog"
+)
+
+// ErrClosing is returned when the process manager is in the process of closing,
+// meaning that no more child processes can be Exec'd, and existing, non-failed
+// child processes will be stopped with this error.
+var ErrClosing = errors.New("process manager is already closing")
+
+// ChildExit is returned when a child process exits with a non-zero exit code
+type ChildExit struct {
+	ExitCode int
+	Command  string
+}
+
+func (ce *ChildExit) Error() string {
+	return fmt.Sprintf("command %s exited (%d)", ce.Command, ce.ExitCode)
+}
+
+// Manager tracks all of the child processes that have been spawned
+type Manager struct {
+	done     bool
+	children map[*Child]struct{}
+	mu       sync.Mutex
+	doneCh   chan struct{}
+	logger   hclog.Logger
+}
+
+// NewManager creates a new properly-initialized Manager instance
+func NewManager(logger hclog.Logger) *Manager {
+	return &Manager{
+		children: make(map[*Child]struct{}),
+		doneCh:   make(chan struct{}),
+		logger:   logger,
+	}
+}
+
+// Exec spawns a child process to run the given command, then blocks
+// until it completes. Returns a nil error if the child process finished
+// successfully, ErrClosing if the manager closed during execution, and
+// a ChildExit error if the child process exited with a non-zero exit code.
+func (m *Manager) Exec(cmd *exec.Cmd) error {
+	child, err := newChild(NewInput{
+		Cmd: cmd,
+		// Run forever by default
+		Timeout: 0,
+		// When it's time to exit, give a 10 second timeout
+		KillTimeout: 10 * time.Second,
+		// Send SIGINT to stop children
+		KillSignal: os.Interrupt,
+		Logger:     m.logger,
+	})
+	if err != nil {
+		return err
+	}
+
+	m.mu.Lock()
+	if m.done {
+		m.mu.Unlock()
+		return ErrClosing
+	}
+	// Start the child while holding the lock so that a concurrent Close can
+	// never see a registered child that has not been started yet; stopping such
+	// a child is a no-op, and it would then be started and left running.
+	if err := child.Start(); err != nil {
+		m.mu.Unlock()
+		return err
+	}
+	m.children[child] = struct{}{}
+	m.mu.Unlock()
+	defer func() {
+		m.mu.Lock()
+		delete(m.children, child)
+		m.mu.Unlock()
+	}()
+
+	exitCode, ok := <-child.ExitCh()
+	if !ok {
+		// No exit code was delivered; this happens when Close stopped the child.
+		return ErrClosing
+	}
+	if exitCode != ExitCodeOK {
+		return &ChildExit{
+			ExitCode: exitCode,
+			Command:  child.Command(),
+		}
+	}
+	return nil
+}
+
+// Close sends SIGINT to all child processes if it hasn't been done yet,
+// and in either case blocks until they all exit or timeout
+func (m *Manager) Close() {
+	m.mu.Lock()
+	if m.done {
+		m.mu.Unlock()
+		<-m.doneCh
+		return
+	}
+	wg := sync.WaitGroup{}
+	m.done = true
+	for child := range m.children {
+		child := child
+		wg.Add(1)
+		go func() {
+			child.Stop()
+			wg.Done()
+		}()
+	}
+	m.mu.Unlock()
+	wg.Wait()
+	close(m.doneCh)
+}
diff --git a/cli/internal/process/manager_test.go b/cli/internal/process/manager_test.go
new file mode 100644
index 0000000000000..172aed4246d6b
--- /dev/null
+++ b/cli/internal/process/manager_test.go
@@ -0,0 +1,125 @@
+package process
+
+import (
+	"errors"
+	"os/exec"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/go-gatedio"
+	"github.com/hashicorp/go-hclog"
+)
+
+func newManager() *Manager {
+	return NewManager(hclog.Default())
+}
+
+func TestExec_simple(t *testing.T) {
+	mgr := newManager()
+
+	out := gatedio.NewByteBuffer()
+	cmd := exec.Command("env")
+	cmd.Stdout = out
+
+	err := mgr.Exec(cmd)
+	if err != nil {
+		t.Errorf("expected %q to be nil", err)
+	}
+
+	output := out.String()
+	if output == "" {
+		t.Error("expected output from running 'env', got empty string")
+	}
+}
+
+func TestExec_multiple(t *testing.T) {
+	mgr := newManager()
+
+	wg := sync.WaitGroup{}
+	tasks := 4
+	errs := make([]error, tasks)
+	start := time.Now()
+	for i := 0; i < tasks; i++ {
+		wg.Add(1)
+		go func(index int) {
+			cmd := exec.Command("sleep", "0.15")
+			err := mgr.Exec(cmd)
+			if err != nil {
+				errs[index] = err
+			}
+			wg.Done()
+		}(i)
+	}
+	wg.Wait()
+	end := time.Now()
+	duration := end.Sub(start)
+	if duration > 300*time.Millisecond {
+		t.Errorf("expected parallel execution, total time was %q", duration)
+	}
+	for _, err := range errs {
+		if err != nil {
+			t.Errorf("expected no errors, found %q", err)
+		}
+	}
+}
+
+func TestClose(t *testing.T) {
+	mgr := newManager()
+
+	wg := sync.WaitGroup{}
+	tasks := 4
+	errs := make([]error, tasks)
+	start := time.Now()
+	for i := 0; i < tasks; i++ {
+		wg.Add(1)
+		go func(index int) {
+			cmd := exec.Command("sleep", "0.5")
+			err := mgr.Exec(cmd)
+			if err != nil {
+				errs[index] = err
+			}
+			wg.Done()
+		}(i)
+	}
+	// let processes kick off
+	time.Sleep(50 * time.Millisecond)
+	mgr.Close()
+	end := time.Now()
+	wg.Wait()
+	duration := end.Sub(start)
+	if duration >= 500*time.Millisecond {
+		t.Errorf("expected to close, total time was %q", duration)
+	}
+	for _, err := range errs {
+		if err != ErrClosing {
+			t.Errorf("expected manager closing error, found %q", err)
+		}
+	}
+}
+
+func TestClose_alreadyClosed(t *testing.T) {
+	mgr := newManager()
+	mgr.Close()
+
+	// repeated closing does not error
+	mgr.Close()
+
+	err := mgr.Exec(exec.Command("sleep", "1"))
+	if err != ErrClosing {
+		t.Errorf("expected manager closing error, found %q", err)
+	}
+}
+
+func TestExitCode(t *testing.T) {
+	mgr := newManager()
+
+	err := mgr.Exec(exec.Command("ls", "doesnotexist"))
+	exitErr := &ChildExit{}
+	if !errors.As(err, &exitErr) {
+		t.Fatalf("expected a ChildExit err, got %v", err)
+	}
+	if exitErr.ExitCode == 0 {
+		t.Error("expected non-zero exit code, got 0")
+	}
+}
diff --git a/cli/internal/process/sys_nix.go b/cli/internal/process/sys_nix.go
new file mode 100644
index 0000000000000..0e6c003f4bc6b
--- /dev/null
+++ b/cli/internal/process/sys_nix.go
@@ -0,0 +1,33 @@
+//go:build !windows
+// +build !windows
+
+package process
+
+/**
+ * Code in this file is based on the source code at
+ * https://github.com/hashicorp/consul-template/tree/3ea7d99ad8eff17897e0d63dac86d74770170bb8/child/sys_nix.go
+ */
+
+import (
+	"errors"
+	"os"
+	"os/exec"
+	"syscall"
+)
+
+// setSetpgid configures whether cmd runs in its own process group, preserving
+// any other SysProcAttr settings the caller already made.
+func setSetpgid(cmd *exec.Cmd, value bool) {
+	if cmd.SysProcAttr == nil {
+		cmd.SysProcAttr = &syscall.SysProcAttr{}
+	}
+	cmd.SysProcAttr.Setpgid = value
+}
+
+// processNotFoundErr reports whether err indicates that the target process
+// no longer exists. os.Process.Signal reports a kill(2) ESRCH as
+// os.ErrProcessDone rather than returning the raw errno, so check both.
+func processNotFoundErr(err error) bool {
+	// ESRCH == no such process, ie. already exited
+	return errors.Is(err, os.ErrProcessDone) || errors.Is(err, syscall.ESRCH)
+}
diff --git a/cli/internal/process/sys_windows.go b/cli/internal/process/sys_windows.go
new file mode 100644
index 0000000000000..c626c22f19579
--- /dev/null
+++ b/cli/internal/process/sys_windows.go
@@ -0,0 +1,25 @@
+//go:build windows
+// +build windows
+
+package process
+
+/**
+ * Code in this file is based on the source code at
+ * https://github.com/hashicorp/consul-template/tree/3ea7d99ad8eff17897e0d63dac86d74770170bb8/child/sys_windows.go
+ */
+
+import (
+	"errors"
+	"os"
+	"os/exec"
+)
+
+// setSetpgid is a no-op on Windows: process-group handling is only
+// implemented for Unix-like systems.
+func setSetpgid(cmd *exec.Cmd, value bool) {}
+
+// processNotFoundErr reports whether err indicates that the target process
+// has already finished.
+func processNotFoundErr(err error) bool {
+	return errors.Is(err, os.ErrProcessDone)
+}
diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go
index a3b681ba8a4c7..14a96bf6fd28d 100644
--- a/cli/internal/run/run.go
+++ b/cli/internal/run/run.go
@@ -21,6 +21,7 @@ import (
 	"turbo/internal/fs"
 	"turbo/internal/globby"
 	"turbo/internal/logstreamer"
+	"turbo/internal/process"
 	"turbo/internal/scm"
 	"turbo/internal/ui"
 	"turbo/internal/util"
@@ -40,8 +41,9 @@ const ENV_PIPELINE_DELMITER = "$"
 
 // RunCommand is a Command implementation that tells Turbo to run a task
 type RunCommand struct {
-	Config *config.Config
-	Ui     *cli.ColoredUi
+	Config    *config.Config
+	Ui        *cli.ColoredUi
+	Processes *process.Manager
 }
 
 // Synopsis of run command
@@ -547,13 +549,17 @@ func (c *RunCommand) Run(args []string) int {
 				logStreamerOut.FlushRecord()
 
 				// Run the command
-				if err := cmd.Run(); err != nil {
+				if err := c.Processes.Exec(cmd); err != nil {
+					// if we already know we're in the process of exiting,
+					// we don't need to record an error to that effect.
+					if errors.Is(err, process.ErrClosing) {
+						return nil
+					}
 					tracer(TargetBuildFailed, err)
 					targetLogger.Error("Error: command finished with error: %w", err)
 					if runOptions.bail {
 						if runOptions.stream {
 							targetUi.Error(fmt.Sprintf("Error: command finished with error: %s", err))
-							os.Exit(1)
 						} else {
 							f, err := os.Open(filepath.Join(runOptions.cwd, logFileName))
 							if err != nil {
@@ -567,15 +573,14 @@ func (c *RunCommand) Run(args []string) int {
 							for scan.Scan() {
 								c.Ui.Output(util.Sprintf("${RED}%s:%s: ${RESET}%s", pack.Name, task, scan.Bytes())) //Writing to Stdout
 							}
-							os.Exit(1)
 						}
+						c.Processes.Close()
 					} else {
 						if runOptions.stream {
 							targetUi.Warn("command finished with error, but continuing...")
 						}
 					}
-
-					return nil
+					return err
 				}
 
 				// Cache command outputs