这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,4 +753,8 @@ impl RunStopper {
pub async fn stop(&self) {
self.manager.stop().await;
}

pub async fn stop_tasks_matching(&self, predicate: impl Fn(&turborepo_process::Child) -> bool) {
self.manager.stop_children_matching(predicate).await;
}
}
67 changes: 58 additions & 9 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
commands::CommandBase,
config::resolve_turbo_config_path,
daemon::{proto, DaemonConnectorError, DaemonError},
engine::TaskNode,
get_version, opts,
run::{self, builder::RunBuilder, scope::target_selector::InvalidSelectorError, Run},
DaemonConnector, DaemonPaths,
Expand Down Expand Up @@ -48,6 +49,7 @@ pub struct WatchClient {
run: Arc<Run>,
watched_packages: HashSet<PackageName>,
persistent_tasks_handle: Option<RunHandle>,
active_runs: Vec<RunHandle>,
connector: DaemonConnector,
base: CommandBase,
telemetry: CommandEventBuilder,
Expand Down Expand Up @@ -164,6 +166,7 @@ impl WatchClient {
telemetry,
experimental_write_cache,
persistent_tasks_handle: None,
active_runs: Vec::new(),
ui_sender,
ui_handle,
})
Expand Down Expand Up @@ -195,7 +198,6 @@ impl WatchClient {
};

