From de049c142448564dc65db80a697bf3e05a885e28 Mon Sep 17 00:00:00 2001
From: johnpyp
Date: Tue, 18 Nov 2025 15:53:57 -0500
Subject: [PATCH] fix: Add fine grained interruptible task restarts in watch mode

When packages change in watch mode, only stop the tasks of the changed
packages and the tasks that transitively depend on them, instead of tearing
down the whole previous run. Child processes now carry the TaskId they were
spawned for so the process manager can stop them selectively, and a task
killed while the manager is not closing is reported as a restart rather than
a shutdown.
---
 Cargo.lock                                    |  1 +
 crates/turborepo-lib/src/run/mod.rs           |  6 ++
 crates/turborepo-lib/src/run/watch.rs         | 75 ++++++++++++++++---
 .../src/task_graph/visitor/command.rs         |  3 +
 .../src/task_graph/visitor/exec.rs            | 22 +++++-
 crates/turborepo-process/Cargo.toml           |  1 +
 crates/turborepo-process/src/child.rs         | 35 ++++++++--
 crates/turborepo-process/src/command.rs       | 15 ++++
 crates/turborepo-process/src/lib.rs           | 47 ++++++++++++
 9 files changed, 187 insertions(+), 18 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4cc226fe90033..859943caa2d27 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7119,6 +7119,7 @@ dependencies = [
  "tracing",
  "tracing-test",
  "turbopath",
+ "turborepo-task-id",
  "windows-sys 0.59.0",
 ]
 
diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs
index e5e61ce9123c7..0fcd594e475ae 100644
--- a/crates/turborepo-lib/src/run/mod.rs
+++ b/crates/turborepo-lib/src/run/mod.rs
@@ -753,4 +753,10 @@ impl RunStopper {
     pub async fn stop(&self) {
         self.manager.stop().await;
     }
+
+    /// Stop only the child processes of this run for which `predicate` returns
+    /// true. Unlike `stop`, the process manager is left open for new spawns.
+    pub async fn stop_tasks_matching(&self, predicate: impl Fn(&turborepo_process::Child) -> bool) {
+        self.manager.stop_children_matching(predicate).await;
+    }
 }
diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs
index 6e689acf80f5b..7d5a99e80f07b 100644
--- a/crates/turborepo-lib/src/run/watch.rs
+++ b/crates/turborepo-lib/src/run/watch.rs
@@ -18,6 +18,7 @@ use crate::{
     commands::CommandBase,
     config::resolve_turbo_config_path,
     daemon::{proto, DaemonConnectorError, DaemonError},
+    engine::TaskNode,
     get_version, opts,
     run::{self, builder::RunBuilder, scope::target_selector::InvalidSelectorError, Run},
     DaemonConnector, DaemonPaths,
@@ -48,6 +49,7 @@ pub struct WatchClient {
     run: Arc<Run>,
     watched_packages: HashSet<PackageName>,
     persistent_tasks_handle: Option<RunHandle>,
+    active_runs: Vec<RunHandle>,
     connector: DaemonConnector,
     base: CommandBase,
     telemetry: CommandEventBuilder,
@@ -164,6 +166,7 @@ impl WatchClient {
             telemetry,
             experimental_write_cache,
             persistent_tasks_handle: None,
+            active_runs: Vec::new(),
             ui_sender,
             ui_handle,
         })
@@ -195,7 +198,6 @@ impl WatchClient {
         };
 
         let run_fut = async {
-            let mut run_handle: Option<RunHandle> = None;
             loop {
                 notify_run.notified().await;
                 let some_changed_packages = {
@@ -205,16 +207,23 @@ impl WatchClient {
                         .then(|| std::mem::take(changed_packages_guard.deref_mut()))
                 };
 
-                if let Some(changed_packages) = some_changed_packages {
+                if let Some(mut changed_packages) = some_changed_packages {
                     // Clean up currently running tasks
-                    if let Some(RunHandle { stopper, run_task }) = run_handle.take() {
-                        // Shut down the tasks for the run
-                        stopper.stop().await;
-                        // Run should exit shortly after we stop all child tasks, wait for it to
-                        // finish to ensure all messages are flushed.
-                        let _ = run_task.await;
+                    self.active_runs.retain(|h| !h.run_task.is_finished());
+
+                    match &mut changed_packages {
+                        ChangedPackages::Some(pkgs) => {
+                            self.stop_impacted_tasks(pkgs).await;
+                        }
+                        ChangedPackages::All => {
+                            for handle in self.active_runs.drain(..) {
+                                handle.stopper.stop().await;
+                                let _ = handle.run_task.await;
+                            }
+                        }
                     }
-                    run_handle = Some(self.execute_run(changed_packages).await?);
+                    let new_run = self.execute_run(changed_packages).await?;
+                    self.active_runs.push(new_run);
                 }
             }
         };
@@ -266,11 +275,59 @@ impl WatchClient {
         Ok(())
     }
 
+    /// Stop the tasks in `active_runs` that are affected by a change to `pkgs`.
+    ///
+    /// A task is affected if it belongs to a package in `pkgs` or transitively
+    /// depends on such a task, according to the engine of `self.run`. On return,
+    /// `pkgs` additionally contains the package of every affected task, so the
+    /// next run re-executes everything that was stopped. Packages that were already
+    /// in `pkgs` are kept even if the engine has no tasks for them.
+    async fn stop_impacted_tasks(&self, pkgs: &mut HashSet<PackageName>) {
+        let engine = self.run.engine();
+        let mut tasks_to_stop = HashSet::new();
+
+        for node in engine.tasks() {
+            if let TaskNode::Task(task_id) = node {
+                if pkgs.contains(&PackageName::from(task_id.package())) {
+                    tasks_to_stop.insert(task_id.clone());
+
+                    for dependent_node in engine.transitive_dependents(task_id) {
+                        if let TaskNode::Task(dependent_id) = dependent_node {
+                            tasks_to_stop.insert(dependent_id.clone());
+                        }
+                    }
+                }
+            }
+        }
+
+        // Extend rather than replace: a changed package without tasks in the
+        // engine must still be handed to the next run, as it was before.
+        pkgs.extend(
+            tasks_to_stop
+                .iter()
+                .map(|task_id| PackageName::from(task_id.package())),
+        );
+
+        for handle in &self.active_runs {
+            let tasks = &tasks_to_stop;
+            handle
+                .stopper
+                .stop_tasks_matching(move |child| {
+                    child.task_id().is_some_and(|id| tasks.contains(id))
+                })
+                .await;
+        }
+    }
+
     /// Shut down any resources that run as part of watch.
     pub async fn shutdown(&mut self) {
         if let Some(sender) = &self.ui_sender {
             sender.stop().await;
         }
+        for handle in self.active_runs.drain(..) {
+            handle.stopper.stop().await;
+            let _ = handle.run_task.await;
+        }
         if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() {
             // Shut down the tasks for the run
             stopper.stop().await;
diff --git a/crates/turborepo-lib/src/task_graph/visitor/command.rs b/crates/turborepo-lib/src/task_graph/visitor/command.rs
index 639f705e3a0f9..3214a22c073a3 100644
--- a/crates/turborepo-lib/src/task_graph/visitor/command.rs
+++ b/crates/turborepo-lib/src/task_graph/visitor/command.rs
@@ -134,6 +134,7 @@ impl<'a> CommandProvider for PackageGraphCommandProvider<'a> {
         // We clear the env before populating it with variables we expect
         cmd.env_clear();
         cmd.envs(environment.iter());
+        cmd.task_id(task_id.clone().into_owned());
 
         // If the task has an associated proxy, then we indicate this to the underlying
         // task via an env var
@@ -277,6 +278,7 @@ impl<'a, T: PackageInfoProvider> CommandProvider for MicroFrontendProxyProvider<
             let program = which::which(package_manager.command())?;
             let mut cmd = Command::new(&program);
             cmd.current_dir(package_dir).args(args).open_stdin();
+            cmd.task_id(task_id.clone().into_owned());
             Some(cmd)
         } else if has_mfe_dependency {
             tracing::debug!(
@@ -297,6 +299,7 @@ impl<'a, T: PackageInfoProvider> CommandProvider for MicroFrontendProxyProvider<
             let program = package_dir.join_components(&["node_modules", ".bin", bin_name]);
             let mut cmd = Command::new(program.as_std_path());
             cmd.current_dir(package_dir).args(args).open_stdin();
+            cmd.task_id(task_id.clone().into_owned());
             Some(cmd)
         } else {
             tracing::debug!("MicroFrontendProxyProvider::command - using Turborepo built-in proxy");
diff --git a/crates/turborepo-lib/src/task_graph/visitor/exec.rs b/crates/turborepo-lib/src/task_graph/visitor/exec.rs
index 230dff346c275..5d22f5a3efb98 100644
--- a/crates/turborepo-lib/src/task_graph/visitor/exec.rs
+++ b/crates/turborepo-lib/src/task_graph/visitor/exec.rs
@@ -185,6 +185,8 @@ enum ExecOutcome {
     },
     // Task didn't execute normally due to a shutdown being initiated by another task
     Shutdown,
+    // Task was stopped by turbo without a full shutdown so that it can be restarted
+    Restarted,
 }
 
 enum SuccessOutcome {
@@ -259,6 +261,12 @@ impl ExecContext {
                 // stopped if we think we're shutting down.
                 self.manager.stop().await;
             }
+            Ok(ExecOutcome::Restarted) => {
+                tracker.cancel();
+                // We need to stop dependent tasks because this task will be restarted
+                // in a new run.
+                callback.send(Err(StopExecution::DependentTasks)).ok();
+            }
             Err(e) => {
                 tracker.cancel();
                 callback.send(Err(StopExecution::AllTasks)).ok();
@@ -455,7 +463,17 @@ impl ExecContext {
             // Something else killed the child
             ChildExit::KilledExternal => Err(InternalError::ExternalKill),
-            // The child was killed by turbo indicating a shutdown
-            ChildExit::Killed | ChildExit::Interrupted => Ok(ExecOutcome::Shutdown),
+            // The child was killed by turbo, either for a shutdown or for a restart
+            ChildExit::Killed | ChildExit::Interrupted => {
+                // We distinguish between a full shutdown and a restart based on whether the
+                // process manager is closing. Targeted stops via
+                // `ProcessManager::stop_children_matching` leave the manager open, so a
+                // kill while it is open means only specific tasks are being restarted.
+                if self.manager.is_closing() {
+                    Ok(ExecOutcome::Shutdown)
+                } else {
+                    Ok(ExecOutcome::Restarted)
+                }
+            }
         }
     }
 }
diff --git a/crates/turborepo-process/Cargo.toml b/crates/turborepo-process/Cargo.toml
index d4b39341cf6fa..4720b4ac4dd27 100644
--- a/crates/turborepo-process/Cargo.toml
+++ b/crates/turborepo-process/Cargo.toml
@@ -19,6 +19,7 @@ portable-pty = "0.8.1"
 tokio = { workspace = true, features = ["full", "time"] }
 tracing.workspace = true
 turbopath = { workspace = true }
+turborepo-task-id = { workspace = true }
 
 [lints]
 workspace = true
diff --git a/crates/turborepo-process/src/child.rs b/crates/turborepo-process/src/child.rs
index 43a0da2693bc0..81c2264c77755 100644
--- a/crates/turborepo-process/src/child.rs
+++ b/crates/turborepo-process/src/child.rs
@@ -31,6 +31,7 @@ use tokio::{
     sync::{mpsc, watch},
 };
 use tracing::{debug, trace};
+use turborepo_task_id::TaskId;
 
 use super::{Command, PtySize};
 
@@ -384,20 +385,34 @@ pub struct Child {
 }
 
 #[derive(Clone, Debug)]
-pub struct ChildCommandChannel(mpsc::Sender<ChildCommand>);
+pub struct ChildCommandChannel {
+    sender: mpsc::Sender<ChildCommand>,
+    task_id: Option<TaskId<'static>>,
+}
 
 impl ChildCommandChannel {
-    pub fn new() -> (Self, mpsc::Receiver<ChildCommand>) {
+    pub fn new(task_id: Option<TaskId<'static>>) -> (Self, mpsc::Receiver<ChildCommand>) {
         let (tx, rx) = mpsc::channel(1);
-        (ChildCommandChannel(tx), rx)
+        (
+            ChildCommandChannel {
+                sender: tx,
+                task_id,
+            },
+            rx,
+        )
     }
 
     pub async fn kill(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> {
-        self.0.send(ChildCommand::Kill).await
+        self.sender.send(ChildCommand::Kill).await
     }
 
     pub async fn stop(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> {
-        self.0.send(ChildCommand::Stop).await
+        self.sender.send(ChildCommand::Stop).await
+    }
+
+    /// The task this channel's child was spawned for, if one was set on its `Command`.
+    pub fn get_task_id(&self) -> Option<&TaskId<'static>> {
+        self.task_id.as_ref()
     }
 }
 
@@ -416,6 +431,7 @@ impl Child {
         pty_size: Option<PtySize>,
     ) -> io::Result<Self> {
         let label = command.label();
+        let task_id = command.get_task_id().cloned();
         let SpawnResult {
             handle: mut child,
             io: ChildIO { stdin, output },
@@ -428,7 +444,7 @@ impl Child {
 
         let pid = child.pid();
 
-        let (command_tx, mut command_rx) = ChildCommandChannel::new();
+        let (command_tx, mut command_rx) = ChildCommandChannel::new(task_id);
 
         // we use a watch channel to communicate the exit code back to the
         // caller. we are interested in three cases:
@@ -680,6 +696,11 @@ impl Child {
     pub fn label(&self) -> &str {
         &self.label
     }
+
+    /// The task this child was spawned for, if any.
+    pub fn task_id(&self) -> Option<&TaskId<'static>> {
+        self.command_channel.get_task_id()
+    }
 }
 
 // Adds a trailing newline if necessary to the buffer
@@ -750,7 +771,7 @@ impl ChildStateManager {
 impl Child {
     // Helper method for checking if child is running
     fn is_running(&self) -> bool {
-        !self.command_channel.0.is_closed()
+        !self.command_channel.sender.is_closed()
     }
 }
 
diff --git a/crates/turborepo-process/src/command.rs b/crates/turborepo-process/src/command.rs
index a8814f8229871..eacc09c415eac 100644
--- a/crates/turborepo-process/src/command.rs
+++ b/crates/turborepo-process/src/command.rs
@@ -6,6 +6,7 @@ use std::{
 
 use itertools::Itertools;
 use turbopath::AbsoluteSystemPathBuf;
+use turborepo_task_id::TaskId;
 
 /// A command builder that can be used to build both regular
 /// child processes and ones spawned hooked up to a PTY
@@ -17,6 +18,7 @@ pub struct Command {
     env: BTreeMap<OsString, OsString>,
     open_stdin: bool,
     env_clear: bool,
+    task_id: Option<TaskId<'static>>,
 }
 
 impl Command {
@@ -29,6 +31,7 @@ impl Command {
             env: BTreeMap::new(),
             open_stdin: false,
             env_clear: false,
+            task_id: None,
         }
     }
 
@@ -106,6 +109,17 @@ impl Command {
     pub fn program(&self) -> &OsStr {
         &self.program
     }
+
+    /// Tag this command with the task it is run for.
+    pub fn task_id(&mut self, task_id: TaskId<'static>) -> &mut Self {
+        self.task_id = Some(task_id);
+        self
+    }
+
+    /// The task this command was tagged with via [`Command::task_id`], if any.
+    pub fn get_task_id(&self) -> Option<&TaskId<'static>> {
+        self.task_id.as_ref()
+    }
 }
 
 impl From<Command> for tokio::process::Command {
@@ -117,6 +131,7 @@ impl From<Command> for tokio::process::Command {
             env,
             open_stdin,
             env_clear,
+            task_id: _,
         } = value;
 
         let mut cmd = tokio::process::Command::new(program);
diff --git a/crates/turborepo-process/src/lib.rs b/crates/turborepo-process/src/lib.rs
index e3c6a9678aa90..27588ec6eca63 100644
--- a/crates/turborepo-process/src/lib.rs
+++ b/crates/turborepo-process/src/lib.rs
@@ -130,6 +130,12 @@ impl ProcessManager {
         self.close(|mut c| async move { c.stop().await }).await
     }
 
+    /// Stop only the children matching `predicate`, leaving the manager open.
+    pub async fn stop_children_matching(&self, predicate: impl Fn(&Child) -> bool) {
+        self.close_matching(|mut c| async move { c.stop().await }, predicate)
+            .await
+    }
+
     /// Stop the process manager, waiting for all child processes to exit.
     ///
     /// If you want to set a timeout, use `tokio::time::timeout` and
@@ -174,9 +180,50 @@ impl ProcessManager {
         }
     }
 
+    /// Stop the children for which `predicate` returns true and wait for them to exit.
+    ///
+    /// Matched children are removed from the manager's list and the rest are left
+    /// untouched. Unlike `close`, this does not mark the manager as closing.
+    async fn close_matching<F, C, P>(&self, callback: F, predicate: P)
+    where
+        F: Fn(Child) -> C + Sync + Send + Copy + 'static,
+        C: Future<Output = Option<ChildExit>> + Sync + Send + 'static,
+        P: Fn(&Child) -> bool,
+    {
+        let mut set = JoinSet::new();
+
+        {
+            let mut lock = self.state.lock().expect("not poisoned");
+            // We don't set is_closing = true here because we want to allow new spawns,
+            // and because exec reports a child killed while the manager is open as a
+            // restart rather than a shutdown.
+            //
+            // Split the children in one pass so the predicate runs once per child.
+            let (matching, remaining): (Vec<_>, Vec<_>) = std::mem::take(&mut lock.children)
+                .into_iter()
+                .partition(|c| predicate(c));
+            lock.children = remaining;
+
+            for child in matching {
+                set.spawn(async move { callback(child).await });
+            }
+        }
+
+        debug!("waiting for {} processes to exit", set.len());
+
+        while let Some(out) = set.join_next().await {
+            trace!("process exited: {:?}", out);
+        }
+    }
+
     pub fn set_pty_size(&self, rows: u16, cols: u16) {
         self.state.lock().expect("not poisoned").size = Some(PtySize { rows, cols });
     }
+
+    /// Whether the manager has been marked as closing, which blocks new spawns.
+    pub fn is_closing(&self) -> bool {
+        self.state.lock().expect("not poisoned").is_closing
+    }
 }
 
 impl ProcessManagerInner {