From 33add8b94e26753477663bebf47bf8f6f12d28b8 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 17 Feb 2025 12:14:31 -0500 Subject: [PATCH 1/9] chore(signals): move signal handler to own crate --- Cargo.lock | 9 +++++++++ Cargo.toml | 1 + crates/turborepo-lib/Cargo.toml | 1 + crates/turborepo-lib/src/cli/error.rs | 2 +- crates/turborepo-lib/src/commands/boundaries.rs | 2 +- crates/turborepo-lib/src/commands/ls.rs | 2 +- crates/turborepo-lib/src/commands/query.rs | 2 +- crates/turborepo-lib/src/commands/run.rs | 3 ++- crates/turborepo-lib/src/lib.rs | 1 - crates/turborepo-lib/src/query/mod.rs | 2 +- crates/turborepo-lib/src/run/builder.rs | 2 +- crates/turborepo-lib/src/run/mod.rs | 2 +- crates/turborepo-lib/src/run/watch.rs | 2 +- crates/turborepo-signals/Cargo.toml | 13 +++++++++++++ .../src/signal.rs => turborepo-signals/src/lib.rs} | 3 +++ 15 files changed, 37 insertions(+), 10 deletions(-) create mode 100644 crates/turborepo-signals/Cargo.toml rename crates/{turborepo-lib/src/signal.rs => turborepo-signals/src/lib.rs} (99%) diff --git a/Cargo.lock b/Cargo.lock index 9d8f0a2b84531..1c23746a6d49b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6613,6 +6613,7 @@ dependencies = [ "turborepo-microfrontends", "turborepo-repository", "turborepo-scm", + "turborepo-signals", "turborepo-telemetry", "turborepo-ui", "turborepo-unescape", @@ -6765,6 +6766,14 @@ dependencies = [ "which", ] +[[package]] +name = "turborepo-signals" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", +] + [[package]] name = "turborepo-telemetry" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 574b9900db026..4583c7298422a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ turborepo-repository = { path = "crates/turborepo-repository" } turborepo-ui = { path = "crates/turborepo-ui" } turborepo-unescape = { path = "crates/turborepo-unescape" } turborepo-scm = { path = "crates/turborepo-scm" } +turborepo-signals = { path = "crates/turborepo-signals" } wax = { path 
= "crates/turborepo-wax" } turborepo-vercel-api = { path = "crates/turborepo-vercel-api" } turborepo-vercel-api-mock = { path = "crates/turborepo-vercel-api-mock" } diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index cfbfdb70d9345..ff87f6cd1a1ba 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -139,6 +139,7 @@ turborepo-lockfiles = { workspace = true } turborepo-microfrontends = { workspace = true } turborepo-repository = { path = "../turborepo-repository" } turborepo-scm = { workspace = true } +turborepo-signals = { workspace = true } turborepo-telemetry = { path = "../turborepo-telemetry" } turborepo-ui = { workspace = true } turborepo-unescape = { workspace = true } diff --git a/crates/turborepo-lib/src/cli/error.rs b/crates/turborepo-lib/src/cli/error.rs index 319a0461043bd..013a1a2124839 100644 --- a/crates/turborepo-lib/src/cli/error.rs +++ b/crates/turborepo-lib/src/cli/error.rs @@ -4,6 +4,7 @@ use itertools::Itertools; use miette::Diagnostic; use thiserror::Error; use turborepo_repository::package_graph; +use turborepo_signals::SignalHandler; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::{color, BOLD, GREY}; @@ -14,7 +15,6 @@ use crate::{ rewrite_json::RewriteError, run, run::{builder::RunBuilder, watch}, - signal::SignalHandler, }; #[derive(Debug, Error, Diagnostic)] diff --git a/crates/turborepo-lib/src/commands/boundaries.rs b/crates/turborepo-lib/src/commands/boundaries.rs index b5bf517290d1b..59e8a735bc685 100644 --- a/crates/turborepo-lib/src/commands/boundaries.rs +++ b/crates/turborepo-lib/src/commands/boundaries.rs @@ -1,10 +1,10 @@ +use turborepo_signals::SignalHandler; use turborepo_telemetry::events::command::CommandEventBuilder; use crate::{ cli, commands::{run::get_signal, CommandBase}, run::builder::RunBuilder, - signal::SignalHandler, }; pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result { diff --git 
a/crates/turborepo-lib/src/commands/ls.rs b/crates/turborepo-lib/src/commands/ls.rs index 9e05ac49438ad..5a5678730d036 100644 --- a/crates/turborepo-lib/src/commands/ls.rs +++ b/crates/turborepo-lib/src/commands/ls.rs @@ -5,6 +5,7 @@ use serde::Serialize; use thiserror::Error; use turbopath::AnchoredSystemPath; use turborepo_repository::package_graph::{PackageName, PackageNode}; +use turborepo_signals::SignalHandler; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::{color, cprint, cprintln, ColorConfig, BOLD, BOLD_GREEN, GREY}; @@ -13,7 +14,6 @@ use crate::{ cli::OutputFormat, commands::{run::get_signal, CommandBase}, run::{builder::RunBuilder, Run}, - signal::SignalHandler, }; #[derive(Debug, Error, Diagnostic)] diff --git a/crates/turborepo-lib/src/commands/query.rs b/crates/turborepo-lib/src/commands/query.rs index 80148bbd860b8..a3129cfc2b9c6 100644 --- a/crates/turborepo-lib/src/commands/query.rs +++ b/crates/turborepo-lib/src/commands/query.rs @@ -5,6 +5,7 @@ use camino::Utf8Path; use miette::{Diagnostic, Report, SourceSpan}; use thiserror::Error; use turbopath::AbsoluteSystemPathBuf; +use turborepo_signals::SignalHandler; use turborepo_telemetry::events::command::CommandEventBuilder; use crate::{ @@ -12,7 +13,6 @@ use crate::{ query, query::{Error, RepositoryQuery}, run::builder::RunBuilder, - signal::SignalHandler, }; const SCHEMA_QUERY: &str = "query IntrospectionQuery { diff --git a/crates/turborepo-lib/src/commands/run.rs b/crates/turborepo-lib/src/commands/run.rs index b3b6e5dada7c9..9c034f6d05558 100644 --- a/crates/turborepo-lib/src/commands/run.rs +++ b/crates/turborepo-lib/src/commands/run.rs @@ -1,10 +1,11 @@ use std::{future::Future, sync::Arc}; use tracing::error; +use turborepo_signals::SignalHandler; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::sender::UISender; -use crate::{commands::CommandBase, run, run::builder::RunBuilder, signal::SignalHandler}; +use 
crate::{commands::CommandBase, run, run::builder::RunBuilder}; #[cfg(windows)] pub fn get_signal() -> Result>, run::Error> { diff --git a/crates/turborepo-lib/src/lib.rs b/crates/turborepo-lib/src/lib.rs index 17448af06168b..6e10258aebc0b 100644 --- a/crates/turborepo-lib/src/lib.rs +++ b/crates/turborepo-lib/src/lib.rs @@ -34,7 +34,6 @@ mod query; mod rewrite_json; mod run; mod shim; -mod signal; mod task_graph; mod task_hash; mod tracing; diff --git a/crates/turborepo-lib/src/query/mod.rs b/crates/turborepo-lib/src/query/mod.rs index c7e3278ab2efa..5d03350ee932e 100644 --- a/crates/turborepo-lib/src/query/mod.rs +++ b/crates/turborepo-lib/src/query/mod.rs @@ -24,12 +24,12 @@ use tokio::select; use turbo_trace::TraceError; use turbopath::AbsoluteSystemPathBuf; use turborepo_repository::{change_mapper::AllPackageChangeReason, package_graph::PackageName}; +use turborepo_signals::SignalHandler; use crate::{ get_version, query::{file::File, task::RepositoryTask}, run::{builder::RunBuilder, Run}, - signal::SignalHandler, }; #[derive(Error, Debug, miette::Diagnostic)] diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index f1773daa87785..ee3c0aa81dc58 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ b/crates/turborepo-lib/src/run/builder.rs @@ -20,6 +20,7 @@ use turborepo_repository::{ package_json::PackageJson, }; use turborepo_scm::SCM; +use turborepo_signals::{SignalHandler, SignalSubscriber}; use turborepo_telemetry::events::{ command::CommandEventBuilder, generic::{DaemonInitStatus, GenericEventBuilder}, @@ -46,7 +47,6 @@ use crate::{ process::ProcessManager, run::{scope, task_access::TaskAccess, task_id::TaskName, Error, Run, RunCache}, shim::TurboState, - signal::{SignalHandler, SignalSubscriber}, turbo_json::{TurboJson, TurboJsonLoader, UIMode}, DaemonConnector, }; diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index fadaa7abf17e8..70f97375e3b0d 100644 --- 
a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -31,6 +31,7 @@ use turborepo_ci::Vendor; use turborepo_env::EnvironmentVariableMap; use turborepo_repository::package_graph::{PackageGraph, PackageName, PackageNode}; use turborepo_scm::SCM; +use turborepo_signals::SignalHandler; use turborepo_telemetry::events::generic::GenericEventBuilder; use turborepo_ui::{ cprint, cprintln, sender::UISender, tui, tui::TuiSender, wui::sender::WebUISender, ColorConfig, @@ -45,7 +46,6 @@ use crate::{ opts::Opts, process::ProcessManager, run::{global_hash::get_global_hash_inputs, summary::RunTracker, task_access::TaskAccess}, - signal::SignalHandler, task_graph::Visitor, task_hash::{get_external_deps_hash, get_internal_deps_hash, PackageInputsHashes}, turbo_json::{TurboJson, TurboJsonLoader, UIMode}, diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 8f4513272d6f2..ad99524db431a 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -10,6 +10,7 @@ use thiserror::Error; use tokio::{select, sync::Notify, task::JoinHandle}; use tracing::{instrument, trace, warn}; use turborepo_repository::package_graph::PackageName; +use turborepo_signals::SignalHandler; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::sender::UISender; @@ -18,7 +19,6 @@ use crate::{ daemon::{proto, DaemonConnectorError, DaemonError}, get_version, opts, run::{self, builder::RunBuilder, scope::target_selector::InvalidSelectorError, Run}, - signal::SignalHandler, turbo_json::CONFIG_FILE, DaemonConnector, DaemonPaths, }; diff --git a/crates/turborepo-signals/Cargo.toml b/crates/turborepo-signals/Cargo.toml new file mode 100644 index 0000000000000..6b35bace856ff --- /dev/null +++ b/crates/turborepo-signals/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "turborepo-signals" +version = "0.1.0" +edition = "2021" + +[dependencies] +futures = "0.3.30" +tokio = { workspace = 
true, features = ["full", "time"] } + +[dev-dependencies] + +[lints] +workspace = true diff --git a/crates/turborepo-lib/src/signal.rs b/crates/turborepo-signals/src/lib.rs similarity index 99% rename from crates/turborepo-lib/src/signal.rs rename to crates/turborepo-signals/src/lib.rs index 37cd735ef2fc2..555a98309c6b7 100644 --- a/crates/turborepo-lib/src/signal.rs +++ b/crates/turborepo-signals/src/lib.rs @@ -1,3 +1,6 @@ +#![deny(clippy::all)] +#![feature(assert_matches)] + use std::{ fmt::Debug, future::Future, From 8a7335a7377c243915020ff72928d2c5702954cc Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 17 Feb 2025 12:17:38 -0500 Subject: [PATCH 2/9] chore(signals): fix lints, add crate comment --- crates/turborepo-signals/src/lib.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/turborepo-signals/src/lib.rs b/crates/turborepo-signals/src/lib.rs index 555a98309c6b7..210f55e0b39f7 100644 --- a/crates/turborepo-signals/src/lib.rs +++ b/crates/turborepo-signals/src/lib.rs @@ -1,6 +1,8 @@ #![deny(clippy::all)] #![feature(assert_matches)] +//! 
A crate for registering listeners for a given signal + use std::{ fmt::Debug, future::Future, @@ -28,7 +30,9 @@ pub struct SignalSubscriber(oneshot::Receiver>); /// SubscriberGuard should be kept until a subscriber is done processing the /// signal -pub struct SubscriberGuard(oneshot::Sender<()>); +pub struct SubscriberGuard { + _guard: oneshot::Sender<()>, +} impl SignalHandler { /// Construct a new SignalHandler that will alert any subscribers when @@ -111,11 +115,11 @@ impl SignalHandler { impl SignalSubscriber { /// Wait until signal is received by the signal handler pub async fn listen(self) -> SubscriberGuard { - let callback = self + let _guard = self .0 .await .expect("signal handler worker thread exited without alerting subscribers"); - SubscriberGuard(callback) + SubscriberGuard { _guard } } } From 9381f222906c5651f9ab985237972ff18ffdbcc3 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 17 Feb 2025 13:50:10 -0500 Subject: [PATCH 3/9] chore(signals): move signal listeners to seperate module --- crates/turborepo-lib/src/cli/error.rs | 6 ++-- .../turborepo-lib/src/commands/boundaries.rs | 10 ++---- crates/turborepo-lib/src/commands/ls.rs | 6 ++-- crates/turborepo-lib/src/commands/query.rs | 4 +-- crates/turborepo-lib/src/commands/run.rs | 32 ++----------------- crates/turborepo-lib/src/run/mod.rs | 4 +-- crates/turborepo-lib/src/run/watch.rs | 6 ++-- crates/turborepo-signals/src/lib.rs | 2 ++ crates/turborepo-signals/src/listeners.rs | 29 +++++++++++++++++ 9 files changed, 50 insertions(+), 49 deletions(-) create mode 100644 crates/turborepo-signals/src/listeners.rs diff --git a/crates/turborepo-lib/src/cli/error.rs b/crates/turborepo-lib/src/cli/error.rs index 013a1a2124839..2407671e22d6a 100644 --- a/crates/turborepo-lib/src/cli/error.rs +++ b/crates/turborepo-lib/src/cli/error.rs @@ -4,12 +4,12 @@ use itertools::Itertools; use miette::Diagnostic; use thiserror::Error; use turborepo_repository::package_graph; -use turborepo_signals::SignalHandler; 
+use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::{color, BOLD, GREY}; use crate::{ - commands::{bin, generate, link, login, ls, prune, run::get_signal, CommandBase}, + commands::{bin, generate, link, login, ls, prune, CommandBase}, daemon::DaemonError, query, rewrite_json::RewriteError, @@ -78,7 +78,7 @@ pub async fn print_potential_tasks( base: CommandBase, telemetry: CommandEventBuilder, ) -> Result<(), Error> { - let signal = get_signal()?; + let signal = get_signal().map_err(run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); let color_config = base.color_config; diff --git a/crates/turborepo-lib/src/commands/boundaries.rs b/crates/turborepo-lib/src/commands/boundaries.rs index 59e8a735bc685..03211975078be 100644 --- a/crates/turborepo-lib/src/commands/boundaries.rs +++ b/crates/turborepo-lib/src/commands/boundaries.rs @@ -1,14 +1,10 @@ -use turborepo_signals::SignalHandler; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; -use crate::{ - cli, - commands::{run::get_signal, CommandBase}, - run::builder::RunBuilder, -}; +use crate::{cli, commands::CommandBase, run::builder::RunBuilder}; pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result { - let signal = get_signal()?; + let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); let run = RunBuilder::new(base)? 
diff --git a/crates/turborepo-lib/src/commands/ls.rs b/crates/turborepo-lib/src/commands/ls.rs index 5a5678730d036..a1c94f6bd874d 100644 --- a/crates/turborepo-lib/src/commands/ls.rs +++ b/crates/turborepo-lib/src/commands/ls.rs @@ -5,14 +5,14 @@ use serde::Serialize; use thiserror::Error; use turbopath::AnchoredSystemPath; use turborepo_repository::package_graph::{PackageName, PackageNode}; -use turborepo_signals::SignalHandler; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::{color, cprint, cprintln, ColorConfig, BOLD, BOLD_GREEN, GREY}; use crate::{ cli, cli::OutputFormat, - commands::{run::get_signal, CommandBase}, + commands::CommandBase, run::{builder::RunBuilder, Run}, }; @@ -115,7 +115,7 @@ pub async fn run( telemetry: CommandEventBuilder, output: Option, ) -> Result<(), cli::Error> { - let signal = get_signal()?; + let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); let run_builder = RunBuilder::new(base)?; diff --git a/crates/turborepo-lib/src/commands/query.rs b/crates/turborepo-lib/src/commands/query.rs index a3129cfc2b9c6..0a514db0916fd 100644 --- a/crates/turborepo-lib/src/commands/query.rs +++ b/crates/turborepo-lib/src/commands/query.rs @@ -5,11 +5,11 @@ use camino::Utf8Path; use miette::{Diagnostic, Report, SourceSpan}; use thiserror::Error; use turbopath::AbsoluteSystemPathBuf; -use turborepo_signals::SignalHandler; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use crate::{ - commands::{run::get_signal, CommandBase}, + commands::CommandBase, query, query::{Error, RepositoryQuery}, run::builder::RunBuilder, diff --git a/crates/turborepo-lib/src/commands/run.rs b/crates/turborepo-lib/src/commands/run.rs index 9c034f6d05558..174d9d93f0b3f 100644 --- a/crates/turborepo-lib/src/commands/run.rs +++ 
b/crates/turborepo-lib/src/commands/run.rs @@ -1,40 +1,14 @@ -use std::{future::Future, sync::Arc}; +use std::sync::Arc; use tracing::error; -use turborepo_signals::SignalHandler; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::sender::UISender; use crate::{commands::CommandBase, run, run::builder::RunBuilder}; -#[cfg(windows)] -pub fn get_signal() -> Result>, run::Error> { - let mut ctrl_c = tokio::signal::windows::ctrl_c().map_err(run::Error::SignalHandler)?; - Ok(async move { ctrl_c.recv().await }) -} - -#[cfg(not(windows))] -pub fn get_signal() -> Result>, run::Error> { - use tokio::signal::unix; - let mut sigint = - unix::signal(unix::SignalKind::interrupt()).map_err(run::Error::SignalHandler)?; - let mut sigterm = - unix::signal(unix::SignalKind::terminate()).map_err(run::Error::SignalHandler)?; - - Ok(async move { - tokio::select! { - res = sigint.recv() => { - res - } - res = sigterm.recv() => { - res - } - } - }) -} - pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result { - let signal = get_signal()?; + let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); let run_builder = RunBuilder::new(base)?; diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 70f97375e3b0d..76ca1e4e3d4a6 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -31,7 +31,7 @@ use turborepo_ci::Vendor; use turborepo_env::EnvironmentVariableMap; use turborepo_repository::package_graph::{PackageGraph, PackageName, PackageNode}; use turborepo_scm::SCM; -use turborepo_signals::SignalHandler; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::generic::GenericEventBuilder; use turborepo_ui::{ cprint, cprintln, sender::UISender, tui, tui::TuiSender, wui::sender::WebUISender, ColorConfig, @@ 
-336,7 +336,7 @@ impl Run { }; let interrupt = async { - if let Ok(fut) = crate::commands::run::get_signal() { + if let Ok(fut) = get_signal() { fut.await; } else { tracing::warn!("could not register ctrl-c handler"); diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index ad99524db431a..172b638af8753 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -10,12 +10,12 @@ use thiserror::Error; use tokio::{select, sync::Notify, task::JoinHandle}; use tracing::{instrument, trace, warn}; use turborepo_repository::package_graph::PackageName; -use turborepo_signals::SignalHandler; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::sender::UISender; use crate::{ - commands::{self, CommandBase}, + commands::CommandBase, daemon::{proto, DaemonConnectorError, DaemonError}, get_version, opts, run::{self, builder::RunBuilder, scope::target_selector::InvalidSelectorError, Run}, @@ -115,7 +115,7 @@ impl WatchClient { experimental_write_cache: bool, telemetry: CommandEventBuilder, ) -> Result { - let signal = commands::run::get_signal()?; + let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); if base.opts.repo_opts.root_turbo_json_path != base.repo_root.join_component(CONFIG_FILE) { diff --git a/crates/turborepo-signals/src/lib.rs b/crates/turborepo-signals/src/lib.rs index 210f55e0b39f7..a38878618a424 100644 --- a/crates/turborepo-signals/src/lib.rs +++ b/crates/turborepo-signals/src/lib.rs @@ -3,6 +3,8 @@ //! 
A crate for registering listeners for a given signal +pub mod listeners; + use std::{ fmt::Debug, future::Future, diff --git a/crates/turborepo-signals/src/listeners.rs b/crates/turborepo-signals/src/listeners.rs new file mode 100644 index 0000000000000..aad544574239c --- /dev/null +++ b/crates/turborepo-signals/src/listeners.rs @@ -0,0 +1,29 @@ +use std::future::Future; + +#[cfg(windows)] +/// A listener for Windows Console Ctrl-C events +pub fn get_signal() -> Result>, std::io::Error> { + let mut ctrl_c = tokio::signal::windows::ctrl_c().map_err(run::Error::SignalHandler)?; + Ok(async move { ctrl_c.recv().await }) +} + +#[cfg(not(windows))] +/// A listener for commong Unix signals that require special handling +/// +/// Currently listens for SIGINT and SIGTERM +pub fn get_signal() -> Result>, std::io::Error> { + use tokio::signal::unix; + let mut sigint = unix::signal(unix::SignalKind::interrupt())?; + let mut sigterm = unix::signal(unix::SignalKind::terminate())?; + + Ok(async move { + tokio::select! { + res = sigint.recv() => { + res + } + res = sigterm.recv() => { + res + } + } + }) +} From eaf429ba5a6ab08857c7a9a6ece7037a0856a93a Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 17 Feb 2025 14:07:27 -0500 Subject: [PATCH 4/9] feat(signals): give subscribers signal kind --- crates/turborepo-signals/src/lib.rs | 20 +++++++++++--------- crates/turborepo-signals/src/listeners.rs | 12 +++++++----- crates/turborepo-signals/src/signals.rs | 10 ++++++++++ 3 files changed, 28 insertions(+), 14 deletions(-) create mode 100644 crates/turborepo-signals/src/signals.rs diff --git a/crates/turborepo-signals/src/lib.rs b/crates/turborepo-signals/src/lib.rs index a38878618a424..3ec9a1e1a2bf8 100644 --- a/crates/turborepo-signals/src/lib.rs +++ b/crates/turborepo-signals/src/lib.rs @@ -4,6 +4,7 @@ //! 
A crate for registering listeners for a given signal pub mod listeners; +pub mod signals; use std::{ fmt::Debug, @@ -12,6 +13,7 @@ use std::{ }; use futures::{stream::FuturesUnordered, StreamExt}; +use signals::Signal; use tokio::sync::{mpsc, oneshot}; /// SignalHandler provides a mechanism to subscribe to a future and get alerted @@ -24,22 +26,22 @@ pub struct SignalHandler { #[derive(Debug, Default)] struct HandlerState { - subscribers: Vec>>, + subscribers: Vec>>, is_closing: bool, } -pub struct SignalSubscriber(oneshot::Receiver>); +pub struct SignalSubscriber(oneshot::Receiver>); /// SubscriberGuard should be kept until a subscriber is done processing the /// signal pub struct SubscriberGuard { - _guard: oneshot::Sender<()>, + _guard: oneshot::Sender, } impl SignalHandler { /// Construct a new SignalHandler that will alert any subscribers when /// `signal_source` completes or `close` is called on it. - pub fn new(signal_source: impl Future> + Send + 'static) -> Self { + pub fn new(signal_source: impl Future> + Send + 'static) -> Self { // think about channel size let state = Arc::new(Mutex::new(HandlerState::default())); let worker_state = state.clone(); @@ -126,7 +128,7 @@ impl SignalSubscriber { } impl HandlerState { - fn add_subscriber(&mut self) -> Option>> { + fn add_subscriber(&mut self) -> Option>> { (!self.is_closing).then(|| { let (tx, rx) = oneshot::channel(); self.subscribers.push(tx); @@ -147,7 +149,7 @@ mod test { let handler = SignalHandler::new(async move { rx.await.ok() }); let subscriber = handler.subscribe().unwrap(); // Send mocked SIGINT - tx.send(()).unwrap(); + tx.send(Signal::Interrupt).unwrap(); let (done, mut is_done) = oneshot::channel(); let handler2 = handler.clone(); @@ -170,7 +172,7 @@ mod test { #[tokio::test] async fn test_subscribers_triggered_from_close() { let (_tx, rx) = oneshot::channel::<()>(); - let handler = SignalHandler::new(async move { rx.await.ok() }); + let handler = SignalHandler::new(async move { 
rx.await.ok().map(|_| Signal::Interrupt) }); let subscriber = handler.subscribe().unwrap(); let (close_done, mut is_close_done) = oneshot::channel(); @@ -193,7 +195,7 @@ mod test { #[tokio::test] async fn test_close_idempotent() { let (_tx, rx) = oneshot::channel::<()>(); - let handler = SignalHandler::new(async move { rx.await.ok() }); + let handler = SignalHandler::new(async move { rx.await.ok().map(|_| Signal::Interrupt) }); handler.close().await; handler.close().await; } @@ -205,7 +207,7 @@ mod test { let subscriber = handler.subscribe().unwrap(); // Send SIGINT - tx.send(()).unwrap(); + tx.send(Signal::Interrupt).unwrap(); // Do a quick yield to give the worker a chance to read the sigint tokio::task::yield_now().await; assert!( diff --git a/crates/turborepo-signals/src/listeners.rs b/crates/turborepo-signals/src/listeners.rs index aad544574239c..4fdc456d22b1b 100644 --- a/crates/turborepo-signals/src/listeners.rs +++ b/crates/turborepo-signals/src/listeners.rs @@ -1,17 +1,19 @@ use std::future::Future; +use crate::signals::Signal; + #[cfg(windows)] /// A listener for Windows Console Ctrl-C events -pub fn get_signal() -> Result>, std::io::Error> { +pub fn get_signal() -> Result>, std::io::Error> { let mut ctrl_c = tokio::signal::windows::ctrl_c().map_err(run::Error::SignalHandler)?; - Ok(async move { ctrl_c.recv().await }) + Ok(async move { ctrl_c.recv().await.map(|_| Signal::CtrlC) }) } #[cfg(not(windows))] /// A listener for commong Unix signals that require special handling /// /// Currently listens for SIGINT and SIGTERM -pub fn get_signal() -> Result>, std::io::Error> { +pub fn get_signal() -> Result>, std::io::Error> { use tokio::signal::unix; let mut sigint = unix::signal(unix::SignalKind::interrupt())?; let mut sigterm = unix::signal(unix::SignalKind::terminate())?; @@ -19,10 +21,10 @@ pub fn get_signal() -> Result>, std::io::Error> Ok(async move { tokio::select! 
{ res = sigint.recv() => { - res + res.map(|_| Signal::Interrupt) } res = sigterm.recv() => { - res + res.map(|_| Signal::Terminate) } } }) diff --git a/crates/turborepo-signals/src/signals.rs b/crates/turborepo-signals/src/signals.rs new file mode 100644 index 0000000000000..ae6a0d71f71a1 --- /dev/null +++ b/crates/turborepo-signals/src/signals.rs @@ -0,0 +1,10 @@ +/// A collection of signals that are caught by the listeners +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum Signal { + #[cfg(windows)] + CtrlC, + #[cfg(not(windows))] + Interrupt, + #[cfg(not(windows))] + Terminate, +} From 679b8d9f7cc973fc88bcc00ce791ddb68ba0083e Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 17 Feb 2025 14:48:54 -0500 Subject: [PATCH 5/9] feat(signals): support receiving multiple signals --- crates/turborepo-lib/src/run/mod.rs | 6 ++-- crates/turborepo-signals/src/lib.rs | 40 ++++++++++++++++++----- crates/turborepo-signals/src/listeners.rs | 14 ++++---- 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 76ca1e4e3d4a6..67169de52fcdf 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -22,8 +22,9 @@ use std::{ pub use cache::{CacheOutput, ConfigCache, Error as CacheError, RunCache, TaskCache}; use chrono::{DateTime, Local}; +use futures::StreamExt; use rayon::iter::ParallelBridge; -use tokio::{select, task::JoinHandle}; +use tokio::{pin, select, task::JoinHandle}; use tracing::{debug, instrument}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; use turborepo_api_client::{APIAuth, APIClient}; @@ -337,7 +338,8 @@ impl Run { let interrupt = async { if let Ok(fut) = get_signal() { - fut.await; + pin!(fut); + fut.next().await; } else { tracing::warn!("could not register ctrl-c handler"); // wait forever diff --git a/crates/turborepo-signals/src/lib.rs b/crates/turborepo-signals/src/lib.rs index 3ec9a1e1a2bf8..1743c6e5060b1 
100644 --- a/crates/turborepo-signals/src/lib.rs +++ b/crates/turborepo-signals/src/lib.rs @@ -8,13 +8,15 @@ pub mod signals; use std::{ fmt::Debug, - future::Future, sync::{Arc, Mutex}, }; -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::{stream::FuturesUnordered, Stream, StreamExt}; use signals::Signal; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + pin, + sync::{mpsc, oneshot}, +}; /// SignalHandler provides a mechanism to subscribe to a future and get alerted /// whenever the future completes or the handler gets a close message. @@ -41,16 +43,17 @@ pub struct SubscriberGuard { impl SignalHandler { /// Construct a new SignalHandler that will alert any subscribers when /// `signal_source` completes or `close` is called on it. - pub fn new(signal_source: impl Future> + Send + 'static) -> Self { + pub fn new(signal_source: impl Stream> + Send + 'static) -> Self { // think about channel size let state = Arc::new(Mutex::new(HandlerState::default())); let worker_state = state.clone(); let (close, mut rx) = mpsc::channel::<()>(1); tokio::spawn(async move { + pin!(signal_source); tokio::select! { // We don't care if we get a signal or if we are unable to receive signals // Either way we start the shutdown. - _ = signal_source => {}, + _ = signal_source.next() => {}, // We don't care if a close message was sent or if all handlers are dropped. // Either way start the shutdown process. 
_ = rx.recv() => {} @@ -141,12 +144,22 @@ impl HandlerState { mod test { use std::{assert_matches::assert_matches, time::Duration}; + use futures::stream; + use super::*; + #[cfg(windows)] + const DEFAULT_SIGNAL: Signal = Signal::CtrlC; + #[cfg(not(windows))] + const DEFAULT_SIGNAL: Signal = Signal::Interrupt; + #[tokio::test] async fn test_subscribers_triggered_from_signal() { let (tx, rx) = oneshot::channel(); - let handler = SignalHandler::new(async move { rx.await.ok() }); + let handler = SignalHandler::new(stream::once(async move { + rx.await.ok(); + Some(DEFAULT_SIGNAL) + })); let subscriber = handler.subscribe().unwrap(); // Send mocked SIGINT tx.send(Signal::Interrupt).unwrap(); @@ -172,7 +185,10 @@ mod test { #[tokio::test] async fn test_subscribers_triggered_from_close() { let (_tx, rx) = oneshot::channel::<()>(); - let handler = SignalHandler::new(async move { rx.await.ok().map(|_| Signal::Interrupt) }); + let handler = SignalHandler::new(stream::once(async move { + rx.await.ok(); + Some(DEFAULT_SIGNAL) + })); let subscriber = handler.subscribe().unwrap(); let (close_done, mut is_close_done) = oneshot::channel(); @@ -195,7 +211,10 @@ mod test { #[tokio::test] async fn test_close_idempotent() { let (_tx, rx) = oneshot::channel::<()>(); - let handler = SignalHandler::new(async move { rx.await.ok().map(|_| Signal::Interrupt) }); + let handler = SignalHandler::new(stream::once(async move { + rx.await.ok(); + Some(DEFAULT_SIGNAL) + })); handler.close().await; handler.close().await; } @@ -203,7 +222,10 @@ mod test { #[tokio::test] async fn test_subscribe_after_close() { let (tx, rx) = oneshot::channel(); - let handler = SignalHandler::new(async move { rx.await.ok() }); + let handler = SignalHandler::new(stream::once(async move { + rx.await.ok(); + Some(DEFAULT_SIGNAL) + })); let subscriber = handler.subscribe().unwrap(); // Send SIGINT diff --git a/crates/turborepo-signals/src/listeners.rs b/crates/turborepo-signals/src/listeners.rs index 
4fdc456d22b1b..8bbba3dc3c6b1 100644 --- a/crates/turborepo-signals/src/listeners.rs +++ b/crates/turborepo-signals/src/listeners.rs @@ -1,24 +1,26 @@ -use std::future::Future; +use futures::{stream, Stream}; use crate::signals::Signal; #[cfg(windows)] /// A listener for Windows Console Ctrl-C events -pub fn get_signal() -> Result>, std::io::Error> { +pub fn get_signal() -> Result>, std::io::Error> { let mut ctrl_c = tokio::signal::windows::ctrl_c().map_err(run::Error::SignalHandler)?; - Ok(async move { ctrl_c.recv().await.map(|_| Signal::CtrlC) }) + Ok(stream::once(async move { + ctrl_c.recv().await.map(|_| Signal::CtrlC) + })) } #[cfg(not(windows))] /// A listener for commong Unix signals that require special handling /// /// Currently listens for SIGINT and SIGTERM -pub fn get_signal() -> Result>, std::io::Error> { +pub fn get_signal() -> Result>, std::io::Error> { use tokio::signal::unix; let mut sigint = unix::signal(unix::SignalKind::interrupt())?; let mut sigterm = unix::signal(unix::SignalKind::terminate())?; - Ok(async move { + Ok(stream::once(async move { tokio::select! 
{ res = sigint.recv() => { res.map(|_| Signal::Interrupt) @@ -27,5 +29,5 @@ pub fn get_signal() -> Result<impl Future<Output = Option<Signal>>, std::io::Err res.map(|_| Signal::Terminate) } } - }) + })) } From d9a2b63156dcee27a322c94893d6abe7c213d6b9 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 21 Feb 2025 08:58:26 -0500 Subject: [PATCH 6/9] fix(signals): fix windows compilation --- crates/turborepo-signals/src/listeners.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/turborepo-signals/src/listeners.rs b/crates/turborepo-signals/src/listeners.rs index 8bbba3dc3c6b1..66e208ca7f402 100644 --- a/crates/turborepo-signals/src/listeners.rs +++ b/crates/turborepo-signals/src/listeners.rs @@ -5,7 +5,7 @@ use crate::signals::Signal; #[cfg(windows)] /// A listener for Windows Console Ctrl-C events pub fn get_signal() -> Result<impl Stream<Item = Option<Signal>>, std::io::Error> { - let mut ctrl_c = tokio::signal::windows::ctrl_c().map_err(run::Error::SignalHandler)?; + let mut ctrl_c = tokio::signal::windows::ctrl_c()?; Ok(stream::once(async move { ctrl_c.recv().await.map(|_| Signal::CtrlC) })) From 4ccb2cb1fb792601de3703fb0f5144ea985df6a0 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 21 Feb 2025 10:12:49 -0500 Subject: [PATCH 7/9] add license --- crates/turborepo-signals/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/turborepo-signals/Cargo.toml b/crates/turborepo-signals/Cargo.toml index 6b35bace856ff..ad04f1bb3ad12 100644 --- a/crates/turborepo-signals/Cargo.toml +++ b/crates/turborepo-signals/Cargo.toml @@ -2,6 +2,7 @@ name = "turborepo-signals" version = "0.1.0" edition = "2021" +license = "MIT" [dependencies] futures = "0.3.30" From d168658d9fed64c022552308af94f3d2e6819ea0 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 21 Feb 2025 10:14:31 -0500 Subject: [PATCH 8/9] fix windows tests --- crates/turborepo-signals/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/turborepo-signals/src/lib.rs 
b/crates/turborepo-signals/src/lib.rs index 1743c6e5060b1..d70eb3e2b6ea1 100644 --- a/crates/turborepo-signals/src/lib.rs +++ b/crates/turborepo-signals/src/lib.rs @@ -162,7 +162,7 @@ mod test { })); let subscriber = handler.subscribe().unwrap(); // Send mocked SIGINT - tx.send(Signal::Interrupt).unwrap(); + tx.send(DEFAULT_SIGNAL).unwrap(); let (done, mut is_done) = oneshot::channel(); let handler2 = handler.clone(); @@ -229,7 +229,7 @@ mod test { let subscriber = handler.subscribe().unwrap(); // Send SIGINT - tx.send(Signal::Interrupt).unwrap(); + tx.send(DEFAULT_SIGNAL).unwrap(); // Do a quick yield to give the worker a chance to read the sigint tokio::task::yield_now().await; assert!( From 3f887071c34391617a735e2e278aa256166dab42 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Thu, 27 Feb 2025 11:05:27 -0500 Subject: [PATCH 9/9] chore(signals): custom error type for failing to setup signal --- Cargo.lock | 1 + crates/turborepo-lib/src/cli/error.rs | 2 +- crates/turborepo-lib/src/commands/boundaries.rs | 2 +- crates/turborepo-lib/src/commands/ls.rs | 2 +- crates/turborepo-lib/src/commands/query.rs | 2 +- crates/turborepo-lib/src/commands/run.rs | 2 +- crates/turborepo-lib/src/run/error.rs | 4 ++-- crates/turborepo-lib/src/run/watch.rs | 2 +- crates/turborepo-signals/Cargo.toml | 1 + crates/turborepo-signals/src/listeners.rs | 8 ++++++-- 10 files changed, 16 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c23746a6d49b..c82d333991374 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6771,6 +6771,7 @@ name = "turborepo-signals" version = "0.1.0" dependencies = [ "futures", + "thiserror", "tokio", ] diff --git a/crates/turborepo-lib/src/cli/error.rs b/crates/turborepo-lib/src/cli/error.rs index 2407671e22d6a..4107a8dd414b4 100644 --- a/crates/turborepo-lib/src/cli/error.rs +++ b/crates/turborepo-lib/src/cli/error.rs @@ -78,7 +78,7 @@ pub async fn print_potential_tasks( base: CommandBase, telemetry: CommandEventBuilder, ) -> Result<(), 
Error> { - let signal = get_signal().map_err(run::Error::SignalHandler)?; + let signal = get_signal().map_err(run::Error::from)?; let handler = SignalHandler::new(signal); let color_config = base.color_config; diff --git a/crates/turborepo-lib/src/commands/boundaries.rs b/crates/turborepo-lib/src/commands/boundaries.rs index 03211975078be..55a5d93eb95d5 100644 --- a/crates/turborepo-lib/src/commands/boundaries.rs +++ b/crates/turborepo-lib/src/commands/boundaries.rs @@ -4,7 +4,7 @@ use turborepo_telemetry::events::command::CommandEventBuilder; use crate::{cli, commands::CommandBase, run::builder::RunBuilder}; pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result { - let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; + let signal = get_signal().map_err(crate::run::Error::from)?; let handler = SignalHandler::new(signal); let run = RunBuilder::new(base)? diff --git a/crates/turborepo-lib/src/commands/ls.rs b/crates/turborepo-lib/src/commands/ls.rs index a1c94f6bd874d..7a9500e9a31ee 100644 --- a/crates/turborepo-lib/src/commands/ls.rs +++ b/crates/turborepo-lib/src/commands/ls.rs @@ -115,7 +115,7 @@ pub async fn run( telemetry: CommandEventBuilder, output: Option, ) -> Result<(), cli::Error> { - let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; + let signal = get_signal().map_err(crate::run::Error::from)?; let handler = SignalHandler::new(signal); let run_builder = RunBuilder::new(base)?; diff --git a/crates/turborepo-lib/src/commands/query.rs b/crates/turborepo-lib/src/commands/query.rs index 0a514db0916fd..034ae1e7802d6 100644 --- a/crates/turborepo-lib/src/commands/query.rs +++ b/crates/turborepo-lib/src/commands/query.rs @@ -161,7 +161,7 @@ pub async fn run( variables_path: Option<&Utf8Path>, include_schema: bool, ) -> Result { - let signal = get_signal()?; + let signal = get_signal().map_err(crate::run::Error::from)?; let handler = SignalHandler::new(signal); let run_builder = RunBuilder::new(base)? 
diff --git a/crates/turborepo-lib/src/commands/run.rs b/crates/turborepo-lib/src/commands/run.rs index 174d9d93f0b3f..39b68eb9c39e4 100644 --- a/crates/turborepo-lib/src/commands/run.rs +++ b/crates/turborepo-lib/src/commands/run.rs @@ -8,7 +8,7 @@ use turborepo_ui::sender::UISender; use crate::{commands::CommandBase, run, run::builder::RunBuilder}; pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result { - let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; + let signal = get_signal()?; let handler = SignalHandler::new(signal); let run_builder = RunBuilder::new(base)?; diff --git a/crates/turborepo-lib/src/run/error.rs b/crates/turborepo-lib/src/run/error.rs index 0c3ee83aaa229..952fbfd3e0e53 100644 --- a/crates/turborepo-lib/src/run/error.rs +++ b/crates/turborepo-lib/src/run/error.rs @@ -52,8 +52,8 @@ pub enum Error { #[error(transparent)] #[diagnostic(transparent)] Visitor(#[from] task_graph::VisitorError), - #[error("Failed to register signal handler: {0}")] - SignalHandler(std::io::Error), + #[error(transparent)] + SignalHandler(#[from] turborepo_signals::listeners::Error), #[error(transparent)] Daemon(#[from] daemon::DaemonError), #[error(transparent)] diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 172b638af8753..5df4dab037b0a 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -115,7 +115,7 @@ impl WatchClient { experimental_write_cache: bool, telemetry: CommandEventBuilder, ) -> Result { - let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; + let signal = get_signal().map_err(crate::run::Error::from)?; let handler = SignalHandler::new(signal); if base.opts.repo_opts.root_turbo_json_path != base.repo_root.join_component(CONFIG_FILE) { diff --git a/crates/turborepo-signals/Cargo.toml b/crates/turborepo-signals/Cargo.toml index ad04f1bb3ad12..dc83cbd24cd81 100644 --- a/crates/turborepo-signals/Cargo.toml 
+++ b/crates/turborepo-signals/Cargo.toml @@ -6,6 +6,7 @@ license = "MIT" [dependencies] futures = "0.3.30" +thiserror = { workspace = true } tokio = { workspace = true, features = ["full", "time"] } [dev-dependencies] diff --git a/crates/turborepo-signals/src/listeners.rs b/crates/turborepo-signals/src/listeners.rs index 66e208ca7f402..eba42c0558145 100644 --- a/crates/turborepo-signals/src/listeners.rs +++ b/crates/turborepo-signals/src/listeners.rs @@ -2,9 +2,13 @@ use futures::{stream, Stream}; use crate::signals::Signal; +#[derive(Debug, thiserror::Error)] +#[error("Failed to register signal handler: {0}")] +pub struct Error(#[from] std::io::Error); + #[cfg(windows)] /// A listener for Windows Console Ctrl-C events -pub fn get_signal() -> Result<impl Stream<Item = Option<Signal>>, std::io::Error> { +pub fn get_signal() -> Result<impl Stream<Item = Option<Signal>>, Error> { let mut ctrl_c = tokio::signal::windows::ctrl_c()?; Ok(stream::once(async move { ctrl_c.recv().await.map(|_| Signal::CtrlC) @@ -15,7 +19,7 @@ pub fn get_signal() -> Result<impl Stream<Item = Option<Signal>>, std::io::Error /// A listener for commong Unix signals that require special handling /// /// Currently listens for SIGINT and SIGTERM -pub fn get_signal() -> Result<impl Stream<Item = Option<Signal>>, std::io::Error> { +pub fn get_signal() -> Result<impl Stream<Item = Option<Signal>>, Error> { use tokio::signal::unix; let mut sigint = unix::signal(unix::SignalKind::interrupt())?; let mut sigterm = unix::signal(unix::SignalKind::terminate())?;