let run_fut = async {
let mut run_handle: Option<RunHandle> = None;
loop {
notify_run.notified().await;
let some_changed_packages = {
Expand All @@ -205,16 +207,23 @@ impl WatchClient {
.then(|| std::mem::take(changed_packages_guard.deref_mut()))
};

if let Some(changed_packages) = some_changed_packages {
if let Some(mut changed_packages) = some_changed_packages {
// Clean up currently running tasks
if let Some(RunHandle { stopper, run_task }) = run_handle.take() {
// Shut down the tasks for the run
stopper.stop().await;
// Run should exit shortly after we stop all child tasks, wait for it to
// finish to ensure all messages are flushed.
let _ = run_task.await;
self.active_runs.retain(|h| !h.run_task.is_finished());

match &mut changed_packages {
ChangedPackages::Some(pkgs) => {
self.stop_impacted_tasks(pkgs).await;
}
ChangedPackages::All => {
for handle in self.active_runs.drain(..) {
handle.stopper.stop().await;
let _ = handle.run_task.await;
}
}
}
run_handle = Some(self.execute_run(changed_packages).await?);
let new_run = self.execute_run(changed_packages).await?;
self.active_runs.push(new_run);
}
}
};
Expand Down Expand Up @@ -266,11 +275,51 @@ impl WatchClient {
Ok(())
}

async fn stop_impacted_tasks(&self, pkgs: &mut HashSet<PackageName>) {
let engine = self.run.engine();
let mut tasks_to_stop = HashSet::new();

for node in engine.tasks() {
if let TaskNode::Task(task_id) = node {
if pkgs.contains(&PackageName::from(task_id.package())) {
tasks_to_stop.insert(task_id.clone());

for dependent_node in engine.transitive_dependents(task_id) {
if let TaskNode::Task(dependent_id) = dependent_node {
tasks_to_stop.insert(dependent_id.clone());
}
}
}
}
}

let mut impacted_packages = HashSet::new();
for task_id in &tasks_to_stop {
impacted_packages.insert(PackageName::from(task_id.package()));
}

*pkgs = impacted_packages;

for handle in &self.active_runs {
let tasks = tasks_to_stop.clone();
handle
.stopper
.stop_tasks_matching(move |child| {
child.task_id().map_or(false, |id| tasks.contains(id))
})
.await;
}
}

/// Shut down any resources that run as part of watch.
pub async fn shutdown(&mut self) {
if let Some(sender) = &self.ui_sender {
sender.stop().await;
}
for handle in self.active_runs.drain(..) {
handle.stopper.stop().await;
let _ = handle.run_task.await;
}
if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() {
// Shut down the tasks for the run
stopper.stop().await;
Expand Down
3 changes: 3 additions & 0 deletions crates/turborepo-lib/src/task_graph/visitor/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl<'a> CommandProvider for PackageGraphCommandProvider<'a> {
// We clear the env before populating it with variables we expect
cmd.env_clear();
cmd.envs(environment.iter());
cmd.task_id(task_id.clone().into_owned());

// If the task has an associated proxy, then we indicate this to the underlying
// task via an env var
Expand Down Expand Up @@ -277,6 +278,7 @@ impl<'a, T: PackageInfoProvider> CommandProvider for MicroFrontendProxyProvider<
let program = which::which(package_manager.command())?;
let mut cmd = Command::new(&program);
cmd.current_dir(package_dir).args(args).open_stdin();
cmd.task_id(task_id.clone().into_owned());
Some(cmd)
} else if has_mfe_dependency {
tracing::debug!(
Expand All @@ -297,6 +299,7 @@ impl<'a, T: PackageInfoProvider> CommandProvider for MicroFrontendProxyProvider<
let program = package_dir.join_components(&["node_modules", ".bin", bin_name]);
let mut cmd = Command::new(program.as_std_path());
cmd.current_dir(package_dir).args(args).open_stdin();
cmd.task_id(task_id.clone().into_owned());
Some(cmd)
} else {
tracing::debug!("MicroFrontendProxyProvider::command - using Turborepo built-in proxy");
Expand Down
20 changes: 19 additions & 1 deletion crates/turborepo-lib/src/task_graph/visitor/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ enum ExecOutcome {
},
// Task didn't execute normally due to a shutdown being initiated by another task
Shutdown,
// Task was stopped to be restarted
Restarted,
}

enum SuccessOutcome {
Expand Down Expand Up @@ -259,6 +261,12 @@ impl ExecContext {
// stopped if we think we're shutting down.
self.manager.stop().await;
}
Ok(ExecOutcome::Restarted) => {
tracker.cancel();
// We need to stop dependent tasks because this task will be restarted
// in a new run.
callback.send(Err(StopExecution::DependentTasks)).ok();
}
Err(e) => {
tracker.cancel();
callback.send(Err(StopExecution::AllTasks)).ok();
Expand Down Expand Up @@ -455,7 +463,17 @@ impl ExecContext {
// Something else killed the child
ChildExit::KilledExternal => Err(InternalError::ExternalKill),
// The child was killed by turbo indicating a shutdown
ChildExit::Killed | ChildExit::Interrupted => Ok(ExecOutcome::Shutdown),
ChildExit::Killed | ChildExit::Interrupted => {
// We distinguish between a full shutdown and a restart based on whether the
// process manager is closing. If it is closing, it means we are shutting down
// the entire run. If it is not closing, it means we are restarting specific
// tasks.
if self.manager.is_closing() {
Ok(ExecOutcome::Shutdown)
} else {
Ok(ExecOutcome::Restarted)
}
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ portable-pty = "0.8.1"
tokio = { workspace = true, features = ["full", "time"] }
tracing.workspace = true
turbopath = { workspace = true }
turborepo-task-id = { workspace = true }

[lints]
workspace = true
Expand Down
34 changes: 27 additions & 7 deletions crates/turborepo-process/src/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tokio::{
sync::{mpsc, watch},
};
use tracing::{debug, trace};
use turborepo_task_id::TaskId;

use super::{Command, PtySize};

Expand Down Expand Up @@ -384,20 +385,33 @@ pub struct Child {
}

#[derive(Clone, Debug)]
pub struct ChildCommandChannel(mpsc::Sender<ChildCommand>);
pub struct ChildCommandChannel {
sender: mpsc::Sender<ChildCommand>,
task_id: Option<TaskId<'static>>,
}

impl ChildCommandChannel {
pub fn new() -> (Self, mpsc::Receiver<ChildCommand>) {
pub fn new(task_id: Option<TaskId<'static>>) -> (Self, mpsc::Receiver<ChildCommand>) {
let (tx, rx) = mpsc::channel(1);
(ChildCommandChannel(tx), rx)
(
ChildCommandChannel {
sender: tx,
task_id,
},
rx,
)
}

pub async fn kill(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> {
self.0.send(ChildCommand::Kill).await
self.sender.send(ChildCommand::Kill).await
}

pub async fn stop(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> {
self.0.send(ChildCommand::Stop).await
self.sender.send(ChildCommand::Stop).await
}

pub fn get_task_id(&self) -> Option<&TaskId<'static>> {
self.task_id.as_ref()
}
}

Expand All @@ -416,6 +430,7 @@ impl Child {
pty_size: Option<PtySize>,
) -> io::Result<Self> {
let label = command.label();
let task_id = command.get_task_id().cloned();
let SpawnResult {
handle: mut child,
io: ChildIO { stdin, output },
Expand All @@ -428,7 +443,7 @@ impl Child {

let pid = child.pid();

let (command_tx, mut command_rx) = ChildCommandChannel::new();
let (command_tx, mut command_rx) = ChildCommandChannel::new(task_id);

// we use a watch channel to communicate the exit code back to the
// caller. we are interested in three cases:
Expand Down Expand Up @@ -680,6 +695,11 @@ impl Child {
pub fn label(&self) -> &str {
&self.label
}

pub fn task_id(&self) -> Option<&TaskId<'static>> {
self.command_channel
.get_task_id()
}
}

// Adds a trailing newline if necessary to the buffer
Expand Down Expand Up @@ -750,7 +770,7 @@ impl ChildStateManager {
impl Child {
// Helper method for checking if child is running
fn is_running(&self) -> bool {
!self.command_channel.0.is_closed()
!self.command_channel.sender.is_closed()
}
}

Expand Down
14 changes: 14 additions & 0 deletions crates/turborepo-process/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use itertools::Itertools;
use turbopath::AbsoluteSystemPathBuf;
use turborepo_task_id::TaskId;

/// A command builder that can be used to build both regular
/// child processes and ones spawned hooked up to a PTY
Expand All @@ -17,6 +18,7 @@ pub struct Command {
env: BTreeMap<OsString, OsString>,
open_stdin: bool,
env_clear: bool,
task_id: Option<TaskId<'static>>,
}

impl Command {
Expand All @@ -29,6 +31,7 @@ impl Command {
env: BTreeMap::new(),
open_stdin: false,
env_clear: false,
task_id: None,
}
}

Expand Down Expand Up @@ -106,6 +109,15 @@ impl Command {
pub fn program(&self) -> &OsStr {
&self.program
}

pub fn task_id(&mut self, task_id: TaskId<'static>) -> &mut Self {
self.task_id = Some(task_id);
self
}

pub fn get_task_id(&self) -> Option<&TaskId<'static>> {
self.task_id.as_ref()
}
}

impl From<Command> for tokio::process::Command {
Expand All @@ -117,6 +129,7 @@ impl From<Command> for tokio::process::Command {
env,
open_stdin,
env_clear,
task_id: _,
} = value;

let mut cmd = tokio::process::Command::new(program);
Expand Down Expand Up @@ -149,6 +162,7 @@ impl From<Command> for portable_pty::CommandBuilder {
cwd,
env,
env_clear,
task_id: _,
..
} = value;
let mut cmd = portable_pty::CommandBuilder::new(program);
Expand Down
Loading