From d03b98c725bc613815005b6f93e96bd8036ae671 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 26 Jul 2024 15:46:28 -0500 Subject: [PATCH 1/7] fix(watch): exit command if persistent task exits --- crates/turborepo-lib/src/run/watch.rs | 50 ++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 8376dbe734488..3683456c7dad2 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -60,6 +60,7 @@ pub struct WatchClient { struct RunHandle { stopper: run::RunStopper, run_task: JoinHandle>, + persistent_exit: Option>, } #[derive(Debug, Error, Diagnostic)] @@ -109,6 +110,8 @@ pub enum Error { Config(#[from] crate::config::Error), #[error(transparent)] SignalListener(#[from] turborepo_signals::listeners::Error), + #[error("persistent tasks exited unexpectedly")] + PersistentExit, } impl WatchClient { @@ -195,8 +198,22 @@ impl WatchClient { let run_fut = async { let mut run_handle: Option = None; + let mut persistent_exit = None; loop { - notify_run.notified().await; + if let Some(persistent) = &mut persistent_exit { + // here we watch both notify *and* persistent task + // if notify exits, then continue per usual + // if persist exits, then we break out of loop with a + select! { + _ = notify_run.notified() => {}, + _ = persistent => { + break; + } + } + } else { + notify_run.notified().await; + } + let some_changed_packages = { let mut changed_packages_guard = changed_packages.lock().expect("poisoned lock"); @@ -206,16 +223,22 @@ impl WatchClient { if let Some(changed_packages) = some_changed_packages { // Clean up currently running tasks - if let Some(RunHandle { stopper, run_task }) = run_handle.take() { + if let Some(RunHandle { stopper, run_task, persistent_exit }) = run_handle.take() { // Shut down the tasks for the run stopper.stop().await; // Run should exit shortly after we stop all child tasks, wait for it to // finish to ensure all messages are flushed. let _ = run_task.await; + if let Some(persistent_exit) = persistent_exit { + let _ = persistent_exit.await; + } } - run_handle = Some(self.execute_run(changed_packages).await?); + let mut raw_run_handle = self.execute_run(changed_packages).await?; + persistent_exit = raw_run_handle.persistent_exit.take(); + run_handle = Some(raw_run_handle); } } + Err(Error::PersistentExit) }; select! { @@ -270,7 +293,7 @@ impl WatchClient { if let Some(sender) = &self.ui_sender { sender.stop().await; } - if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() { + if let Some(RunHandle { stopper, run_task, .. }) = self.persistent_tasks_handle.take() { // Shut down the tasks for the run stopper.stop().await; // Run should exit shortly after we stop all child tasks, wait for it to finish @@ -332,6 +355,7 @@ impl WatchClient { Ok(RunHandle { stopper: run.stopper(), run_task: tokio::spawn(async move { run.run(ui_sender, true).await }), + persistent_exit: None }) } ChangedPackages::All => { @@ -358,12 +382,15 @@ impl WatchClient { self.watched_packages = self.run.get_relevant_packages(); // Clean up currently running persistent tasks - if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() { + if let Some(RunHandle { stopper, run_task, persistent_exit }) = self.persistent_tasks_handle.take() { // Shut down the tasks for the run stopper.stop().await; // Run should exit shortly after we stop all child tasks, wait for it to finish // to ensure all messages are flushed. let _ = run_task.await; + if let Some(persistent_exit) = persistent_exit { + let _ = persistent_exit.await; + } } if let Some(sender) = &self.ui_sender { let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph); @@ -381,11 +408,16 @@ impl WatchClient { let ui_sender = self.ui_sender.clone(); // If we have persistent tasks, we run them on a separate thread // since persistent tasks don't finish + let (persist_guard, persist_exit) = tokio::sync::oneshot::channel::<()>(); self.persistent_tasks_handle = Some(RunHandle { stopper: persistent_run.stopper(), - run_task: tokio::spawn( - async move { persistent_run.run(ui_sender, true).await }, - ), + run_task: tokio::spawn(async move { + // We move the guard in here so we can determine if the persist tasks + // exit as it'll go out of scope and drop. + let _guard = persist_guard; + persistent_run.run(ui_sender, true).await + }), + persistent_exit: None, }); let non_persistent_run = self.run.create_run_for_interruptible_tasks(); @@ -395,6 +427,7 @@ impl WatchClient { run_task: tokio::spawn(async move { non_persistent_run.run(ui_sender, true).await }), + persistent_exit: Some(persist_exit), }) } else { let ui_sender = self.ui_sender.clone(); @@ -402,6 +435,7 @@ impl WatchClient { Ok(RunHandle { stopper: run.stopper(), run_task: tokio::spawn(async move { run.run(ui_sender, true).await }), + persistent_exit: None, }) } } From aea082366a59915929b70076c51f5085d0a12818 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Thu, 1 Aug 2024 14:04:35 -0400 Subject: [PATCH 2/7] chore(watch): add test for persistent exit --- .../failing_dev/apps/web/package.json | 6 +++++ .../fixtures/failing_dev/package.json | 6 +++++ .../fixtures/failing_dev/turbo.json | 9 +++++++ .../integration/tests/watch/persistent-exit.t | 26 +++++++++++++++++++ 4 files changed, 47 insertions(+) create mode 100644 turborepo-tests/integration/fixtures/failing_dev/apps/web/package.json create mode 100644 turborepo-tests/integration/fixtures/failing_dev/package.json create mode 100644 turborepo-tests/integration/fixtures/failing_dev/turbo.json create mode 100644 turborepo-tests/integration/tests/watch/persistent-exit.t diff --git a/turborepo-tests/integration/fixtures/failing_dev/apps/web/package.json b/turborepo-tests/integration/fixtures/failing_dev/apps/web/package.json new file mode 100644 index 0000000000000..83cdbfdd80243 --- /dev/null +++ b/turborepo-tests/integration/fixtures/failing_dev/apps/web/package.json @@ -0,0 +1,6 @@ +{ + "name": "web", + "scripts": { + "dev": "echo server crashed && exit 1" + } +} diff --git a/turborepo-tests/integration/fixtures/failing_dev/package.json b/turborepo-tests/integration/fixtures/failing_dev/package.json new file mode 100644 index 0000000000000..9557291c8197e --- /dev/null +++ b/turborepo-tests/integration/fixtures/failing_dev/package.json @@ -0,0 +1,6 @@ +{ + "name": "monorepo", + "workspaces": [ + "apps/**" + ] +} diff --git a/turborepo-tests/integration/fixtures/failing_dev/turbo.json b/turborepo-tests/integration/fixtures/failing_dev/turbo.json new file mode 100644 index 0000000000000..006b851169bfc --- /dev/null +++ b/turborepo-tests/integration/fixtures/failing_dev/turbo.json @@ -0,0 +1,9 @@ +{ + "$schema": "https://turbo.build/schema.json", + "tasks": { + "dev": { + "persistent": true, + "cache": false + } + } +} diff --git a/turborepo-tests/integration/tests/watch/persistent-exit.t b/turborepo-tests/integration/tests/watch/persistent-exit.t new file mode 100644 index 0000000000000..2e016d7d548f3 --- /dev/null +++ b/turborepo-tests/integration/tests/watch/persistent-exit.t @@ -0,0 +1,26 @@ +Setup + $ . ${TESTDIR}/../../../helpers/setup_integration_test.sh failing_dev + +Turbo should exit after dev script fails + $ ${TURBO} watch dev + \xe2\x80\xa2 Packages in scope: web (esc) + \xe2\x80\xa2 Running dev in 1 packages (esc) + \xe2\x80\xa2 Remote caching disabled (esc) + \xe2\x80\xa2 Packages in scope: web (esc) + \xe2\x80\xa2 Running dev in 1 packages (esc) + \xe2\x80\xa2 Remote caching disabled (esc) + web:dev: cache bypass, force executing bfb830bdb7d49cb8 + web:dev: + web:dev: > dev + web:dev: > echo server crashed && exit 1 + web:dev: + web:dev: server crashed + web:dev: npm ERR! Lifecycle script `dev` failed with error: + web:dev: npm ERR! Error: command failed + web:dev: npm ERR! in workspace: web + web:dev: npm ERR! at location: .* (re) + web:dev: ERROR: command finished with error: command .*npm(?:\.cmd)? run dev exited \(1\) (re) + web#dev: command .*npm(?:\.cmd)? run dev exited \(1\) (re) + x persistent tasks exited unexpectedly + + [1] From b63fa32bf39541f3b12050490d900bc2c812430d Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 12 Mar 2025 10:07:45 -0400 Subject: [PATCH 3/7] chore: no longer warn if cache is already being shut down --- crates/turborepo-lib/src/run/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 77f18f1a3054a..da0bed78b6cf8 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -360,8 +360,6 @@ impl Run { _ = fut => {} _ = interrupt => {tracing::debug!("received interrupt, exiting");} } - } else { - tracing::warn!("could not start shutdown, exiting"); } spinner.finish_and_clear(); }); From a5b888c8d3a99e21cb7b6e498820881c22b502c6 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 12 Mar 2025 10:19:04 -0400 Subject: [PATCH 4/7] attempt at getting passing tests --- crates/turborepo-lib/src/run/watch.rs | 21 +++++++++++++++---- .../integration/tests/watch/persistent-exit.t | 15 ++++++++++--- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 3683456c7dad2..32fefbbdb3b93 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -223,7 +223,12 @@ impl WatchClient { if let Some(changed_packages) = some_changed_packages { // Clean up currently running tasks - if let Some(RunHandle { stopper, run_task, persistent_exit }) = run_handle.take() { + if let Some(RunHandle { + stopper, + run_task, + persistent_exit, + }) = run_handle.take() + { // Shut down the tasks for the run stopper.stop().await; // Run should exit shortly after we stop all child tasks, wait for it to @@ -293,7 +298,10 @@ impl WatchClient { if let Some(sender) = &self.ui_sender { sender.stop().await; } - if let Some(RunHandle { stopper, run_task, .. }) = self.persistent_tasks_handle.take() { + if let Some(RunHandle { + stopper, run_task, .. + }) = self.persistent_tasks_handle.take() + { // Shut down the tasks for the run stopper.stop().await; // Run should exit shortly after we stop all child tasks, wait for it to finish @@ -355,7 +363,7 @@ impl WatchClient { Ok(RunHandle { stopper: run.stopper(), run_task: tokio::spawn(async move { run.run(ui_sender, true).await }), - persistent_exit: None + persistent_exit: None, }) } ChangedPackages::All => { @@ -382,7 +390,12 @@ impl WatchClient { self.watched_packages = self.run.get_relevant_packages(); // Clean up currently running persistent tasks - if let Some(RunHandle { stopper, run_task, persistent_exit }) = self.persistent_tasks_handle.take() { + if let Some(RunHandle { + stopper, + run_task, + persistent_exit, + }) = self.persistent_tasks_handle.take() + { // Shut down the tasks for the run stopper.stop().await; // Run should exit shortly after we stop all child tasks, wait for it to finish diff --git a/turborepo-tests/integration/tests/watch/persistent-exit.t b/turborepo-tests/integration/tests/watch/persistent-exit.t index 2e016d7d548f3..a7ae8538b6ce5 100644 --- a/turborepo-tests/integration/tests/watch/persistent-exit.t +++ b/turborepo-tests/integration/tests/watch/persistent-exit.t @@ -6,9 +6,18 @@ Turbo should exit after dev script fails \xe2\x80\xa2 Packages in scope: web (esc) \xe2\x80\xa2 Running dev in 1 packages (esc) \xe2\x80\xa2 Remote caching disabled (esc) - \xe2\x80\xa2 Packages in scope: web (esc) - \xe2\x80\xa2 Running dev in 1 packages (esc) - \xe2\x80\xa2 Remote caching disabled (esc) + web:dev: cache bypass, force executing bfb830bdb7d49cb8 + web:dev: + web:dev: > dev + web:dev: > echo server crashed && exit 1 + web:dev: + web:dev: server crashed + web:dev: npm ERR! Lifecycle script `dev` failed with error: + web:dev: npm ERR! Error: command failed + web:dev: npm ERR! in workspace: web + web:dev: npm ERR! at location: .* (re) + web:dev: ERROR: command finished with error: command .*npm(?:\.cmd)? run dev exited \(1\) (re) + web#dev: command .*npm(?:\.cmd)? run dev exited \(1\) (re) web:dev: cache bypass, force executing bfb830bdb7d49cb8 web:dev: web:dev: > dev From 40785f4e78ff5d8a73ea65cb76fdb58df1815d09 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 12 Mar 2025 12:15:29 -0400 Subject: [PATCH 5/7] chore(watch): update test to match ci vs non-ci runs --- turborepo-tests/integration/tests/watch/persistent-exit.t | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/turborepo-tests/integration/tests/watch/persistent-exit.t b/turborepo-tests/integration/tests/watch/persistent-exit.t index a7ae8538b6ce5..ced498004d20c 100644 --- a/turborepo-tests/integration/tests/watch/persistent-exit.t +++ b/turborepo-tests/integration/tests/watch/persistent-exit.t @@ -2,7 +2,9 @@ Setup $ . ${TESTDIR}/../../../helpers/setup_integration_test.sh failing_dev Turbo should exit after dev script fails - $ ${TURBO} watch dev +Disabling daemon so this matches behavior when running test on CI + $ TURBO_DAEMON=0 ${TURBO} watch dev + WARNING daemon is required for watch, ignoring request to disable daemon \xe2\x80\xa2 Packages in scope: web (esc) \xe2\x80\xa2 Running dev in 1 packages (esc) \xe2\x80\xa2 Remote caching disabled (esc) From bc91f531fbd68edfaf2c06bce1395c0c20ed316a Mon Sep 17 00:00:00 2001 From: Anthony Shew Date: Thu, 26 Jun 2025 16:24:19 -0400 Subject: [PATCH 6/7] fix the test (#10585) I'm 0% certain that this is a structurally sound change - but it does fix the logs regression in https://github.com/vercel/turborepo/pull/8858. --- crates/turborepo-lib/src/run/watch.rs | 19 ++++++++++++++++++- .../integration/tests/watch/persistent-exit.t | 14 +------------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 32fefbbdb3b93..7e4e84c9f6824 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -187,8 +187,24 @@ impl WatchClient { let notify_event = notify_run.clone(); let event_fut = async { + let mut first_rediscover = true; while let Some(event) = events.next().await { let event = event?; + + // Skip the first RediscoverPackages event which is sent immediately by the + // daemon when we connect. The file watcher will send the real + // one. + if first_rediscover { + if matches!( + event.event, + Some(proto::package_change_event::Event::RediscoverPackages(_)) + ) { + first_rediscover = false; + continue; + } + first_rediscover = false; + } + Self::handle_change_event(&changed_packages, event.event.unwrap())?; notify_event.notify_one(); } @@ -205,10 +221,11 @@ impl WatchClient { // if notify exits, then continue per usual // if persist exits, then we break out of loop with a select! { - _ = notify_run.notified() => {}, + biased; _ = persistent => { break; } + _ = notify_run.notified() => {}, } } else { notify_run.notified().await; diff --git a/turborepo-tests/integration/tests/watch/persistent-exit.t b/turborepo-tests/integration/tests/watch/persistent-exit.t index ced498004d20c..d6f25bdd95ca2 100644 --- a/turborepo-tests/integration/tests/watch/persistent-exit.t +++ b/turborepo-tests/integration/tests/watch/persistent-exit.t @@ -19,19 +19,7 @@ Disabling daemon so this matches behavior when running test on CI web:dev: npm ERR! in workspace: web web:dev: npm ERR! at location: .* (re) web:dev: ERROR: command finished with error: command .*npm(?:\.cmd)? run dev exited \(1\) (re) - web#dev: command .*npm(?:\.cmd)? run dev exited \(1\) (re) - web:dev: cache bypass, force executing bfb830bdb7d49cb8 - web:dev: - web:dev: > dev - web:dev: > echo server crashed && exit 1 - web:dev: - web:dev: server crashed - web:dev: npm ERR! Lifecycle script `dev` failed with error: - web:dev: npm ERR! Error: command failed - web:dev: npm ERR! in workspace: web - web:dev: npm ERR! at location: .* (re) - web:dev: ERROR: command finished with error: command .*npm(?:\.cmd)? run dev exited \(1\) (re) web#dev: command .*npm(?:\.cmd)? run dev exited \(1\) (re) x persistent tasks exited unexpectedly - [1] + [1] \ No newline at end of file From c12399a5154062a049ce484db2a311ff79b978aa Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 27 Jun 2025 09:56:13 -0400 Subject: [PATCH 7/7] add streaming for debug --- .github/workflows/turborepo-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/turborepo-test.yml b/.github/workflows/turborepo-test.yml index 97759474a201f..b0acbbfc48867 100644 --- a/.github/workflows/turborepo-test.yml +++ b/.github/workflows/turborepo-test.yml @@ -162,7 +162,7 @@ jobs: if [ -z "${RUSTC_WRAPPER}" ]; then unset RUSTC_WRAPPER fi - turbo run test --filter=turborepo-tests-integration --color --env-mode=strict --token=${{ secrets.TURBO_TOKEN }} --team=${{ vars.TURBO_TEAM }} + turbo run test --filter=turborepo-tests-integration --color --env-mode=strict --token=${{ secrets.TURBO_TOKEN }} --team=${{ vars.TURBO_TEAM }} --log-order=stream shell: bash env: SCCACHE_BUCKET: turborepo-sccache