From 6f0d0edb7cd1563e70881953d0715e91c74ad2eb Mon Sep 17 00:00:00 2001 From: Justin Ridgewell Date: Tue, 21 Mar 2023 03:14:11 -0400 Subject: [PATCH 1/7] Update proxy handler --- crates/turbopack-dev-server/src/source/mod.rs | 30 ++++- crates/turbopack-node/src/render/mod.rs | 3 +- .../turbopack-node/src/render/render_proxy.rs | 115 ++++++++++++------ 3 files changed, 102 insertions(+), 46 deletions(-) diff --git a/crates/turbopack-dev-server/src/source/mod.rs b/crates/turbopack-dev-server/src/source/mod.rs index ab5757a39d4ea..b83e9cdc903b3 100644 --- a/crates/turbopack-dev-server/src/source/mod.rs +++ b/crates/turbopack-dev-server/src/source/mod.rs @@ -52,6 +52,26 @@ impl BodyError { } } +impl From<&str> for BodyError { + fn from(err: &str) -> Self { + BodyError { + err: err.to_string(), + } + } +} + +impl From for BodyError { + fn from(err: String) -> Self { + BodyError { err } + } +} + +impl From for BodyError { + fn from(value: anyhow::Error) -> Self { + value.to_string().into() + } +} + /// The return value of a content source when getting a path. A specificity is /// attached and when combining results this specificity should be used to order /// results. @@ -246,29 +266,29 @@ pub struct ContentSourceData { pub cache_buster: u64, } -type Chunk = Result; +pub type BodyChunk = Result; /// A request body. #[turbo_tasks::value(shared)] #[derive(Default, Clone, Debug)] pub struct Body { #[turbo_tasks(trace_ignore)] - chunks: Stream, + chunks: Stream, } impl Body { /// Creates a new body from a list of chunks. - pub fn new(chunks: Vec) -> Self { + pub fn new(chunks: Vec) -> Self { Self { chunks: Stream::new_closed(chunks), } } /// Returns an iterator over the body's chunks. - pub fn read(&self) -> StreamRead { + pub fn read(&self) -> StreamRead { self.chunks.read() } - pub fn from_stream + Send + Sync + Unpin + 'static>( + pub fn from_stream + Send + Unpin + 'static>( source: T, ) -> Self { Self { diff --git a/crates/turbopack-node/src/render/mod.rs b/crates/turbopack-node/src/render/mod.rs index 557e011e3ca14..e56eb401b15b1 100644 --- a/crates/turbopack-node/src/render/mod.rs +++ b/crates/turbopack-node/src/render/mod.rs @@ -39,7 +39,8 @@ enum RenderProxyOutgoingMessage<'a> { #[serde(tag = "type", rename_all = "camelCase")] enum RenderProxyIncomingMessage { Headers { data: ResponseHeaders }, - Body { data: Vec }, + BodyChunk { data: Vec }, + BodyEnd, Error(StructuredError), } diff --git a/crates/turbopack-node/src/render/render_proxy.rs b/crates/turbopack-node/src/render/render_proxy.rs index b20875bd87aa4..789446e55cfe1 100644 --- a/crates/turbopack-node/src/render/render_proxy.rs +++ b/crates/turbopack-node/src/render/render_proxy.rs @@ -1,10 +1,12 @@ use anyhow::{bail, Result}; +use async_stream::stream as genrator; use futures::StreamExt; use turbo_tasks::primitives::StringVc; +use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::FileSystemPathVc; use turbopack_core::{asset::AssetVc, chunk::ChunkingContextVc, error::PrettyPrintError}; -use turbopack_dev_server::source::{BodyVc, ProxyResult, ProxyResultVc}; +use turbopack_dev_server::source::{Body, BodyError, BodyVc, ProxyResult, ProxyResultVc}; use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc}; use super::{ @@ -50,11 +52,12 @@ pub async fn render_proxy( let mut operation = match pool.operation().await { Ok(operation) => operation, Err(err) => { - return proxy_error(path, err, None).await; + let (status, body) = proxy_error(path, err, None).await?; + return Ok(proxy_error_result(status, body)); } }; - match run_proxy_operation( + let (status, headers) = match start_proxy_operation( &mut operation, data, body, @@ -64,19 +67,70 @@ pub async fn render_proxy( ) .await { - Ok(proxy_result) => Ok(proxy_result.cell()), - Err(err) => Ok(proxy_error(path, err, Some(operation)).await?), + Ok(v) => v, + Err(err) => { + let (status, body) = proxy_error(path, err, Some(operation)).await?; + return Ok(proxy_error_result(status, body)); + } + }; + + let chunks = Stream::new_open( + vec![], + Box::pin(genrator! { + macro_rules! tri { + ($exp:expr) => { + match $exp { + Ok(v) => v, + Err(e) => { + yield Err(e.into()); + return; + } + } + } + } + + loop { + match tri!(operation.recv().await) { + RenderProxyIncomingMessage::BodyChunk { data } => { + yield Ok(Bytes::from(data)); + } + RenderProxyIncomingMessage::BodyEnd => break, + RenderProxyIncomingMessage::Error(error) => { + let trace = + trace_stack(error, intermediate_asset, intermediate_output_path).await; + let e = trace.map_or_else(BodyError::from, BodyError::from); + yield Err(e); + break; + } + _ => { + operation.disallow_reuse(); + yield Err( + "unexpected response from the Node.js process while reading response body" + .into(), + ); + break; + } + } + } + }), + ); + + Ok(ProxyResult { + status, + headers, + body: Body::from_stream(chunks.read()), } + .cell()) } -async fn run_proxy_operation( +async fn start_proxy_operation( operation: &mut NodeJsOperation, data: RenderDataVc, body: BodyVc, intermediate_asset: AssetVc, intermediate_output_path: FileSystemPathVc, project_dir: FileSystemPathVc, -) -> Result { +) -> Result<(u16, Vec<(String, String)>)> { let data = data.await?; // First, send the render data. operation @@ -93,10 +147,10 @@ async fn run_proxy_operation( operation.send(RenderProxyOutgoingMessage::BodyEnd).await?; - let (status, headers) = match operation.recv().await? { + match operation.recv().await? { RenderProxyIncomingMessage::Headers { data: ResponseHeaders { status, headers }, - } => (status, headers), + } => Ok((status, headers)), RenderProxyIncomingMessage::Error(error) => { bail!( trace_stack( @@ -111,38 +165,14 @@ async fn run_proxy_operation( _ => { bail!("unexpected response from the Node.js process while reading response headers") } - }; - - let body = match operation.recv().await? { - RenderProxyIncomingMessage::Body { data: body } => body, - RenderProxyIncomingMessage::Error(error) => { - bail!( - trace_stack( - error, - intermediate_asset, - intermediate_output_path, - project_dir - ) - .await? - ) - } - _ => { - bail!("unexpected response from the Node.js process while reading response body") - } - }; - - Ok(ProxyResult { - status, - headers, - body: body.into(), - }) + } } async fn proxy_error( path: FileSystemPathVc, error: anyhow::Error, operation: Option, -) -> Result { +) -> Result<(u16, String)> { let message = format!("{}", PrettyPrintError(&error)); let status = match operation { @@ -156,12 +186,13 @@ async fn proxy_error( } let status_code = 500; - let body = &*error_html( + let body = error_html( status_code, "An error occurred while proxying the request to Node.js".to_string(), format!("{message}\n\n{}", details.join("\n")), ) - .await?; + .await? + .clone_value(); RenderingIssue { context: path, @@ -172,13 +203,17 @@ async fn proxy_error( .as_issue() .emit(); - Ok(ProxyResult { - status: status_code, + Ok((status_code, body)) +} + +fn proxy_error_result(status: u16, body: String) -> ProxyResultVc { + ProxyResult { + status, headers: vec![( "content-type".to_string(), "text/html; charset=utf-8".to_string(), )], body: body.clone().into(), } - .cell()) + .cell() } From 75f44d9f34392dcda0cd1cf1921b0d1e00966998 Mon Sep 17 00:00:00 2001 From: Justin Ridgewell Date: Thu, 23 Mar 2023 00:15:40 -0400 Subject: [PATCH 2/7] Implement streamed static rendering --- crates/turbopack-node/src/render/mod.rs | 63 +++++++++- .../turbopack-node/src/render/render_proxy.rs | 54 +-------- .../src/render/render_static.rs | 112 ++++++++---------- .../src/render/rendered_source.rs | 15 ++- 4 files changed, 129 insertions(+), 115 deletions(-) diff --git a/crates/turbopack-node/src/render/mod.rs b/crates/turbopack-node/src/render/mod.rs index e56eb401b15b1..0a21d504c711b 100644 --- a/crates/turbopack-node/src/render/mod.rs +++ b/crates/turbopack-node/src/render/mod.rs @@ -1,7 +1,14 @@ +use async_stream::stream as generator; use indexmap::IndexMap; use serde::{Deserialize, Serialize}; +use turbo_tasks_bytes::Stream; +use turbo_tasks_fs::FileSystemPathVc; +use turbopack_core::asset::AssetVc; +use turbopack_dev_server::source::Body; -use crate::{route_matcher::Param, ResponseHeaders, StructuredError}; +use crate::{ + pool::NodeJsOperation, route_matcher::Param, trace_stack, ResponseHeaders, StructuredError, +}; pub(crate) mod error_page; pub mod issue; @@ -39,8 +46,6 @@ enum RenderProxyOutgoingMessage<'a> { #[serde(tag = "type", rename_all = "camelCase")] enum RenderProxyIncomingMessage { Headers { data: ResponseHeaders }, - BodyChunk { data: Vec }, - BodyEnd, Error(StructuredError), } @@ -53,8 +58,60 @@ enum RenderStaticIncomingMessage { headers: Vec<(String, String)>, body: String, }, + Headers { + data: ResponseHeaders, + }, Rewrite { path: String, }, Error(StructuredError), } + +#[derive(Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +enum RenderBodyChunks { + BodyChunk { data: Vec }, + BodyEnd, + Error(StructuredError), +} + +pub(crate) fn stream_body_chunks( + mut operation: NodeJsOperation, + intermediate_asset: AssetVc, + intermediate_output_path: FileSystemPathVc, +) -> Body { + let chunks = Stream::new_open( + vec![], + Box::pin(generator! { + macro_rules! tri { + ($exp:expr) => { + match $exp { + Ok(v) => v, + Err(e) => { + operation.disallow_reuse(); + yield Err(e.into()); + return; + } + } + } + } + + loop { + match tri!(operation.recv().await) { + RenderBodyChunks::BodyChunk { data } => { + yield Ok(data.into()); + } + RenderBodyChunks::BodyEnd => break, + RenderBodyChunks::Error(error) => { + let trace = + trace_stack(error, intermediate_asset, intermediate_output_path).await; + let e = trace.map_or_else(Into::into, Into::into); + yield Err(e); + break; + } + } + } + }), + ); + Body::from_stream(chunks.read()) +} diff --git a/crates/turbopack-node/src/render/render_proxy.rs b/crates/turbopack-node/src/render/render_proxy.rs index 789446e55cfe1..d3b0557686487 100644 --- a/crates/turbopack-node/src/render/render_proxy.rs +++ b/crates/turbopack-node/src/render/render_proxy.rs @@ -1,17 +1,15 @@ use anyhow::{bail, Result}; -use async_stream::stream as genrator; use futures::StreamExt; use turbo_tasks::primitives::StringVc; -use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::FileSystemPathVc; use turbopack_core::{asset::AssetVc, chunk::ChunkingContextVc, error::PrettyPrintError}; -use turbopack_dev_server::source::{Body, BodyError, BodyVc, ProxyResult, ProxyResultVc}; +use turbopack_dev_server::source::{BodyVc, ProxyResult, ProxyResultVc}; use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc}; use super::{ - issue::RenderingIssue, RenderDataVc, RenderProxyIncomingMessage, RenderProxyOutgoingMessage, - ResponseHeaders, + issue::RenderingIssue, stream_body_chunks, RenderDataVc, RenderProxyIncomingMessage, + RenderProxyOutgoingMessage, ResponseHeaders, }; use crate::{ get_intermediate_asset, get_renderer_pool, pool::NodeJsOperation, @@ -74,51 +72,10 @@ pub async fn render_proxy( } }; - let chunks = Stream::new_open( - vec![], - Box::pin(genrator! { - macro_rules! tri { - ($exp:expr) => { - match $exp { - Ok(v) => v, - Err(e) => { - yield Err(e.into()); - return; - } - } - } - } - - loop { - match tri!(operation.recv().await) { - RenderProxyIncomingMessage::BodyChunk { data } => { - yield Ok(Bytes::from(data)); - } - RenderProxyIncomingMessage::BodyEnd => break, - RenderProxyIncomingMessage::Error(error) => { - let trace = - trace_stack(error, intermediate_asset, intermediate_output_path).await; - let e = trace.map_or_else(BodyError::from, BodyError::from); - yield Err(e); - break; - } - _ => { - operation.disallow_reuse(); - yield Err( - "unexpected response from the Node.js process while reading response body" - .into(), - ); - break; - } - } - } - }), - ); - Ok(ProxyResult { status, headers, - body: Body::from_stream(chunks.read()), + body: stream_body_chunks(operation, intermediate_asset, intermediate_output_path), } .cell()) } @@ -162,9 +119,6 @@ async fn start_proxy_operation( .await? ) } - _ => { - bail!("unexpected response from the Node.js process while reading response headers") - } } } diff --git a/crates/turbopack-node/src/render/render_static.rs b/crates/turbopack-node/src/render/render_static.rs index 2821a2eedf6fb..adf19b5a2b3dc 100644 --- a/crates/turbopack-node/src/render/render_static.rs +++ b/crates/turbopack-node/src/render/render_static.rs @@ -1,20 +1,21 @@ -use anyhow::{bail, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use turbo_tasks::primitives::StringVc; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::{File, FileContent, FileSystemPathVc}; use turbopack_core::{ - asset::{Asset, AssetContentVc, AssetVc}, + asset::{Asset, AssetContentVc}, chunk::ChunkingContextVc, error::PrettyPrintError, }; use turbopack_dev_server::{ html::DevHtmlAssetVc, - source::{HeaderListVc, RewriteBuilder, RewriteVc}, + source::{Body, HeaderListVc, RewriteBuilder, RewriteVc}, }; use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc}; use super::{ - issue::RenderingIssue, RenderDataVc, RenderStaticIncomingMessage, RenderStaticOutgoingMessage, + issue::RenderingIssue, stream_body_chunks, RenderDataVc, RenderStaticIncomingMessage, + RenderStaticOutgoingMessage, }; use crate::{ get_intermediate_asset, get_renderer_pool, pool::NodeJsOperation, @@ -28,6 +29,11 @@ pub enum StaticResult { status_code: u16, headers: HeaderListVc, }, + StreamedContent { + status: u16, + headers: HeaderListVc, + body: Body, + }, Rewrite(RewriteVc), } @@ -91,70 +97,54 @@ pub async fn render_static( } }; - Ok( - match run_static_operation( - &mut operation, - data, - intermediate_asset, - intermediate_output_path, - project_dir, - ) - .await - { - Ok(result) => result, - Err(err) => StaticResultVc::content( - static_error(path, err, Some(operation), fallback_page).await?, - 500, - HeaderListVc::empty(), - ), - }, - ) -} - -async fn run_static_operation( - operation: &mut NodeJsOperation, - data: RenderDataVc, - intermediate_asset: AssetVc, - intermediate_output_path: FileSystemPathVc, - project_dir: FileSystemPathVc, -) -> Result { let data = data.await?; operation .send(RenderStaticOutgoingMessage::Headers { data: &data }) .await .context("sending headers to node.js process")?; - Ok( - match operation - .recv() - .await - .context("receiving from node.js process")? - { - RenderStaticIncomingMessage::Rewrite { path } => { - StaticResultVc::rewrite(RewriteBuilder::new(path).build()) - } - RenderStaticIncomingMessage::Response { - status_code, - headers, - body, - } => StaticResultVc::content( - FileContent::Content(File::from(body)).into(), - status_code, - HeaderListVc::cell(headers), - ), - RenderStaticIncomingMessage::Error(error) => { - bail!( - trace_stack( - error, - intermediate_asset, - intermediate_output_path, - project_dir - ) - .await? - ) + + let first = operation + .recv() + .await + .context("receiving from node.js process")?; + + Ok(match first { + RenderStaticIncomingMessage::Rewrite { path } => { + StaticResultVc::rewrite(RewriteBuilder::new(path).build()) + } + RenderStaticIncomingMessage::Response { + status_code, + headers, + body, + } => StaticResultVc::content( + FileContent::Content(File::from(body)).into(), + status_code, + HeaderListVc::cell(headers), + ), + RenderStaticIncomingMessage::Error(error) => { + let trace = trace_stack( + error, + intermediate_asset, + intermediate_output_path, + project_dir, + ) + .await?; + StaticResultVc::content( + static_error(path, anyhow!(trace), Some(operation), fallback_page).await?, + 500, + HeaderListVc::empty(), + ) + } + RenderStaticIncomingMessage::Headers { data } => { + StaticResult::StreamedContent { + status: data.status, + headers: HeaderListVc::cell(data.headers), + body: stream_body_chunks(operation, intermediate_asset, intermediate_output_path), } - }, - ) + } + .cell(), + }) } async fn static_error( diff --git a/crates/turbopack-node/src/render/rendered_source.rs b/crates/turbopack-node/src/render/rendered_source.rs index 9ebfacb84d4c1..0e3180d55b7f8 100644 --- a/crates/turbopack-node/src/render/rendered_source.rs +++ b/crates/turbopack-node/src/render/rendered_source.rs @@ -21,7 +21,7 @@ use turbopack_dev_server::{ specificity::SpecificityVc, ContentSource, ContentSourceContent, ContentSourceContentVc, ContentSourceData, ContentSourceDataVary, ContentSourceDataVaryVc, ContentSourceResult, ContentSourceResultVc, - ContentSourceVc, GetContentSourceContent, GetContentSourceContentVc, + ContentSourceVc, GetContentSourceContent, GetContentSourceContentVc, ProxyResult, }, }; use turbopack_ecmascript::chunk::EcmascriptChunkPlaceablesVc; @@ -237,6 +237,19 @@ impl GetContentSourceContent for NodeRenderGetContentResult { status_code, headers, } => ContentSourceContentVc::static_with_headers(content.into(), status_code, headers), + StaticResult::StreamedContent { + status, + headers, + ref body, + } => ContentSourceContent::HttpProxy( + ProxyResult { + status, + headers: headers.await?.clone_value(), + body: body.clone(), + } + .cell(), + ) + .cell(), StaticResult::Rewrite(rewrite) => ContentSourceContent::Rewrite(rewrite).cell(), }) } From ece41f27efecb198546517ccc297fc82bfb05f57 Mon Sep 17 00:00:00 2001 From: Justin Ridgewell Date: Fri, 24 Mar 2023 00:36:03 -0400 Subject: [PATCH 3/7] Fix merge conflicts --- crates/turbopack-node/src/render/mod.rs | 6 ++++-- crates/turbopack-node/src/render/render_proxy.rs | 7 ++++++- crates/turbopack-node/src/render/render_static.rs | 9 +++++++-- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/crates/turbopack-node/src/render/mod.rs b/crates/turbopack-node/src/render/mod.rs index 0a21d504c711b..3eb21f7cc1f39 100644 --- a/crates/turbopack-node/src/render/mod.rs +++ b/crates/turbopack-node/src/render/mod.rs @@ -7,7 +7,8 @@ use turbopack_core::asset::AssetVc; use turbopack_dev_server::source::Body; use crate::{ - pool::NodeJsOperation, route_matcher::Param, trace_stack, ResponseHeaders, StructuredError, + pool::NodeJsOperation, route_matcher::Param, source_map::trace_stack, ResponseHeaders, + StructuredError, }; pub(crate) mod error_page; @@ -79,6 +80,7 @@ pub(crate) fn stream_body_chunks( mut operation: NodeJsOperation, intermediate_asset: AssetVc, intermediate_output_path: FileSystemPathVc, + project_dir: FileSystemPathVc, ) -> Body { let chunks = Stream::new_open( vec![], @@ -104,7 +106,7 @@ pub(crate) fn stream_body_chunks( RenderBodyChunks::BodyEnd => break, RenderBodyChunks::Error(error) => { let trace = - trace_stack(error, intermediate_asset, intermediate_output_path).await; + trace_stack(error, intermediate_asset, intermediate_output_path, project_dir).await; let e = trace.map_or_else(Into::into, Into::into); yield Err(e); break; diff --git a/crates/turbopack-node/src/render/render_proxy.rs b/crates/turbopack-node/src/render/render_proxy.rs index d3b0557686487..c80ece0789c41 100644 --- a/crates/turbopack-node/src/render/render_proxy.rs +++ b/crates/turbopack-node/src/render/render_proxy.rs @@ -75,7 +75,12 @@ pub async fn render_proxy( Ok(ProxyResult { status, headers, - body: stream_body_chunks(operation, intermediate_asset, intermediate_output_path), + body: stream_body_chunks( + operation, + intermediate_asset, + intermediate_output_path, + project_dir, + ), } .cell()) } diff --git a/crates/turbopack-node/src/render/render_static.rs b/crates/turbopack-node/src/render/render_static.rs index adf19b5a2b3dc..a9d3a130549e3 100644 --- a/crates/turbopack-node/src/render/render_static.rs +++ b/crates/turbopack-node/src/render/render_static.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, Context, Result}; use turbo_tasks::primitives::StringVc; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::{File, FileContent, FileSystemPathVc}; @@ -140,7 +140,12 @@ pub async fn render_static( StaticResult::StreamedContent { status: data.status, headers: HeaderListVc::cell(data.headers), - body: stream_body_chunks(operation, intermediate_asset, intermediate_output_path), + body: stream_body_chunks( + operation, + intermediate_asset, + intermediate_output_path, + project_dir, + ), } } .cell(), From 2f36ed42f9b6ec9db588e638f1d35bf78567f9d8 Mon Sep 17 00:00:00 2001 From: Justin Ridgewell Date: Tue, 28 Mar 2023 18:51:07 -0400 Subject: [PATCH 4/7] Implement Stream Cell approach for proxy renders --- crates/turbopack-node/src/lib.rs | 1 + crates/turbopack-node/src/render/mod.rs | 4 +- .../turbopack-node/src/render/render_proxy.rs | 291 ++++++++++++------ 3 files changed, 209 insertions(+), 87 deletions(-) diff --git a/crates/turbopack-node/src/lib.rs b/crates/turbopack-node/src/lib.rs index fc3f753ffc622..5f2498fb4064a 100644 --- a/crates/turbopack-node/src/lib.rs +++ b/crates/turbopack-node/src/lib.rs @@ -278,6 +278,7 @@ pub async fn get_intermediate_asset( .into()) } +#[derive(Clone, Debug)] #[turbo_tasks::value(shared)] pub struct ResponseHeaders { pub status: u16, diff --git a/crates/turbopack-node/src/render/mod.rs b/crates/turbopack-node/src/render/mod.rs index 3eb21f7cc1f39..d97bb9cf9ac19 100644 --- a/crates/turbopack-node/src/render/mod.rs +++ b/crates/turbopack-node/src/render/mod.rs @@ -43,10 +43,12 @@ enum RenderProxyOutgoingMessage<'a> { BodyEnd, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(tag = "type", rename_all = "camelCase")] enum RenderProxyIncomingMessage { Headers { data: ResponseHeaders }, + BodyChunk { data: Vec }, + BodyEnd, Error(StructuredError), } diff --git a/crates/turbopack-node/src/render/render_proxy.rs b/crates/turbopack-node/src/render/render_proxy.rs index c80ece0789c41..d84cd613e84c3 100644 --- a/crates/turbopack-node/src/render/render_proxy.rs +++ b/crates/turbopack-node/src/render/render_proxy.rs @@ -1,19 +1,27 @@ -use anyhow::{bail, Result}; -use futures::StreamExt; -use turbo_tasks::primitives::StringVc; +use anyhow::{anyhow, bail, Result}; +use async_stream::try_stream as generator; +use futures::{ + channel::mpsc::{unbounded, UnboundedSender}, + pin_mut, SinkExt, StreamExt, +}; +use parking_lot::Mutex; +use turbo_tasks::{mark_finished, primitives::StringVc, util::SharedError, RawVc}; +use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::FileSystemPathVc; use turbopack_core::{asset::AssetVc, chunk::ChunkingContextVc, error::PrettyPrintError}; -use turbopack_dev_server::source::{BodyVc, ProxyResult, ProxyResultVc}; +use turbopack_dev_server::source::{Body, BodyError, BodyVc, ProxyResult, ProxyResultVc}; use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc}; use super::{ - issue::RenderingIssue, stream_body_chunks, RenderDataVc, RenderProxyIncomingMessage, - RenderProxyOutgoingMessage, ResponseHeaders, + issue::RenderingIssue, RenderDataVc, RenderProxyIncomingMessage, RenderProxyOutgoingMessage, + ResponseHeaders, }; use crate::{ - get_intermediate_asset, get_renderer_pool, pool::NodeJsOperation, - render::error_page::error_html, source_map::trace_stack, + get_intermediate_asset, get_renderer_pool, + pool::{NodeJsOperation, NodeJsPoolVc}, + render::error_page::error_html, + source_map::trace_stack, }; /// Renders a module as static HTML in a node.js process. @@ -44,87 +52,48 @@ pub async fn render_proxy( output_root, project_dir, /* debug */ false, - ) - .await?; - - let mut operation = match pool.operation().await { - Ok(operation) => operation, - Err(err) => { - let (status, body) = proxy_error(path, err, None).await?; - return Ok(proxy_error_result(status, body)); - } - }; + ); - let (status, headers) = match start_proxy_operation( - &mut operation, + let render = render_stream( + pool, data, body, intermediate_asset, intermediate_output_path, project_dir, + path, ) - .await - { - Ok(v) => v, - Err(err) => { - let (status, body) = proxy_error(path, err, Some(operation)).await?; - return Ok(proxy_error_result(status, body)); + .await?; + + let mut stream = render.read(); + let first = match stream.next().await { + Some(Ok(f)) => f, + _ => { + // If an Error was received first, then it would have been + // transformed into a proxy err error response. + bail!("did not receive response from render"); } }; - Ok(ProxyResult { - status, - headers, - body: stream_body_chunks( - operation, - intermediate_asset, - intermediate_output_path, - project_dir, - ), - } - .cell()) -} - -async fn start_proxy_operation( - operation: &mut NodeJsOperation, - data: RenderDataVc, - body: BodyVc, - intermediate_asset: AssetVc, - intermediate_output_path: FileSystemPathVc, - project_dir: FileSystemPathVc, -) -> Result<(u16, Vec<(String, String)>)> { - let data = data.await?; - // First, send the render data. - operation - .send(RenderProxyOutgoingMessage::Headers { data: &data }) - .await?; - - let mut body = body.await?.read(); - // Then, send the binary body in chunks. - while let Some(data) = body.next().await { - operation - .send(RenderProxyOutgoingMessage::BodyChunk { data: &data? }) - .await?; - } + let RenderItem::Headers(data) = first else { + bail!("did not receive headers from render"); + }; - operation.send(RenderProxyOutgoingMessage::BodyEnd).await?; + let body = Body::from_stream(stream.map(|item| match item { + Ok(RenderItem::BodyChunk(b)) => Ok(b), + Ok(v) => Err(BodyError::new(format!("unexpected render item: {:#?}", v))), + Err(e) => Err(BodyError::new(format!( + "error streaming proxied contents: {}", + e + ))), + })); + let result = ProxyResult { + status: data.status, + headers: data.headers, + body, + }; - match operation.recv().await? { - RenderProxyIncomingMessage::Headers { - data: ResponseHeaders { status, headers }, - } => Ok((status, headers)), - RenderProxyIncomingMessage::Error(error) => { - bail!( - trace_stack( - error, - intermediate_asset, - intermediate_output_path, - project_dir - ) - .await? - ) - } - } + Ok(result.cell()) } async fn proxy_error( @@ -165,14 +134,164 @@ async fn proxy_error( Ok((status_code, body)) } -fn proxy_error_result(status: u16, body: String) -> ProxyResultVc { - ProxyResult { - status, - headers: vec![( - "content-type".to_string(), - "text/html; charset=utf-8".to_string(), - )], - body: body.clone().into(), +#[derive(Clone, Debug)] +#[turbo_tasks::value] +enum RenderItem { + Headers(ResponseHeaders), + BodyChunk(Bytes), +} + +type RenderItemResult = Result; + +#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")] +pub struct RenderStreamSender { + #[turbo_tasks(trace_ignore, debug_ignore)] + get: Box UnboundedSender + Send + Sync>, +} + +#[turbo_tasks::value(transparent, eq = "manual", cell = "new", serialization = "none")] +struct RenderStream(#[turbo_tasks(trace_ignore)] Stream); + +#[turbo_tasks::function] +fn render_stream( + pool: NodeJsPoolVc, + data: RenderDataVc, + body: BodyVc, + intermediate_asset: AssetVc, + intermediate_output_path: FileSystemPathVc, + project_dir: FileSystemPathVc, + error_path: FileSystemPathVc, +) -> RenderStreamVc { + // Note the following code uses some hacks to create a child task that produces + // a stream that is returned by this task. + + // We create a new cell in this task, which will be updated from the + // [render_stream_internal] task. + let cell = turbo_tasks::macro_helpers::find_cell_by_type(*RENDERSTREAM_VALUE_TYPE_ID); + + // We initialize the cell with a stream that is open, but has no values. + // The first [render_stream_internal] pipe call will pick up that stream. + let (sender, receiver) = unbounded(); + cell.update_shared(RenderStream(Stream::new_open(vec![], Box::new(receiver)))); + let initial = Mutex::new(Some(sender)); + + // run the evaluation as side effect + render_stream_internal( + pool, + data, + body, + intermediate_asset, + intermediate_output_path, + project_dir, + error_path, + RenderStreamSender { + get: Box::new(move || { + if let Some(sender) = initial.lock().take() { + sender + } else { + // In cases when only [render_stream_internal] is (re)executed, we need to + // update the old stream with a new value. + let (sender, receiver) = unbounded(); + cell.update_shared(RenderStream(Stream::new_open(vec![], Box::new(receiver)))); + sender + } + }), + } + .cell(), + ); + + let raw: RawVc = cell.into(); + raw.into() +} + +#[turbo_tasks::function] +async fn render_stream_internal( + pool: NodeJsPoolVc, + data: RenderDataVc, + body: BodyVc, + intermediate_asset: AssetVc, + intermediate_output_path: FileSystemPathVc, + project_dir: FileSystemPathVc, + error_path: FileSystemPathVc, + sender: RenderStreamSenderVc, +) { + mark_finished(); + let Ok(sender) = sender.await else { + // Impossible to handle the error in a good way. + return; + }; + + let stream = generator! { + let data = data.await?; + let pool = pool.await?; + let mut operation = pool.operation().await?; + + // First, send the render data. + operation + .send(RenderProxyOutgoingMessage::Headers { data: &data }) + .await?; + // Then, send the binary body in chunks. + let mut body = body.await?.read(); + while let Some(data) = body.next().await { + operation + .send(RenderProxyOutgoingMessage::BodyChunk { data: &data.unwrap() }) + .await?; + } + operation.send(RenderProxyOutgoingMessage::BodyEnd).await?; + + match operation.recv().await? { + RenderProxyIncomingMessage::Headers { data } => yield RenderItem::Headers(data), + RenderProxyIncomingMessage::Error(error) => { + // If we don't get headers, then something is very wrong. Instead, we send down a + // 500 proxy error as if it were the proper result. + let trace = trace_stack( + error, + intermediate_asset, + intermediate_output_path, + project_dir + ) + .await?; + let (status, body) = proxy_error(error_path, anyhow!("error rendering: {}", trace), Some(operation)).await?; + yield RenderItem::Headers(ResponseHeaders { + status, + headers: vec![( + "content-type".to_string(), + "text/html; charset=utf-8".to_string(), + )], + }); + yield RenderItem::BodyChunk(body.into()); + return; + } + v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + }; + + loop { + match operation.recv().await? { + RenderProxyIncomingMessage::BodyChunk { data } => { + yield RenderItem::BodyChunk(data.into()); + } + RenderProxyIncomingMessage::BodyEnd => break, + RenderProxyIncomingMessage::Error(error) => { + // We have already started to send a result, so we can't change the + // headers/body to a proxy error. + operation.disallow_reuse(); + let trace = + trace_stack(error, intermediate_asset, intermediate_output_path, project_dir).await?; + Err(anyhow!("error during streaming render: {}", trace))?; + } + v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + } + } + }; + + let mut sender = (sender.get)(); + pin_mut!(stream); + while let Some(value) = stream.next().await { + if sender.send(value).await.is_err() { + return; + } + if sender.flush().await.is_err() { + return; + } } - .cell() } From 831a267eb4196dcc6a78595c509297f05bc96501 Mon Sep 17 00:00:00 2001 From: Justin Ridgewell Date: Tue, 28 Mar 2023 23:28:08 -0400 Subject: [PATCH 5/7] Implement Stream Cell approach for static renders --- crates/turbopack-node/src/render/mod.rs | 66 +--- .../src/render/render_static.rs | 288 ++++++++++++++---- 2 files changed, 229 insertions(+), 125 deletions(-) diff --git a/crates/turbopack-node/src/render/mod.rs b/crates/turbopack-node/src/render/mod.rs index d97bb9cf9ac19..95de500b5cc0c 100644 --- a/crates/turbopack-node/src/render/mod.rs +++ b/crates/turbopack-node/src/render/mod.rs @@ -1,15 +1,7 @@ -use async_stream::stream as generator; use indexmap::IndexMap; use serde::{Deserialize, Serialize}; -use turbo_tasks_bytes::Stream; -use turbo_tasks_fs::FileSystemPathVc; -use turbopack_core::asset::AssetVc; -use turbopack_dev_server::source::Body; -use crate::{ - pool::NodeJsOperation, route_matcher::Param, source_map::trace_stack, ResponseHeaders, - StructuredError, -}; +use crate::{route_matcher::Param, ResponseHeaders, StructuredError}; pub(crate) mod error_page; pub mod issue; @@ -52,7 +44,7 @@ enum RenderProxyIncomingMessage { Error(StructuredError), } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] #[serde(tag = "type", rename_all = "camelCase")] enum RenderStaticIncomingMessage { #[serde(rename_all = "camelCase")] @@ -64,58 +56,12 @@ enum RenderStaticIncomingMessage { Headers { data: ResponseHeaders, }, + BodyChunk { + data: Vec, + }, + BodyEnd, Rewrite { path: String, }, Error(StructuredError), } - -#[derive(Deserialize)] -#[serde(tag = "type", rename_all = "camelCase")] -enum RenderBodyChunks { - BodyChunk { data: Vec }, - BodyEnd, - Error(StructuredError), -} - -pub(crate) fn stream_body_chunks( - mut operation: NodeJsOperation, - intermediate_asset: AssetVc, - intermediate_output_path: FileSystemPathVc, - project_dir: FileSystemPathVc, -) -> Body { - let chunks = Stream::new_open( - vec![], - Box::pin(generator! { - macro_rules! tri { - ($exp:expr) => { - match $exp { - Ok(v) => v, - Err(e) => { - operation.disallow_reuse(); - yield Err(e.into()); - return; - } - } - } - } - - loop { - match tri!(operation.recv().await) { - RenderBodyChunks::BodyChunk { data } => { - yield Ok(data.into()); - } - RenderBodyChunks::BodyEnd => break, - RenderBodyChunks::Error(error) => { - let trace = - trace_stack(error, intermediate_asset, intermediate_output_path, project_dir).await; - let e = trace.map_or_else(Into::into, Into::into); - yield Err(e); - break; - } - } - } - }), - ); - Body::from_stream(chunks.read()) -} diff --git a/crates/turbopack-node/src/render/render_static.rs b/crates/turbopack-node/src/render/render_static.rs index a9d3a130549e3..28dfc066e8863 100644 --- a/crates/turbopack-node/src/render/render_static.rs +++ b/crates/turbopack-node/src/render/render_static.rs @@ -1,27 +1,37 @@ -use anyhow::{anyhow, Context, Result}; -use turbo_tasks::primitives::StringVc; +use anyhow::{anyhow, bail, Context, Result}; +use async_stream::try_stream as generator; +use futures::{ + channel::mpsc::{unbounded, UnboundedSender}, + pin_mut, SinkExt, StreamExt, +}; +use parking_lot::Mutex; +use turbo_tasks::{mark_finished, primitives::StringVc, util::SharedError, RawVc}; +use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::{File, FileContent, FileSystemPathVc}; use turbopack_core::{ - asset::{Asset, AssetContentVc}, + asset::{Asset, AssetContentVc, AssetVc}, chunk::ChunkingContextVc, error::PrettyPrintError, }; use turbopack_dev_server::{ html::DevHtmlAssetVc, - source::{Body, HeaderListVc, RewriteBuilder, RewriteVc}, + source::{Body, BodyError, HeaderListVc, RewriteBuilder, RewriteVc}, }; use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc}; use super::{ - issue::RenderingIssue, stream_body_chunks, RenderDataVc, RenderStaticIncomingMessage, - RenderStaticOutgoingMessage, + issue::RenderingIssue, RenderDataVc, RenderStaticIncomingMessage, RenderStaticOutgoingMessage, }; use crate::{ - get_intermediate_asset, get_renderer_pool, pool::NodeJsOperation, - render::error_page::error_html_body, source_map::trace_stack, + get_intermediate_asset, get_renderer_pool, + pool::{NodeJsOperation, NodeJsPoolVc}, + render::error_page::error_html_body, + source_map::trace_stack, + ResponseHeaders, }; +#[derive(Clone, Debug)] #[turbo_tasks::value] pub enum StaticResult { Content { @@ -83,72 +93,47 @@ pub async fn render_static( project_dir, /* debug */ false, ); - // Read this strongly consistent, since we don't want to run inconsistent - // node.js code. - let pool = renderer_pool.strongly_consistent().await?; - let mut operation = match pool.operation().await { - Ok(operation) => operation, - Err(err) => { - return Ok(StaticResultVc::content( - static_error(path, err, None, fallback_page).await?, - 500, - HeaderListVc::empty(), - )) - } - }; - let data = data.await?; - - operation - .send(RenderStaticOutgoingMessage::Headers { data: &data }) - .await - .context("sending headers to node.js process")?; + let render = render_stream( + renderer_pool, + data, + intermediate_asset, + intermediate_output_path, + project_dir, + path, + fallback_page, + ) + .await?; - let first = operation - .recv() - .await - .context("receiving from node.js process")?; + let mut stream = render.read(); + let first = match stream.next().await { + Some(Ok(f)) => f, + _ => { + // If an Error was received first, then it would have been + // transformed into a proxy err error response. + bail!("did not receive response from render"); + } + }; Ok(match first { - RenderStaticIncomingMessage::Rewrite { path } => { - StaticResultVc::rewrite(RewriteBuilder::new(path).build()) - } - RenderStaticIncomingMessage::Response { - status_code, - headers, - body, - } => StaticResultVc::content( - FileContent::Content(File::from(body)).into(), - status_code, - HeaderListVc::cell(headers), - ), - RenderStaticIncomingMessage::Error(error) => { - let trace = trace_stack( - error, - intermediate_asset, - intermediate_output_path, - project_dir, - ) - .await?; - StaticResultVc::content( - static_error(path, anyhow!(trace), Some(operation), fallback_page).await?, - 500, - HeaderListVc::empty(), - ) - } - RenderStaticIncomingMessage::Headers { data } => { + RenderItem::Response(response) => response, + RenderItem::Headers(data) => { + let body = stream.map(|item| match item { + Ok(RenderItem::BodyChunk(b)) => Ok(b), + Ok(v) => Err(BodyError::new(format!("unexpected render item: {:#?}", v))), + Err(e) => Err(BodyError::new(format!( + "error streaming proxied contents: {}", + e + ))), + }); StaticResult::StreamedContent { status: data.status, headers: HeaderListVc::cell(data.headers), - body: stream_body_chunks( - operation, - intermediate_asset, - intermediate_output_path, - project_dir, - ), + body: Body::from_stream(body), } + .cell() } - .cell(), + v => bail!("unexpected render item: {:#?}", v), }) } @@ -196,3 +181,176 @@ async fn static_error( Ok(html.content()) } + +#[derive(Clone, Debug)] +#[turbo_tasks::value] +enum RenderItem { + Response(StaticResultVc), + Headers(ResponseHeaders), + BodyChunk(Bytes), +} + +type RenderItemResult = Result; + +#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")] +pub struct RenderStreamSender { + #[turbo_tasks(trace_ignore, debug_ignore)] + get: Box UnboundedSender + Send + Sync>, +} + +#[turbo_tasks::value(transparent, eq = "manual", cell = "new", serialization = "none")] +struct RenderStream(#[turbo_tasks(trace_ignore)] Stream); + +#[turbo_tasks::function] +fn render_stream( + pool: NodeJsPoolVc, + data: RenderDataVc, + intermediate_asset: AssetVc, + intermediate_output_path: FileSystemPathVc, + project_dir: FileSystemPathVc, + error_path: FileSystemPathVc, + fallback_page: DevHtmlAssetVc, +) -> RenderStreamVc { + // Note the following code uses some hacks to create a child task that produces + // a stream that is returned by this task. + + // We create a new cell in this task, which will be updated from the + // [render_stream_internal] task. + let cell = turbo_tasks::macro_helpers::find_cell_by_type(*RENDERSTREAM_VALUE_TYPE_ID); + + // We initialize the cell with a stream that is open, but has no values. + // The first [render_stream_internal] pipe call will pick up that stream. + let (sender, receiver) = unbounded(); + cell.update_shared(RenderStream(Stream::new_open(vec![], Box::new(receiver)))); + let initial = Mutex::new(Some(sender)); + + // run the evaluation as side effect + render_stream_internal( + pool, + data, + intermediate_asset, + intermediate_output_path, + project_dir, + error_path, + fallback_page, + RenderStreamSender { + get: Box::new(move || { + if let Some(sender) = initial.lock().take() { + sender + } else { + // In cases when only [render_stream_internal] is (re)executed, we need to + // update the old stream with a new value. + let (sender, receiver) = unbounded(); + cell.update_shared(RenderStream(Stream::new_open(vec![], Box::new(receiver)))); + sender + } + }), + } + .cell(), + ); + + let raw: RawVc = cell.into(); + raw.into() +} + +#[turbo_tasks::function] +async fn render_stream_internal( + pool: NodeJsPoolVc, + data: RenderDataVc, + intermediate_asset: AssetVc, + intermediate_output_path: FileSystemPathVc, + project_dir: FileSystemPathVc, + error_path: FileSystemPathVc, + fallback_page: DevHtmlAssetVc, + sender: RenderStreamSenderVc, +) { + mark_finished(); + let Ok(sender) = sender.await else { + // Impossible to handle the error in a good way. + return; + }; + + let stream = generator! { + let data = data.await?; + // Read this strongly consistent, since we don't want to run inconsistent + // node.js code. + let pool = pool.strongly_consistent().await?; + let mut operation = pool.operation().await?; + + operation + .send(RenderStaticOutgoingMessage::Headers { data: &data }) + .await + .context("sending headers to node.js process")?; + + match operation.recv().await? { + RenderStaticIncomingMessage::Headers { data } => yield RenderItem::Headers(data), + RenderStaticIncomingMessage::Rewrite { path } => { + yield RenderItem::Response(StaticResultVc::rewrite(RewriteBuilder::new(path).build())); + return; + } + RenderStaticIncomingMessage::Response { + status_code, + headers, + body, + } => { + yield RenderItem::Response(StaticResultVc::content( + FileContent::Content(File::from(body)).into(), + status_code, + HeaderListVc::cell(headers), + )); + return; + } + RenderStaticIncomingMessage::Error(error) => { + // If we don't get headers, then something is very wrong. Instead, we send down a + // 500 proxy error as if it were the proper result. + let trace = trace_stack( + error, + intermediate_asset, + intermediate_output_path, + project_dir, + ) + .await?; + yield RenderItem::Response( + StaticResultVc::content( + static_error(error_path, anyhow!(trace), Some(operation), fallback_page).await?, + 500, + HeaderListVc::empty(), + ) + ); + return; + } + v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + }; + + // If we get here, then the first message was a Headers. Now we need to stream out the body + // chunks. + loop { + match operation.recv().await? { + RenderStaticIncomingMessage::BodyChunk { data } => { + yield RenderItem::BodyChunk(data.into()); + } + RenderStaticIncomingMessage::BodyEnd => break, + RenderStaticIncomingMessage::Error(error) => { + // We have already started to send a result, so we can't change the + // headers/body to a proxy error. + operation.disallow_reuse(); + let trace = + trace_stack(error, intermediate_asset, intermediate_output_path, project_dir).await?; + Err(anyhow!("error during streaming render: {}", trace))?; + } + v => Err(anyhow!("unexpected message during rendering: {:#?}", v))?, + } + } + }; + + let mut sender = (sender.get)(); + pin_mut!(stream); + while let Some(value) = stream.next().await { + if sender.send(value).await.is_err() { + return; + } + if sender.flush().await.is_err() { + return; + } + } +} From 7fe1eca5095f08fdf1ff94c7b6e0880ea539ea5d Mon Sep 17 00:00:00 2001 From: Justin Ridgewell Date: Wed, 29 Mar 2023 12:53:33 -0400 Subject: [PATCH 6/7] Fix strongly_consistent usage for pools --- crates/turbopack-node/src/evaluate.rs | 42 ++++++--- .../turbopack-node/src/render/render_proxy.rs | 91 ++++++++++-------- .../src/render/render_static.rs | 93 +++++++++++-------- 3 files changed, 133 insertions(+), 93 deletions(-) diff --git a/crates/turbopack-node/src/evaluate.rs b/crates/turbopack-node/src/evaluate.rs index d249e28e22429..f2773267cf4c0 100644 --- a/crates/turbopack-node/src/evaluate.rs +++ b/crates/turbopack-node/src/evaluate.rs @@ -249,17 +249,6 @@ pub fn evaluate( additional_invalidation: CompletionVc, debug: bool, ) -> JavaScriptEvaluationVc { - let pool = get_evaluate_pool( - module_asset, - cwd, - env, - context, - chunking_context, - runtime_entries, - additional_invalidation, - debug, - ); - // Note the following code uses some hacks to create a child task that produces // a stream that is returned by this task. @@ -278,11 +267,16 @@ pub fn evaluate( // run the evaluation as side effect compute_evaluate_stream( - pool, + module_asset, cwd, + env, context_ident_for_issue, + context, chunking_context, + runtime_entries, args, + additional_invalidation, + debug, JavaScriptStreamSender { get: Box::new(move || { if let Some(sender) = initial.lock().take() { @@ -308,11 +302,16 @@ pub fn evaluate( #[turbo_tasks::function] async fn compute_evaluate_stream( - pool: NodeJsPoolVc, + module_asset: AssetVc, cwd: FileSystemPathVc, + env: ProcessEnvVc, context_ident_for_issue: AssetIdentVc, + context: AssetContextVc, chunking_context: ChunkingContextVc, + runtime_entries: Option, args: Vec, + additional_invalidation: CompletionVc, + debug: bool, sender: JavaScriptStreamSenderVc, ) { mark_finished(); @@ -322,7 +321,20 @@ async fn compute_evaluate_stream( }; let stream = generator! { - let pool = pool.await?; + let pool = get_evaluate_pool( + module_asset, + cwd, + env, + context, + chunking_context, + runtime_entries, + additional_invalidation, + debug, + ); + + // Read this strongly consistent, since we don't want to run inconsistent + // node.js code. + let pool = pool.strongly_consistent().await?; let args = args.into_iter().try_join().await?; // Assume this is a one-off operation, so we can kill the process @@ -407,7 +419,7 @@ async fn pull_operation( match operation.recv().await? { EvalJavaScriptIncomingMessage::Error(error) => { EvaluationIssue { - error: error.clone(), + error, context_ident: context_ident_for_issue, assets_for_source_mapping: pool.assets_for_source_mapping, assets_root: pool.assets_root, diff --git a/crates/turbopack-node/src/render/render_proxy.rs b/crates/turbopack-node/src/render/render_proxy.rs index d84cd613e84c3..edb947f79807f 100644 --- a/crates/turbopack-node/src/render/render_proxy.rs +++ b/crates/turbopack-node/src/render/render_proxy.rs @@ -9,7 +9,7 @@ use turbo_tasks::{mark_finished, primitives::StringVc, util::SharedError, RawVc} use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::FileSystemPathVc; -use turbopack_core::{asset::AssetVc, chunk::ChunkingContextVc, error::PrettyPrintError}; +use turbopack_core::{chunk::ChunkingContextVc, error::PrettyPrintError}; use turbopack_dev_server::source::{Body, BodyError, BodyVc, ProxyResult, ProxyResultVc}; use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc}; @@ -18,10 +18,8 @@ use super::{ ResponseHeaders, }; use crate::{ - get_intermediate_asset, get_renderer_pool, - pool::{NodeJsOperation, NodeJsPoolVc}, - render::error_page::error_html, - source_map::trace_stack, + get_intermediate_asset, get_renderer_pool, pool::NodeJsOperation, + render::error_page::error_html, source_map::trace_stack, }; /// Renders a module as static HTML in a node.js process. @@ -39,29 +37,18 @@ pub async fn render_proxy( data: RenderDataVc, body: BodyVc, ) -> Result { - let intermediate_asset = get_intermediate_asset( - module.as_evaluated_chunk(chunking_context, Some(runtime_entries)), - intermediate_output_path, - ); - - let pool = get_renderer_pool( + let render = render_stream( cwd, env, - intermediate_asset, + path, + module, + runtime_entries, + chunking_context, intermediate_output_path, output_root, project_dir, - /* debug */ false, - ); - - let render = render_stream( - pool, data, body, - intermediate_asset, - intermediate_output_path, - project_dir, - path, ) .await?; @@ -154,13 +141,17 @@ struct RenderStream(#[turbo_tasks(trace_ignore)] Stream); #[turbo_tasks::function] fn render_stream( - pool: NodeJsPoolVc, - data: RenderDataVc, - body: BodyVc, - intermediate_asset: AssetVc, + cwd: FileSystemPathVc, + env: ProcessEnvVc, + path: FileSystemPathVc, + module: EcmascriptModuleAssetVc, + runtime_entries: EcmascriptChunkPlaceablesVc, + chunking_context: ChunkingContextVc, intermediate_output_path: FileSystemPathVc, + output_root: FileSystemPathVc, project_dir: FileSystemPathVc, - error_path: FileSystemPathVc, + data: RenderDataVc, + body: BodyVc, ) -> RenderStreamVc { // Note the following code uses some hacks to create a child task that produces // a stream that is returned by this task. @@ -177,13 +168,17 @@ fn render_stream( // run the evaluation as side effect render_stream_internal( - pool, - data, - body, - intermediate_asset, + cwd, + env, + path, + module, + runtime_entries, + chunking_context, intermediate_output_path, + output_root, project_dir, - error_path, + data, + body, RenderStreamSender { get: Box::new(move || { if let Some(sender) = initial.lock().take() { @@ -206,13 +201,17 @@ fn render_stream( #[turbo_tasks::function] async fn render_stream_internal( - pool: NodeJsPoolVc, - data: RenderDataVc, - body: BodyVc, - intermediate_asset: AssetVc, + cwd: FileSystemPathVc, + env: ProcessEnvVc, + path: FileSystemPathVc, + module: EcmascriptModuleAssetVc, + runtime_entries: EcmascriptChunkPlaceablesVc, + chunking_context: ChunkingContextVc, intermediate_output_path: FileSystemPathVc, + output_root: FileSystemPathVc, project_dir: FileSystemPathVc, - error_path: FileSystemPathVc, + data: RenderDataVc, + body: BodyVc, sender: RenderStreamSenderVc, ) { mark_finished(); @@ -222,8 +221,24 @@ async fn render_stream_internal( }; let stream = generator! { + let intermediate_asset = get_intermediate_asset( + module.as_evaluated_chunk(chunking_context, Some(runtime_entries)), + intermediate_output_path, + ); + let pool = get_renderer_pool( + cwd, + env, + intermediate_asset, + intermediate_output_path, + output_root, + project_dir, + /* debug */ false, + ); + + // Read this strongly consistent, since we don't want to run inconsistent + // node.js code. + let pool = pool.strongly_consistent().await?; let data = data.await?; - let pool = pool.await?; let mut operation = pool.operation().await?; // First, send the render data. @@ -251,7 +266,7 @@ async fn render_stream_internal( project_dir ) .await?; - let (status, body) = proxy_error(error_path, anyhow!("error rendering: {}", trace), Some(operation)).await?; + let (status, body) = proxy_error(path, anyhow!("error rendering: {}", trace), Some(operation)).await?; yield RenderItem::Headers(ResponseHeaders { status, headers: vec![( diff --git a/crates/turbopack-node/src/render/render_static.rs b/crates/turbopack-node/src/render/render_static.rs index 28dfc066e8863..8b3a7f1707e6f 100644 --- a/crates/turbopack-node/src/render/render_static.rs +++ b/crates/turbopack-node/src/render/render_static.rs @@ -10,7 +10,7 @@ use turbo_tasks_bytes::{Bytes, Stream}; use turbo_tasks_env::ProcessEnvVc; use turbo_tasks_fs::{File, FileContent, FileSystemPathVc}; use turbopack_core::{ - asset::{Asset, AssetContentVc, AssetVc}, + asset::{Asset, AssetContentVc}, chunk::ChunkingContextVc, error::PrettyPrintError, }; @@ -24,11 +24,8 @@ use super::{ issue::RenderingIssue, RenderDataVc, RenderStaticIncomingMessage, RenderStaticOutgoingMessage, }; use crate::{ - get_intermediate_asset, get_renderer_pool, - pool::{NodeJsOperation, NodeJsPoolVc}, - render::error_page::error_html_body, - source_map::trace_stack, - ResponseHeaders, + get_intermediate_asset, get_renderer_pool, pool::NodeJsOperation, + render::error_page::error_html_body, source_map::trace_stack, ResponseHeaders, }; #[derive(Clone, Debug)] @@ -80,28 +77,18 @@ pub async fn render_static( project_dir: FileSystemPathVc, data: RenderDataVc, ) -> Result { - let intermediate_asset = get_intermediate_asset( - module.as_evaluated_chunk(chunking_context, Some(runtime_entries)), - intermediate_output_path, - ); - let renderer_pool = get_renderer_pool( + let render = render_stream( cwd, env, - intermediate_asset, + path, + module, + runtime_entries, + fallback_page, + chunking_context, intermediate_output_path, output_root, project_dir, - /* debug */ false, - ); - - let render = render_stream( - renderer_pool, data, - intermediate_asset, - intermediate_output_path, - project_dir, - path, - fallback_page, ) .await?; @@ -203,13 +190,17 @@ struct RenderStream(#[turbo_tasks(trace_ignore)] Stream); #[turbo_tasks::function] fn render_stream( - pool: NodeJsPoolVc, - data: RenderDataVc, - intermediate_asset: AssetVc, + cwd: FileSystemPathVc, + env: ProcessEnvVc, + path: FileSystemPathVc, + module: EcmascriptModuleAssetVc, + runtime_entries: EcmascriptChunkPlaceablesVc, + fallback_page: DevHtmlAssetVc, + chunking_context: ChunkingContextVc, intermediate_output_path: FileSystemPathVc, + output_root: FileSystemPathVc, project_dir: FileSystemPathVc, - error_path: FileSystemPathVc, - fallback_page: DevHtmlAssetVc, + data: RenderDataVc, ) -> RenderStreamVc { // Note the following code uses some hacks to create a child task that produces // a stream that is returned by this task. @@ -226,13 +217,17 @@ fn render_stream( // run the evaluation as side effect render_stream_internal( - pool, - data, - intermediate_asset, + cwd, + env, + path, + module, + runtime_entries, + fallback_page, + chunking_context, intermediate_output_path, + output_root, project_dir, - error_path, - fallback_page, + data, RenderStreamSender { get: Box::new(move || { if let Some(sender) = initial.lock().take() { @@ -255,13 +250,17 @@ fn render_stream( #[turbo_tasks::function] async fn render_stream_internal( - pool: NodeJsPoolVc, - data: RenderDataVc, - intermediate_asset: AssetVc, + cwd: FileSystemPathVc, + env: ProcessEnvVc, + path: FileSystemPathVc, + module: EcmascriptModuleAssetVc, + runtime_entries: EcmascriptChunkPlaceablesVc, + fallback_page: DevHtmlAssetVc, + chunking_context: ChunkingContextVc, intermediate_output_path: FileSystemPathVc, + output_root: FileSystemPathVc, project_dir: FileSystemPathVc, - error_path: FileSystemPathVc, - fallback_page: DevHtmlAssetVc, + data: RenderDataVc, sender: RenderStreamSenderVc, ) { mark_finished(); @@ -271,10 +270,24 @@ async fn render_stream_internal( }; let stream = generator! { - let data = data.await?; + let intermediate_asset = get_intermediate_asset( + module.as_evaluated_chunk(chunking_context, Some(runtime_entries)), + intermediate_output_path, + ); + let renderer_pool = get_renderer_pool( + cwd, + env, + intermediate_asset, + intermediate_output_path, + output_root, + project_dir, + /* debug */ false, + ); + // Read this strongly consistent, since we don't want to run inconsistent // node.js code. - let pool = pool.strongly_consistent().await?; + let pool = renderer_pool.strongly_consistent().await?; + let data = data.await?; let mut operation = pool.operation().await?; operation @@ -312,7 +325,7 @@ async fn render_stream_internal( .await?; yield RenderItem::Response( StaticResultVc::content( - static_error(error_path, anyhow!(trace), Some(operation), fallback_page).await?, + static_error(path, anyhow!(trace), Some(operation), fallback_page).await?, 500, HeaderListVc::empty(), ) From 84d95f1ea7524718ac7a3caed3a9d6e41263ba54 Mon Sep 17 00:00:00 2001 From: Justin Ridgewell Date: Wed, 29 Mar 2023 12:53:44 -0400 Subject: [PATCH 7/7] Use try_next unwrapping --- crates/turbopack-node/src/render/render_proxy.rs | 8 ++++---- crates/turbopack-node/src/render/render_static.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/turbopack-node/src/render/render_proxy.rs b/crates/turbopack-node/src/render/render_proxy.rs index edb947f79807f..d1ea9ba7901d4 100644 --- a/crates/turbopack-node/src/render/render_proxy.rs +++ b/crates/turbopack-node/src/render/render_proxy.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, bail, Result}; use async_stream::try_stream as generator; use futures::{ channel::mpsc::{unbounded, UnboundedSender}, - pin_mut, SinkExt, StreamExt, + pin_mut, SinkExt, StreamExt, TryStreamExt, }; use parking_lot::Mutex; use turbo_tasks::{mark_finished, primitives::StringVc, util::SharedError, RawVc}; @@ -53,9 +53,9 @@ pub async fn render_proxy( .await?; let mut stream = render.read(); - let first = match stream.next().await { - Some(Ok(f)) => f, - _ => { + let first = match stream.try_next().await? { + Some(f) => f, + None => { // If an Error was received first, then it would have been // transformed into a proxy err error response. bail!("did not receive response from render"); diff --git a/crates/turbopack-node/src/render/render_static.rs b/crates/turbopack-node/src/render/render_static.rs index 8b3a7f1707e6f..6fc7150dc3345 100644 --- a/crates/turbopack-node/src/render/render_static.rs +++ b/crates/turbopack-node/src/render/render_static.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, bail, Context, Result}; use async_stream::try_stream as generator; use futures::{ channel::mpsc::{unbounded, UnboundedSender}, - pin_mut, SinkExt, StreamExt, + pin_mut, SinkExt, StreamExt, TryStreamExt, }; use parking_lot::Mutex; use turbo_tasks::{mark_finished, primitives::StringVc, util::SharedError, RawVc}; @@ -93,9 +93,9 @@ pub async fn render_static( .await?; let mut stream = render.read(); - let first = match stream.next().await { - Some(Ok(f)) => f, - _ => { + let first = match stream.try_next().await? { + Some(f) => f, + None => { // If an Error was received first, then it would have been // transformed into a proxy err error response. bail!("did not receive response from render");