这是indexloc提供的服务,不要输入任何密码
Skip to content

feat: allow Watch Mode with --root-turbo-json #10687

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions crates/turborepo-lib/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,9 @@ pub enum Command {
/// Set the idle timeout for turbod
#[clap(long, default_value_t = String::from("4h0m0s"))]
idle_time: String,
/// Path to a custom turbo.json file to watch from --root-turbo-json
#[clap(long)]
turbo_json_path: Option<Utf8PathBuf>,
#[clap(subcommand)]
command: Option<DaemonCommand>,
},
Expand Down Expand Up @@ -1428,15 +1431,23 @@ pub async fn run(
Ok(clone::run(cwd, url, dir.as_deref(), *ci, *local, *depth)?)
}
#[allow(unused_variables)]
Command::Daemon { command, idle_time } => {
Command::Daemon {
command,
idle_time,
turbo_json_path,
} => {
let event = CommandEventBuilder::new("daemon").with_parent(&root_telemetry);
event.track_call();
let base = CommandBase::new(cli_args.clone(), repo_root, version, color_config)?;
event.track_ui_mode(base.opts.run_opts.ui_mode);

match command {
Some(command) => daemon::daemon_client(command, &base).await,
None => daemon::daemon_server(&base, idle_time, logger).await,
Some(command) => {
daemon::daemon_client(command, &base, turbo_json_path.clone()).await
}
None => {
daemon::daemon_server(&base, idle_time, turbo_json_path.clone(), logger).await
}
}?;

Ok(0)
Expand Down
45 changes: 41 additions & 4 deletions crates/turborepo-lib/src/commands/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,46 @@ use crate::{
const DAEMON_NOT_RUNNING_MESSAGE: &str =
"daemon is not running, run `turbo daemon start` to start it";

/// Converts an optional turbo.json path to an absolute system path.
fn convert_turbo_json_path(
path: Option<&camino::Utf8Path>,
) -> Result<Option<turbopath::AbsoluteSystemPathBuf>, DaemonError> {
match path {
Some(p) => match turbopath::AbsoluteSystemPathBuf::from_cwd(p) {
Ok(path) => Ok(Some(path)),
Err(e) => {
tracing::error!("Failed to convert custom turbo.json path: {}", e);
Err(DaemonError::Unavailable(format!(
"Invalid turbo.json path: {}",
e
)))
}
},
None => Ok(None),
}
}

/// Runs the daemon command.
pub async fn daemon_client(command: &DaemonCommand, base: &CommandBase) -> Result<(), DaemonError> {
pub async fn daemon_client(
command: &DaemonCommand,
base: &CommandBase,
custom_turbo_json_path: Option<camino::Utf8PathBuf>,
) -> Result<(), DaemonError> {
let (can_start_server, can_kill_server) = match command {
DaemonCommand::Status { .. } | DaemonCommand::Logs => (false, false),
DaemonCommand::Stop => (false, true),
DaemonCommand::Restart | DaemonCommand::Start => (true, true),
DaemonCommand::Clean { .. } => (false, true),
};

let connector = DaemonConnector::new(can_start_server, can_kill_server, &base.repo_root);
let custom_turbo_json_path = convert_turbo_json_path(custom_turbo_json_path.as_deref())?;

let connector = DaemonConnector::new(
can_start_server,
can_kill_server,
&base.repo_root,
custom_turbo_json_path,
);

match command {
DaemonCommand::Restart => {
Expand Down Expand Up @@ -275,6 +305,7 @@ fn log_filename(base_filename: &str) -> Result<String, time::Error> {
pub async fn daemon_server(
base: &CommandBase,
idle_time: &String,
turbo_json_path: Option<camino::Utf8PathBuf>,
logging: &TurboSubscriber,
) -> Result<(), DaemonError> {
let paths = Paths::from_repo_root(&base.repo_root);
Expand All @@ -298,8 +329,14 @@ pub async fn daemon_server(
}
CloseReason::Interrupt
});
let server =
crate::daemon::TurboGrpcService::new(base.repo_root.clone(), paths, timeout, exit_signal);
let custom_turbo_json_path = convert_turbo_json_path(turbo_json_path.as_deref())?;
let server = crate::daemon::TurboGrpcService::new(
base.repo_root.clone(),
paths,
timeout,
exit_signal,
custom_turbo_json_path,
);

let reason = server.serve().await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/commands/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn is_wsl() -> bool {

pub async fn run(base: CommandBase) {
let system = System::new_all();
let connector = DaemonConnector::new(false, false, &base.repo_root);
let connector = DaemonConnector::new(false, false, &base.repo_root, None);
let daemon_status = match connector.connect().await {
Ok(_status) => "Running",
Err(DaemonConnectorError::NotRunning) => "Not running",
Expand Down
36 changes: 24 additions & 12 deletions crates/turborepo-lib/src/daemon/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,23 @@ pub struct DaemonConnector {
/// in the event of a version mismatch).
pub can_kill_server: bool,
pub paths: Paths,
/// Optional custom turbo.json path to watch
pub custom_turbo_json_path: Option<turbopath::AbsoluteSystemPathBuf>,
}

impl DaemonConnector {
pub fn new(
can_start_server: bool,
can_kill_server: bool,
repo_root: &AbsoluteSystemPath,
custom_turbo_json_path: Option<turbopath::AbsoluteSystemPathBuf>,
) -> Self {
let paths = Paths::from_repo_root(repo_root);
Self {
can_start_server,
can_kill_server,
paths,
custom_turbo_json_path,
}
}

Expand Down Expand Up @@ -154,21 +158,29 @@ impl DaemonConnector {
}
None if self.can_start_server => {
debug!("no pid found, starting daemon");
Self::start_daemon().await
Self::start_daemon(&self.custom_turbo_json_path).await
}
None => Err(DaemonConnectorError::NotRunning),
}
}

/// Starts the daemon process, returning its PID.
async fn start_daemon() -> Result<sysinfo::Pid, DaemonConnectorError> {
async fn start_daemon(
custom_turbo_json_path: &Option<turbopath::AbsoluteSystemPathBuf>,
) -> Result<sysinfo::Pid, DaemonConnectorError> {
let binary_path =
std::env::current_exe().map_err(|e| DaemonConnectorError::Fork(e.into()))?;
// this creates a new process group for the given command
// in a cross platform way, directing all output to /dev/null
let mut group = tokio::process::Command::new(binary_path)
.arg("--skip-infer")
.arg("daemon")
let mut command = tokio::process::Command::new(binary_path);
command.arg("--skip-infer").arg("daemon");

// Pass custom turbo.json path if specified
if let Some(path) = custom_turbo_json_path {
command.arg("--turbo-json-path").arg(path.as_str());
}

let mut group = command
.stderr(Stdio::null())
.stdout(Stdio::null())
.group()
Expand Down Expand Up @@ -437,7 +449,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let repo_root = AbsoluteSystemPathBuf::try_from(tmp_dir.path()).unwrap();

let connector = DaemonConnector::new(false, false, &repo_root);
let connector = DaemonConnector::new(false, false, &repo_root, None);
connector.paths.pid_file.ensure_dir().unwrap();
connector
.paths
Expand All @@ -455,7 +467,7 @@ mod test {
async fn handles_missing_server_connect() {
let tmp_dir = tempfile::tempdir().unwrap();
let repo_root = AbsoluteSystemPathBuf::try_from(tmp_dir.path()).unwrap();
let connector = DaemonConnector::new(false, false, &repo_root);
let connector = DaemonConnector::new(false, false, &repo_root, None);

assert_matches!(
connector.connect().await,
Expand All @@ -467,7 +479,7 @@ mod test {
async fn handles_kill_dead_server_missing_pid() {
let tmp_dir = tempfile::tempdir().unwrap();
let repo_root = AbsoluteSystemPathBuf::try_from(tmp_dir.path()).unwrap();
let connector = DaemonConnector::new(false, false, &repo_root);
let connector = DaemonConnector::new(false, false, &repo_root, None);

assert_matches!(
connector.kill_dead_server(Pid::from(usize::MAX)).await,
Expand All @@ -479,7 +491,7 @@ mod test {
async fn handles_kill_dead_server_missing_process() {
let tmp_dir = tempfile::tempdir().unwrap();
let repo_root = AbsoluteSystemPathBuf::try_from(tmp_dir.path()).unwrap();
let connector = DaemonConnector::new(false, false, &repo_root);
let connector = DaemonConnector::new(false, false, &repo_root, None);

connector.paths.pid_file.ensure_dir().unwrap();
connector
Expand All @@ -505,7 +517,7 @@ mod test {
async fn handles_kill_dead_server_wrong_process() {
let tmp_dir = tempfile::tempdir().unwrap();
let repo_root = AbsoluteSystemPathBuf::try_from(tmp_dir.path()).unwrap();
let connector = DaemonConnector::new(false, false, &repo_root);
let connector = DaemonConnector::new(false, false, &repo_root, None);

let proc = tokio::process::Command::new(NODE_EXE)
.stdout(Stdio::null())
Expand Down Expand Up @@ -542,7 +554,7 @@ mod test {
async fn handles_kill_dead_server() {
let tmp_dir = tempfile::tempdir().unwrap();
let repo_root = AbsoluteSystemPathBuf::try_from(tmp_dir.path()).unwrap();
let connector = DaemonConnector::new(false, true, &repo_root);
let connector = DaemonConnector::new(false, true, &repo_root, None);

let proc = tokio::process::Command::new(NODE_EXE)
.stdout(Stdio::null())
Expand Down Expand Up @@ -682,7 +694,7 @@ mod test {

let tmp_dir = tempfile::tempdir().unwrap();
let repo_root = AbsoluteSystemPathBuf::try_from(tmp_dir.path()).unwrap();
let connector = DaemonConnector::new(false, false, &repo_root);
let connector = DaemonConnector::new(false, false, &repo_root, None);

let mut client = Endpoint::try_from("http://[::]:50051")
.expect("this is a valid uri")
Expand Down
26 changes: 22 additions & 4 deletions crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct FileWatching {
pub package_watcher: Arc<PackageWatcher>,
pub package_changes_watcher: OnceLock<Arc<PackageChangesWatcher>>,
pub hash_watcher: Arc<HashWatcher>,
custom_turbo_json_path: Option<AbsoluteSystemPathBuf>,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -111,7 +112,10 @@ impl FileWatching {
/// waiting for the filewatcher to be ready. Using `OptionalWatch`,
/// dependent services can wait for resources they need to become
/// available, and the server can start up without waiting for them.
pub fn new(repo_root: AbsoluteSystemPathBuf) -> Result<FileWatching, WatchError> {
pub fn new(
repo_root: AbsoluteSystemPathBuf,
custom_turbo_json_path: Option<AbsoluteSystemPathBuf>,
) -> Result<FileWatching, WatchError> {
let watcher = Arc::new(FileSystemWatcher::new_with_default_cookie_dir(&repo_root)?);
let recv = watcher.watch();

Expand Down Expand Up @@ -144,6 +148,7 @@ impl FileWatching {
package_watcher,
package_changes_watcher: OnceLock::new(),
hash_watcher,
custom_turbo_json_path,
})
}

Expand All @@ -155,6 +160,7 @@ impl FileWatching {
self.repo_root.clone(),
recv,
self.hash_watcher.clone(),
self.custom_turbo_json_path.clone(),
))
})
.clone()
Expand All @@ -169,6 +175,7 @@ pub struct TurboGrpcService<S> {
paths: Paths,
timeout: Duration,
external_shutdown: S,
custom_turbo_json_path: Option<AbsoluteSystemPathBuf>,
}

impl<S> TurboGrpcService<S>
Expand All @@ -186,6 +193,7 @@ where
paths: Paths,
timeout: Duration,
external_shutdown: S,
custom_turbo_json_path: Option<AbsoluteSystemPathBuf>,
) -> Self {
// Run the actual service. It takes ownership of the struct given to it,
// so we use a private struct with just the pieces of state needed to handle
Expand All @@ -195,6 +203,7 @@ where
paths,
timeout,
external_shutdown,
custom_turbo_json_path,
}
}

Expand All @@ -204,15 +213,20 @@ where
paths,
repo_root,
timeout,
custom_turbo_json_path,
} = self;

// A channel to trigger the shutdown of the gRPC server. This is handed out
// to components internal to the server process such as root watching, as
// well as available to the gRPC server itself to handle the shutdown RPC.
let (trigger_shutdown, mut shutdown_signal) = mpsc::channel::<()>(1);

let (service, exit_root_watch, watch_root_handle) =
TurboGrpcServiceInner::new(repo_root.clone(), trigger_shutdown, paths.log_file);
let (service, exit_root_watch, watch_root_handle) = TurboGrpcServiceInner::new(
repo_root.clone(),
trigger_shutdown,
paths.log_file,
custom_turbo_json_path,
);

let running = Arc::new(AtomicBool::new(true));
let (_pid_lock, stream) =
Expand Down Expand Up @@ -290,12 +304,13 @@ impl TurboGrpcServiceInner {
repo_root: AbsoluteSystemPathBuf,
trigger_shutdown: mpsc::Sender<()>,
log_file: AbsoluteSystemPathBuf,
custom_turbo_json_path: Option<AbsoluteSystemPathBuf>,
) -> (
Self,
oneshot::Sender<()>,
JoinHandle<Result<(), WatchError>>,
) {
let file_watching = FileWatching::new(repo_root.clone()).unwrap();
let file_watching = FileWatching::new(repo_root.clone(), custom_turbo_json_path).unwrap();

tracing::debug!("initing package discovery");
// Note that we're cloning the Arc, not the package watcher itself
Expand Down Expand Up @@ -778,6 +793,7 @@ mod test {
paths.clone(),
Duration::from_secs(60 * 60),
exit_signal,
None,
);

// the package watcher reads data from the package.json file
Expand Down Expand Up @@ -835,6 +851,7 @@ mod test {
paths.clone(),
Duration::from_millis(10),
exit_signal,
None,
);

// the package watcher reads data from the package.json file
Expand Down Expand Up @@ -892,6 +909,7 @@ mod test {
paths,
Duration::from_secs(60 * 60),
exit_signal,
None,
);

let handle = tokio::task::spawn(server.serve());
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-lib/src/diagnostics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ impl Diagnostic for DaemonDiagnostic {
can_kill_server: false,
can_start_server: true,
paths,
custom_turbo_json_path: None,
};

let mut client = match connector.connect().await {
Expand Down
Loading
Loading