这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/turborepo-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ jobs:
if [ -z "${RUSTC_WRAPPER}" ]; then
unset RUSTC_WRAPPER
fi
turbo run test --filter=turborepo-tests-integration --color --env-mode=strict --token=${{ secrets.TURBO_TOKEN }} --team=${{ vars.TURBO_TEAM }}
turbo run test --filter=turborepo-tests-integration --color --env-mode=strict --token=${{ secrets.TURBO_TOKEN }} --team=${{ vars.TURBO_TEAM }} --log-order=stream
shell: bash
env:
SCCACHE_BUCKET: turborepo-sccache
Expand Down
2 changes: 0 additions & 2 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,6 @@ impl Run {
_ = fut => {}
_ = interrupt => {tracing::debug!("received interrupt, exiting");}
}
} else {
tracing::warn!("could not start shutdown, exiting");
}
spinner.finish_and_clear();
});
Expand Down
80 changes: 72 additions & 8 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct WatchClient {
struct RunHandle {
stopper: run::RunStopper,
run_task: JoinHandle<Result<i32, run::Error>>,
persistent_exit: Option<tokio::sync::oneshot::Receiver<()>>,
}

#[derive(Debug, Error, Diagnostic)]
Expand Down Expand Up @@ -109,6 +110,8 @@ pub enum Error {
Config(#[from] crate::config::Error),
#[error(transparent)]
SignalListener(#[from] turborepo_signals::listeners::Error),
#[error("persistent tasks exited unexpectedly")]
PersistentExit,
}

impl WatchClient {
Expand Down Expand Up @@ -184,8 +187,24 @@ impl WatchClient {
let notify_event = notify_run.clone();

let event_fut = async {
let mut first_rediscover = true;
while let Some(event) = events.next().await {
let event = event?;

// Skip the first RediscoverPackages event which is sent immediately by the
// daemon when we connect. The file watcher will send the real
// one.
if first_rediscover {
if matches!(
event.event,
Some(proto::package_change_event::Event::RediscoverPackages(_))
) {
first_rediscover = false;
continue;
}
first_rediscover = false;
}

Self::handle_change_event(&changed_packages, event.event.unwrap())?;
notify_event.notify_one();
}
Expand All @@ -195,8 +214,23 @@ impl WatchClient {

let run_fut = async {
let mut run_handle: Option<RunHandle> = None;
let mut persistent_exit = None;
loop {
notify_run.notified().await;
if let Some(persistent) = &mut persistent_exit {
// here we watch both notify *and* persistent task
// if notify exits, then continue per usual
// if persist exits, then we break out of loop with a
select! {
biased;
_ = persistent => {
break;
}
_ = notify_run.notified() => {},
}
} else {
notify_run.notified().await;
}

let some_changed_packages = {
let mut changed_packages_guard =
changed_packages.lock().expect("poisoned lock");
Expand All @@ -206,16 +240,27 @@ impl WatchClient {

if let Some(changed_packages) = some_changed_packages {
// Clean up currently running tasks
if let Some(RunHandle { stopper, run_task }) = run_handle.take() {
if let Some(RunHandle {
stopper,
run_task,
persistent_exit,
}) = run_handle.take()
{
// Shut down the tasks for the run
stopper.stop().await;
// Run should exit shortly after we stop all child tasks, wait for it to
// finish to ensure all messages are flushed.
let _ = run_task.await;
if let Some(persistent_exit) = persistent_exit {
let _ = persistent_exit.await;
}
}
run_handle = Some(self.execute_run(changed_packages).await?);
let mut raw_run_handle = self.execute_run(changed_packages).await?;
persistent_exit = raw_run_handle.persistent_exit.take();
run_handle = Some(raw_run_handle);
}
}
Err(Error::PersistentExit)
};

select! {
Expand Down Expand Up @@ -270,7 +315,10 @@ impl WatchClient {
if let Some(sender) = &self.ui_sender {
sender.stop().await;
}
if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() {
if let Some(RunHandle {
stopper, run_task, ..
}) = self.persistent_tasks_handle.take()
{
// Shut down the tasks for the run
stopper.stop().await;
// Run should exit shortly after we stop all child tasks, wait for it to finish
Expand Down Expand Up @@ -332,6 +380,7 @@ impl WatchClient {
Ok(RunHandle {
stopper: run.stopper(),
run_task: tokio::spawn(async move { run.run(ui_sender, true).await }),
persistent_exit: None,
})
}
ChangedPackages::All => {
Expand All @@ -358,12 +407,20 @@ impl WatchClient {
self.watched_packages = self.run.get_relevant_packages();

// Clean up currently running persistent tasks
if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() {
if let Some(RunHandle {
stopper,
run_task,
persistent_exit,
}) = self.persistent_tasks_handle.take()
{
// Shut down the tasks for the run
stopper.stop().await;
// Run should exit shortly after we stop all child tasks, wait for it to finish
// to ensure all messages are flushed.
let _ = run_task.await;
if let Some(persistent_exit) = persistent_exit {
let _ = persistent_exit.await;
}
}
if let Some(sender) = &self.ui_sender {
let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph);
Expand All @@ -381,11 +438,16 @@ impl WatchClient {
let ui_sender = self.ui_sender.clone();
// If we have persistent tasks, we run them on a separate thread
// since persistent tasks don't finish
let (persist_guard, persist_exit) = tokio::sync::oneshot::channel::<()>();
self.persistent_tasks_handle = Some(RunHandle {
stopper: persistent_run.stopper(),
run_task: tokio::spawn(
async move { persistent_run.run(ui_sender, true).await },
),
run_task: tokio::spawn(async move {
// We move the guard in here so we can determine if the persist tasks
// exit as it'll go out of scope and drop.
let _guard = persist_guard;
persistent_run.run(ui_sender, true).await
}),
persistent_exit: None,
});

let non_persistent_run = self.run.create_run_for_interruptible_tasks();
Expand All @@ -395,13 +457,15 @@ impl WatchClient {
run_task: tokio::spawn(async move {
non_persistent_run.run(ui_sender, true).await
}),
persistent_exit: Some(persist_exit),
})
} else {
let ui_sender = self.ui_sender.clone();
let run = self.run.clone();
Ok(RunHandle {
stopper: run.stopper(),
run_task: tokio::spawn(async move { run.run(ui_sender, true).await }),
persistent_exit: None,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "web",
"scripts": {
"dev": "echo server crashed && exit 1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "monorepo",
"workspaces": [
"apps/**"
]
}
9 changes: 9 additions & 0 deletions turborepo-tests/integration/fixtures/failing_dev/turbo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"$schema": "https://turbo.build/schema.json",
"tasks": {
"dev": {
"persistent": true,
"cache": false
}
}
}
25 changes: 25 additions & 0 deletions turborepo-tests/integration/tests/watch/persistent-exit.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Setup
$ . ${TESTDIR}/../../../helpers/setup_integration_test.sh failing_dev

Turbo should exit after dev script fails
Disabling daemon so this matches behavior when running test on CI
$ TURBO_DAEMON=0 ${TURBO} watch dev
WARNING daemon is required for watch, ignoring request to disable daemon
\xe2\x80\xa2 Packages in scope: web (esc)
\xe2\x80\xa2 Running dev in 1 packages (esc)
\xe2\x80\xa2 Remote caching disabled (esc)
web:dev: cache bypass, force executing bfb830bdb7d49cb8
web:dev:
web:dev: > dev
web:dev: > echo server crashed && exit 1
web:dev:
web:dev: server crashed
web:dev: npm ERR! Lifecycle script `dev` failed with error:
web:dev: npm ERR! Error: command failed
web:dev: npm ERR! in workspace: web
web:dev: npm ERR! at location: .* (re)
web:dev: ERROR: command finished with error: command .*npm(?:\.cmd)? run dev exited \(1\) (re)
web#dev: command .*npm(?:\.cmd)? run dev exited \(1\) (re)
x persistent tasks exited unexpectedly

[1]
Loading