diff --git a/crates/turbopack-dev-server/src/source/mod.rs b/crates/turbopack-dev-server/src/source/mod.rs index ab5757a39d4ea..b83e9cdc903b3 100644 --- a/crates/turbopack-dev-server/src/source/mod.rs +++ b/crates/turbopack-dev-server/src/source/mod.rs @@ -52,6 +52,26 @@ impl BodyError { } } +impl From<&str> for BodyError { + fn from(err: &str) -> Self { + BodyError { + err: err.to_string(), + } + } +} + +impl From for BodyError { + fn from(err: String) -> Self { + BodyError { err } + } +} + +impl From for BodyError { + fn from(value: anyhow::Error) -> Self { + value.to_string().into() + } +} + /// The return value of a content source when getting a path. A specificity is /// attached and when combining results this specificity should be used to order /// results. @@ -246,29 +266,29 @@ pub struct ContentSourceData { pub cache_buster: u64, } -type Chunk = Result; +pub type BodyChunk = Result; /// A request body. #[turbo_tasks::value(shared)] #[derive(Default, Clone, Debug)] pub struct Body { #[turbo_tasks(trace_ignore)] - chunks: Stream, + chunks: Stream, } impl Body { /// Creates a new body from a list of chunks. - pub fn new(chunks: Vec) -> Self { + pub fn new(chunks: Vec) -> Self { Self { chunks: Stream::new_closed(chunks), } } /// Returns an iterator over the body's chunks. - pub fn read(&self) -> StreamRead { + pub fn read(&self) -> StreamRead { self.chunks.read() } - pub fn from_stream + Send + Sync + Unpin + 'static>( + pub fn from_stream + Send + Unpin + 'static>( source: T, ) -> Self { Self { diff --git a/crates/turbopack-node/src/evaluate.rs b/crates/turbopack-node/src/evaluate.rs index d249e28e22429..f2773267cf4c0 100644 --- a/crates/turbopack-node/src/evaluate.rs +++ b/crates/turbopack-node/src/evaluate.rs @@ -249,17 +249,6 @@ pub fn evaluate( additional_invalidation: CompletionVc, debug: bool, ) -> JavaScriptEvaluationVc { - let pool = get_evaluate_pool( - module_asset, - cwd, - env, - context, - chunking_context, - runtime_entries, - additional_invalidation, - debug, - ); - // Note the following code uses some hacks to create a child task that produces // a stream that is returned by this task. @@ -278,11 +267,16 @@ pub fn evaluate( // run the evaluation as side effect compute_evaluate_stream( - pool, + module_asset, cwd, + env, context_ident_for_issue, + context, chunking_context, + runtime_entries, args, + additional_invalidation, + debug, JavaScriptStreamSender { get: Box::new(move || { if let Some(sender) = initial.lock().take() { @@ -308,11 +302,16 @@ pub fn evaluate( #[turbo_tasks::function] async fn compute_evaluate_stream( - pool: NodeJsPoolVc, + module_asset: AssetVc, cwd: FileSystemPathVc, + env: ProcessEnvVc, context_ident_for_issue: AssetIdentVc, + context: AssetContextVc, chunking_context: ChunkingContextVc, + runtime_entries: Option, args: Vec, + additional_invalidation: CompletionVc, + debug: bool, sender: JavaScriptStreamSenderVc, ) { mark_finished(); @@ -322,7 +321,20 @@ async fn compute_evaluate_stream( }; let stream = generator! { - let pool = pool.await?; + let pool = get_evaluate_pool( + module_asset, + cwd, + env, + context, + chunking_context, + runtime_entries, + additional_invalidation, + debug, + ); + + // Read this strongly consistent, since we don't want to run inconsistent + // node.js code. + let pool = pool.strongly_consistent().await?; let args = args.into_iter().try_join().await?; // Assume this is a one-off operation, so we can kill the process @@ -407,7 +419,7 @@ async fn pull_operation( match operation.recv().await? { EvalJavaScriptIncomingMessage::Error(error) => { EvaluationIssue { - error: error.clone(), + error, context_ident: context_ident_for_issue, assets_for_source_mapping: pool.assets_for_source_mapping, assets_root: pool.assets_root, diff --git a/crates/turbopack-node/src/lib.rs b/crates/turbopack-node/src/lib.rs index fc3f753ffc622..5f2498fb4064a 100644 --- a/crates/turbopack-node/src/lib.rs +++ b/crates/turbopack-node/src/lib.rs @@ -278,6 +278,7 @@ pub async fn get_intermediate_asset( .into()) } +#[derive(Clone, Debug)] #[turbo_tasks::value(shared)] pub struct ResponseHeaders { pub status: u16, diff --git a/crates/turbopack-node/src/render/mod.rs b/crates/turbopack-node/src/render/mod.rs index 557e011e3ca14..95de500b5cc0c 100644 --- a/crates/turbopack-node/src/render/mod.rs +++ b/crates/turbopack-node/src/render/mod.rs @@ -35,15 +35,16 @@ enum RenderProxyOutgoingMessage<'a> { BodyEnd, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(tag = "type", rename_all = "camelCase")] enum RenderProxyIncomingMessage { Headers { data: ResponseHeaders }, - Body { data: Vec }, + BodyChunk { data: Vec }, + BodyEnd, Error(StructuredError), } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(tag = "type", rename_all = "camelCase")] enum RenderStaticIncomingMessage { #[serde(rename_all = "camelCase")] @@ -52,6 +53,13 @@ enum RenderStaticIncomingMessage { headers: Vec<(String, String)>, body: String, }, + Headers { + data: ResponseHeaders, + }, + BodyChunk { + data: Vec, + }, + BodyEnd, Rewrite { path: String, }, diff --git a/crates/turbopack-node/src/render/render_proxy.rs b/crates/turbopack-node/src/render/render_proxy.rs index b20875bd87aa4..d1ea9ba7901d4 100644 --- a/crates/turbopack-node/src/render/render_proxy.rs +++ b/crates/turbopack-node/src/render/render_proxy.rs @@ -1,10 +1,16 @@ -use anyhow::{bail, Result}; -use futures::StreamExt; -use turbo_tasks::primitives::StringVc; +use anyhow::{anyhow, bail, Result}; +use async_stream::try_stream as generator; +use futures::{ + channel::mpsc::{unbounded, UnboundedSender}, + pin_mut, SinkExt, StreamExt, TryStreamExt, +}; +use parking_lot::Mutex; +use turbo_tasks::{mark_finished, primitives::StringVc, util::SharedError, RawVc}; +use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::FileSystemPathVc; -use turbopack_core::{asset::AssetVc, chunk::ChunkingContextVc, error::PrettyPrintError}; -use turbopack_dev_server::source::{BodyVc, ProxyResult, ProxyResultVc}; +use turbopack_core::{chunk::ChunkingContextVc, error::PrettyPrintError}; +use turbopack_dev_server::source::{Body, BodyError, BodyVc, ProxyResult, ProxyResultVc}; use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc}; use super::{ @@ -31,118 +37,57 @@ pub async fn render_proxy( data: RenderDataVc, body: BodyVc, ) -> Result { - let intermediate_asset = get_intermediate_asset( - module.as_evaluated_chunk(chunking_context, Some(runtime_entries)), - intermediate_output_path, - ); - - let pool = get_renderer_pool( + let render = render_stream( cwd, env, - intermediate_asset, + path, + module, + runtime_entries, + chunking_context, intermediate_output_path, output_root, project_dir, - /* debug */ false, + data, + body, ) .await?; - let mut operation = match pool.operation().await { - Ok(operation) => operation, - Err(err) => { - return proxy_error(path, err, None).await; + let mut stream = render.read(); + let first = match stream.try_next().await? { + Some(f) => f, + None => { + // If an Error was received first, then it would have been + // transformed into a proxy err error response. + bail!("did not receive response from render"); } }; - match run_proxy_operation( - &mut operation, - data, - body, - intermediate_asset, - intermediate_output_path, - project_dir, - ) - .await - { - Ok(proxy_result) => Ok(proxy_result.cell()), - Err(err) => Ok(proxy_error(path, err, Some(operation)).await?), - } -} - -async fn run_proxy_operation( - operation: &mut NodeJsOperation, - data: RenderDataVc, - body: BodyVc, - intermediate_asset: AssetVc, - intermediate_output_path: FileSystemPathVc, - project_dir: FileSystemPathVc, -) -> Result { - let data = data.await?; - // First, send the render data. - operation - .send(RenderProxyOutgoingMessage::Headers { data: &data }) - .await?; - - let mut body = body.await?.read(); - // Then, send the binary body in chunks. - while let Some(data) = body.next().await { - operation - .send(RenderProxyOutgoingMessage::BodyChunk { data: &data? }) - .await?; - } - - operation.send(RenderProxyOutgoingMessage::BodyEnd).await?; - - let (status, headers) = match operation.recv().await? { - RenderProxyIncomingMessage::Headers { - data: ResponseHeaders { status, headers }, - } => (status, headers), - RenderProxyIncomingMessage::Error(error) => { - bail!( - trace_stack( - error, - intermediate_asset, - intermediate_output_path, - project_dir - ) - .await? - ) - } - _ => { - bail!("unexpected response from the Node.js process while reading response headers") - } + let RenderItem::Headers(data) = first else { + bail!("did not receive headers from render"); }; - let body = match operation.recv().await? { - RenderProxyIncomingMessage::Body { data: body } => body, - RenderProxyIncomingMessage::Error(error) => { - bail!( - trace_stack( - error, - intermediate_asset, - intermediate_output_path, - project_dir - ) - .await? - ) - } - _ => { - bail!("unexpected response from the Node.js process while reading response body") - } + let body = Body::from_stream(stream.map(|item| match item { + Ok(RenderItem::BodyChunk(b)) => Ok(b), + Ok(v) => Err(BodyError::new(format!("unexpected render item: {:#?}", v))), + Err(e) => Err(BodyError::new(format!( + "error streaming proxied contents: {}", + e + ))), + })); + let result = ProxyResult { + status: data.status, + headers: data.headers, + body, }; - Ok(ProxyResult { - status, - headers, - body: body.into(), - }) + Ok(result.cell()) } async fn proxy_error( path: FileSystemPathVc, error: anyhow::Error, operation: Option, -) -> Result { +) -> Result<(u16, String)> { let message = format!("{}", PrettyPrintError(&error)); let status = match operation { @@ -156,12 +101,13 @@ async fn proxy_error( } let status_code = 500; - let body = &*error_html( + let body = error_html( status_code, "An error occurred while proxying the request to Node.js".to_string(), format!("{message}\n\n{}", details.join("\n")), ) - .await?; + .await? + .clone_value(); RenderingIssue { context: path, @@ -172,13 +118,195 @@ async fn proxy_error( .as_issue() .emit(); - Ok(ProxyResult { - status: status_code, - headers: vec![( - "content-type".to_string(), - "text/html; charset=utf-8".to_string(), - )], - body: body.clone().into(), + Ok((status_code, body)) +} + +#[derive(Clone, Debug)] +#[turbo_tasks::value] +enum RenderItem { + Headers(ResponseHeaders), + BodyChunk(Bytes), +} + +type RenderItemResult = Result; + +#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")] +pub struct RenderStreamSender { + #[turbo_tasks(trace_ignore, debug_ignore)] + get: Box UnboundedSender + Send + Sync>, +} + +#[turbo_tasks::value(transparent, eq = "manual", cell = "new", serialization = "none")] +struct RenderStream(#[turbo_tasks(trace_ignore)] Stream); + +#[turbo_tasks::function] +fn render_stream( + cwd: FileSystemPathVc, + env: ProcessEnvVc, + path: FileSystemPathVc, + module: EcmascriptModuleAssetVc, + runtime_entries: EcmascriptChunkPlaceablesVc, + chunking_context: ChunkingContextVc, + intermediate_output_path: FileSystemPathVc, + output_root: FileSystemPathVc, + project_dir: FileSystemPathVc, + data: RenderDataVc, + body: BodyVc, +) -> RenderStreamVc { + // Note the following code uses some hacks to create a child task that produces + // a stream that is returned by this task. + + // We create a new cell in this task, which will be updated from the + // [render_stream_internal] task. + let cell = turbo_tasks::macro_helpers::find_cell_by_type(*RENDERSTREAM_VALUE_TYPE_ID); + + // We initialize the cell with a stream that is open, but has no values. + // The first [render_stream_internal] pipe call will pick up that stream. + let (sender, receiver) = unbounded(); + cell.update_shared(RenderStream(Stream::new_open(vec![], Box::new(receiver)))); + let initial = Mutex::new(Some(sender)); + + // run the evaluation as side effect + render_stream_internal( + cwd, + env, + path, + module, + runtime_entries, + chunking_context, + intermediate_output_path, + output_root, + project_dir, + data, + body, + RenderStreamSender { + get: Box::new(move || { + if let Some(sender) = initial.lock().take() { + sender + } else { + // In cases when only [render_stream_internal] is (re)executed, we need to + // update the old stream with a new value. + let (sender, receiver) = unbounded(); + cell.update_shared(RenderStream(Stream::new_open(vec![], Box::new(receiver)))); + sender + } + }), + } + .cell(), + ); + + let raw: RawVc = cell.into(); + raw.into() +} + +#[turbo_tasks::function] +async fn render_stream_internal( + cwd: FileSystemPathVc, + env: ProcessEnvVc, + path: FileSystemPathVc, + module: EcmascriptModuleAssetVc, + runtime_entries: EcmascriptChunkPlaceablesVc, + chunking_context: ChunkingContextVc, + intermediate_output_path: FileSystemPathVc, + output_root: FileSystemPathVc, + project_dir: FileSystemPathVc, + data: RenderDataVc, + body: BodyVc, + sender: RenderStreamSenderVc, +) { + mark_finished(); + let Ok(sender) = sender.await else { + // Impossible to handle the error in a good way. + return; + }; + + let stream = generator! { + let intermediate_asset = get_intermediate_asset( + module.as_evaluated_chunk(chunking_context, Some(runtime_entries)), + intermediate_output_path, + ); + let pool = get_renderer_pool( + cwd, + env, + intermediate_asset, + intermediate_output_path, + output_root, + project_dir, + /* debug */ false, + ); + + // Read this strongly consistent, since we don't want to run inconsistent + // node.js code. + let pool = pool.strongly_consistent().await?; + let data = data.await?; + let mut operation = pool.operation().await?; + + // First, send the render data. + operation + .send(RenderProxyOutgoingMessage::Headers { data: &data }) + .await?; + // Then, send the binary body in chunks. + let mut body = body.await?.read(); + while let Some(data) = body.next().await { + operation + .send(RenderProxyOutgoingMessage::BodyChunk { data: &data.unwrap() }) + .await?; + } + operation.send(RenderProxyOutgoingMessage::BodyEnd).await?; + + match operation.recv().await? { + RenderProxyIncomingMessage::Headers { data } => yield RenderItem::Headers(data), + RenderProxyIncomingMessage::Error(error) => { + // If we don't get headers, then something is very wrong. Instead, we send down a + // 500 proxy error as if it were the proper result. + let trace = trace_stack( + error, + intermediate_asset, + intermediate_output_path, + project_dir + ) + .await?; + let (status, body) = proxy_error(path, anyhow!("error rendering: {}", trace), Some(operation)).await?; + yield RenderItem::Headers(ResponseHeaders { + status, + headers: vec![( + "content-type".to_string(), + "text/html; charset=utf-8".to_string(), + )], + }); + yield RenderItem::BodyChunk(body.into()); + return; + } + v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + }; + + loop { + match operation.recv().await? { + RenderProxyIncomingMessage::BodyChunk { data } => { + yield RenderItem::BodyChunk(data.into()); + } + RenderProxyIncomingMessage::BodyEnd => break, + RenderProxyIncomingMessage::Error(error) => { + // We have already started to send a result, so we can't change the + // headers/body to a proxy error. + operation.disallow_reuse(); + let trace = + trace_stack(error, intermediate_asset, intermediate_output_path, project_dir).await?; + Err(anyhow!("error during streaming render: {}", trace))?; + } + v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + } + } + }; + + let mut sender = (sender.get)(); + pin_mut!(stream); + while let Some(value) = stream.next().await { + if sender.send(value).await.is_err() { + return; + } + if sender.flush().await.is_err() { + return; + } } - .cell()) } diff --git a/crates/turbopack-node/src/render/render_static.rs b/crates/turbopack-node/src/render/render_static.rs index 2821a2eedf6fb..6fc7150dc3345 100644 --- a/crates/turbopack-node/src/render/render_static.rs +++ b/crates/turbopack-node/src/render/render_static.rs @@ -1,15 +1,22 @@ -use anyhow::{bail, Context, Result}; -use turbo_tasks::primitives::StringVc; +use anyhow::{anyhow, bail, Context, Result}; +use async_stream::try_stream as generator; +use futures::{ + channel::mpsc::{unbounded, UnboundedSender}, + pin_mut, SinkExt, StreamExt, TryStreamExt, +}; +use parking_lot::Mutex; +use turbo_tasks::{mark_finished, primitives::StringVc, util::SharedError, RawVc}; +use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::{File, FileContent, FileSystemPathVc}; use turbopack_core::{ - asset::{Asset, AssetContentVc, AssetVc}, + asset::{Asset, AssetContentVc}, chunk::ChunkingContextVc, error::PrettyPrintError, }; use turbopack_dev_server::{ html::DevHtmlAssetVc, - source::{HeaderListVc, RewriteBuilder, RewriteVc}, + source::{Body, BodyError, HeaderListVc, RewriteBuilder, RewriteVc}, }; use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc}; @@ -18,9 +25,10 @@ use super::{ }; use crate::{ get_intermediate_asset, get_renderer_pool, pool::NodeJsOperation, - render::error_page::error_html_body, source_map::trace_stack, + render::error_page::error_html_body, source_map::trace_stack, ResponseHeaders, }; +#[derive(Clone, Debug)] #[turbo_tasks::value] pub enum StaticResult { Content { @@ -28,6 +36,11 @@ pub enum StaticResult { status_code: u16, headers: HeaderListVc, }, + StreamedContent { + status: u16, + headers: HeaderListVc, + body: Body, + }, Rewrite(RewriteVc), } @@ -64,97 +77,51 @@ pub async fn render_static( project_dir: FileSystemPathVc, data: RenderDataVc, ) -> Result { - let intermediate_asset = get_intermediate_asset( - module.as_evaluated_chunk(chunking_context, Some(runtime_entries)), - intermediate_output_path, - ); - let renderer_pool = get_renderer_pool( + let render = render_stream( cwd, env, - intermediate_asset, + path, + module, + runtime_entries, + fallback_page, + chunking_context, intermediate_output_path, output_root, project_dir, - /* debug */ false, - ); - // Read this strongly consistent, since we don't want to run inconsistent - // node.js code. - let pool = renderer_pool.strongly_consistent().await?; - let mut operation = match pool.operation().await { - Ok(operation) => operation, - Err(err) => { - return Ok(StaticResultVc::content( - static_error(path, err, None, fallback_page).await?, - 500, - HeaderListVc::empty(), - )) + data, + ) + .await?; + + let mut stream = render.read(); + let first = match stream.try_next().await? { + Some(f) => f, + None => { + // If an Error was received first, then it would have been + // transformed into a proxy err error response. + bail!("did not receive response from render"); } }; - Ok( - match run_static_operation( - &mut operation, - data, - intermediate_asset, - intermediate_output_path, - project_dir, - ) - .await - { - Ok(result) => result, - Err(err) => StaticResultVc::content( - static_error(path, err, Some(operation), fallback_page).await?, - 500, - HeaderListVc::empty(), - ), - }, - ) -} - -async fn run_static_operation( - operation: &mut NodeJsOperation, - data: RenderDataVc, - intermediate_asset: AssetVc, - intermediate_output_path: FileSystemPathVc, - project_dir: FileSystemPathVc, -) -> Result { - let data = data.await?; - - operation - .send(RenderStaticOutgoingMessage::Headers { data: &data }) - .await - .context("sending headers to node.js process")?; - Ok( - match operation - .recv() - .await - .context("receiving from node.js process")? - { - RenderStaticIncomingMessage::Rewrite { path } => { - StaticResultVc::rewrite(RewriteBuilder::new(path).build()) + Ok(match first { + RenderItem::Response(response) => response, + RenderItem::Headers(data) => { + let body = stream.map(|item| match item { + Ok(RenderItem::BodyChunk(b)) => Ok(b), + Ok(v) => Err(BodyError::new(format!("unexpected render item: {:#?}", v))), + Err(e) => Err(BodyError::new(format!( + "error streaming proxied contents: {}", + e + ))), + }); + StaticResult::StreamedContent { + status: data.status, + headers: HeaderListVc::cell(data.headers), + body: Body::from_stream(body), } - RenderStaticIncomingMessage::Response { - status_code, - headers, - body, - } => StaticResultVc::content( - FileContent::Content(File::from(body)).into(), - status_code, - HeaderListVc::cell(headers), - ), - RenderStaticIncomingMessage::Error(error) => { - bail!( - trace_stack( - error, - intermediate_asset, - intermediate_output_path, - project_dir - ) - .await? - ) - } - }, - ) + .cell() + } + v => bail!("unexpected render item: {:#?}", v), + }) } async fn static_error( @@ -201,3 +168,202 @@ async fn static_error( Ok(html.content()) } + +#[derive(Clone, Debug)] +#[turbo_tasks::value] +enum RenderItem { + Response(StaticResultVc), + Headers(ResponseHeaders), + BodyChunk(Bytes), +} + +type RenderItemResult = Result; + +#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")] +pub struct RenderStreamSender { + #[turbo_tasks(trace_ignore, debug_ignore)] + get: Box UnboundedSender + Send + Sync>, +} + +#[turbo_tasks::value(transparent, eq = "manual", cell = "new", serialization = "none")] +struct RenderStream(#[turbo_tasks(trace_ignore)] Stream); + +#[turbo_tasks::function] +fn render_stream( + cwd: FileSystemPathVc, + env: ProcessEnvVc, + path: FileSystemPathVc, + module: EcmascriptModuleAssetVc, + runtime_entries: EcmascriptChunkPlaceablesVc, + fallback_page: DevHtmlAssetVc, + chunking_context: ChunkingContextVc, + intermediate_output_path: FileSystemPathVc, + output_root: FileSystemPathVc, + project_dir: FileSystemPathVc, + data: RenderDataVc, +) -> RenderStreamVc { + // Note the following code uses some hacks to create a child task that produces + // a stream that is returned by this task. + + // We create a new cell in this task, which will be updated from the + // [render_stream_internal] task. + let cell = turbo_tasks::macro_helpers::find_cell_by_type(*RENDERSTREAM_VALUE_TYPE_ID); + + // We initialize the cell with a stream that is open, but has no values. + // The first [render_stream_internal] pipe call will pick up that stream. + let (sender, receiver) = unbounded(); + cell.update_shared(RenderStream(Stream::new_open(vec![], Box::new(receiver)))); + let initial = Mutex::new(Some(sender)); + + // run the evaluation as side effect + render_stream_internal( + cwd, + env, + path, + module, + runtime_entries, + fallback_page, + chunking_context, + intermediate_output_path, + output_root, + project_dir, + data, + RenderStreamSender { + get: Box::new(move || { + if let Some(sender) = initial.lock().take() { + sender + } else { + // In cases when only [render_stream_internal] is (re)executed, we need to + // update the old stream with a new value. + let (sender, receiver) = unbounded(); + cell.update_shared(RenderStream(Stream::new_open(vec![], Box::new(receiver)))); + sender + } + }), + } + .cell(), + ); + + let raw: RawVc = cell.into(); + raw.into() +} + +#[turbo_tasks::function] +async fn render_stream_internal( + cwd: FileSystemPathVc, + env: ProcessEnvVc, + path: FileSystemPathVc, + module: EcmascriptModuleAssetVc, + runtime_entries: EcmascriptChunkPlaceablesVc, + fallback_page: DevHtmlAssetVc, + chunking_context: ChunkingContextVc, + intermediate_output_path: FileSystemPathVc, + output_root: FileSystemPathVc, + project_dir: FileSystemPathVc, + data: RenderDataVc, + sender: RenderStreamSenderVc, +) { + mark_finished(); + let Ok(sender) = sender.await else { + // Impossible to handle the error in a good way. + return; + }; + + let stream = generator! { + let intermediate_asset = get_intermediate_asset( + module.as_evaluated_chunk(chunking_context, Some(runtime_entries)), + intermediate_output_path, + ); + let renderer_pool = get_renderer_pool( + cwd, + env, + intermediate_asset, + intermediate_output_path, + output_root, + project_dir, + /* debug */ false, + ); + + // Read this strongly consistent, since we don't want to run inconsistent + // node.js code. + let pool = renderer_pool.strongly_consistent().await?; + let data = data.await?; + let mut operation = pool.operation().await?; + + operation + .send(RenderStaticOutgoingMessage::Headers { data: &data }) + .await + .context("sending headers to node.js process")?; + + match operation.recv().await? { + RenderStaticIncomingMessage::Headers { data } => yield RenderItem::Headers(data), + RenderStaticIncomingMessage::Rewrite { path } => { + yield RenderItem::Response(StaticResultVc::rewrite(RewriteBuilder::new(path).build())); + return; + } + RenderStaticIncomingMessage::Response { + status_code, + headers, + body, + } => { + yield RenderItem::Response(StaticResultVc::content( + FileContent::Content(File::from(body)).into(), + status_code, + HeaderListVc::cell(headers), + )); + return; + } + RenderStaticIncomingMessage::Error(error) => { + // If we don't get headers, then something is very wrong. Instead, we send down a + // 500 proxy error as if it were the proper result. + let trace = trace_stack( + error, + intermediate_asset, + intermediate_output_path, + project_dir, + ) + .await?; + yield RenderItem::Response( + StaticResultVc::content( + static_error(path, anyhow!(trace), Some(operation), fallback_page).await?, + 500, + HeaderListVc::empty(), + ) + ); + return; + } + v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + }; + + // If we get here, then the first message was a Headers. Now we need to stream out the body + // chunks. + loop { + match operation.recv().await? { + RenderStaticIncomingMessage::BodyChunk { data } => { + yield RenderItem::BodyChunk(data.into()); + } + RenderStaticIncomingMessage::BodyEnd => break, + RenderStaticIncomingMessage::Error(error) => { + // We have already started to send a result, so we can't change the + // headers/body to a proxy error. + operation.disallow_reuse(); + let trace = + trace_stack(error, intermediate_asset, intermediate_output_path, project_dir).await?; + Err(anyhow!("error during streaming render: {}", trace))?; + } + v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + } + } + }; + + let mut sender = (sender.get)(); + pin_mut!(stream); + while let Some(value) = stream.next().await { + if sender.send(value).await.is_err() { + return; + } + if sender.flush().await.is_err() { + return; + } + } +} diff --git a/crates/turbopack-node/src/render/rendered_source.rs b/crates/turbopack-node/src/render/rendered_source.rs index 9ebfacb84d4c1..0e3180d55b7f8 100644 --- a/crates/turbopack-node/src/render/rendered_source.rs +++ b/crates/turbopack-node/src/render/rendered_source.rs @@ -21,7 +21,7 @@ use turbopack_dev_server::{ specificity::SpecificityVc, ContentSource, ContentSourceContent, ContentSourceContentVc, ContentSourceData, ContentSourceDataVary, ContentSourceDataVaryVc, ContentSourceResult, ContentSourceResultVc, - ContentSourceVc, GetContentSourceContent, GetContentSourceContentVc, + ContentSourceVc, GetContentSourceContent, GetContentSourceContentVc, ProxyResult, }, }; use turbopack_ecmascript::chunk::EcmascriptChunkPlaceablesVc; @@ -237,6 +237,19 @@ impl GetContentSourceContent for NodeRenderGetContentResult { status_code, headers, } => ContentSourceContentVc::static_with_headers(content.into(), status_code, headers), + StaticResult::StreamedContent { + status, + headers, + ref body, + } => ContentSourceContent::HttpProxy( + ProxyResult { + status, + headers: headers.await?.clone_value(), + body: body.clone(), + } + .cell(), + ) + .cell(), StaticResult::Rewrite(rewrite) => ContentSourceContent::Rewrite(rewrite).cell(), }) }