diff --git a/crates/turbopack-core/src/chunk/list/update.rs b/crates/turbopack-core/src/chunk/list/update.rs index 921ad265971a7..310d97d3f9c80 100644 --- a/crates/turbopack-core/src/chunk/list/update.rs +++ b/crates/turbopack-core/src/chunk/list/update.rs @@ -1,10 +1,9 @@ +use std::sync::Arc; + use anyhow::Result; use indexmap::IndexMap; use serde::Serialize; -use turbo_tasks::{ - primitives::{JsonValueReadRef, JsonValueVc}, - TraitRef, -}; +use turbo_tasks::{IntoTraitRef, TraitRef}; use super::{content::ChunkListContentVc, version::ChunkListVersionVc}; use crate::version::{ @@ -22,7 +21,7 @@ struct ChunkListUpdate<'a> { chunks: IndexMap<&'a str, ChunkUpdate>, /// List of merged updates since the last version. #[serde(skip_serializing_if = "Vec::is_empty")] - merged: Vec, + merged: Vec>, } /// Update of a chunk from one version to another. @@ -33,7 +32,7 @@ enum ChunkUpdate { /// The chunk was updated and must be reloaded. Total, /// The chunk was updated and can be merged with the previous version. - Partial { instruction: JsonValueReadRef }, + Partial { instruction: Arc }, /// The chunk was added. Added, /// The chunk was deleted. @@ -60,7 +59,7 @@ pub(super) async fn update_chunk_list( } else { // It's likely `from_version` is `NotFoundVersion`. return Ok(Update::Total(TotalUpdate { - to: to_version.into(), + to: to_version.as_version().into_trait_ref().await?, }) .cell()); }; @@ -112,7 +111,7 @@ pub(super) async fn update_chunk_list( chunks.insert( chunk_path.as_ref(), ChunkUpdate::Partial { - instruction: partial.instruction.await?, + instruction: partial.instruction.clone(), }, ); } @@ -141,12 +140,12 @@ pub(super) async fn update_chunk_list( // the update. Update::Total(_) => { return Ok(Update::Total(TotalUpdate { - to: to_version.into(), + to: to_version.as_version().into_trait_ref().await?, }) .cell()); } Update::Partial(partial) => { - merged.push(partial.instruction.await?); + merged.push(partial.instruction.clone()); } Update::None => {} } @@ -158,8 +157,8 @@ pub(super) async fn update_chunk_list( Update::None } else { Update::Partial(PartialUpdate { - to: to_version.into(), - instruction: JsonValueVc::cell(serde_json::to_value(&update)?), + to: to_version.as_version().into_trait_ref().await?, + instruction: Arc::new(serde_json::to_value(&update)?), }) }; diff --git a/crates/turbopack-core/src/version.rs b/crates/turbopack-core/src/version.rs index e421540425ed2..bd9d2d63c1af7 100644 --- a/crates/turbopack-core/src/version.rs +++ b/crates/turbopack-core/src/version.rs @@ -1,10 +1,9 @@ +use std::sync::Arc; + use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; use turbo_tasks::{ - debug::ValueDebugFormat, - primitives::{JsonValueVc, StringVc}, - trace::TraceRawVcs, - IntoTraitRef, + debug::ValueDebugFormat, primitives::StringVc, trace::TraceRawVcs, IntoTraitRef, TraitRef, }; use turbo_tasks_fs::{FileContent, FileContentReadRef, LinkType}; use turbo_tasks_hash::{encode_hex, hash_xxh3_hash64}; @@ -46,7 +45,7 @@ pub trait VersionedContent { Ok(if *from_id == *to_id { Update::None.into() } else { - Update::Total(TotalUpdate { to }).into() + Update::Total(TotalUpdate { to: to_ref }).into() }) } } @@ -186,17 +185,20 @@ pub enum Update { #[derive(PartialEq, Eq, Debug, Clone, TraceRawVcs, ValueDebugFormat, Serialize, Deserialize)] pub struct TotalUpdate { /// The version this update will bring the object to. - pub to: VersionVc, + #[turbo_tasks(trace_ignore)] + pub to: TraitRef, } /// A partial update to a versioned object. #[derive(PartialEq, Eq, Debug, Clone, TraceRawVcs, ValueDebugFormat, Serialize, Deserialize)] pub struct PartialUpdate { /// The version this update will bring the object to. - pub to: VersionVc, + #[turbo_tasks(trace_ignore)] + pub to: TraitRef, /// The instructions to be passed to a remote system in order to update the /// versioned object. - pub instruction: JsonValueVc, + #[turbo_tasks(trace_ignore)] + pub instruction: Arc, } /// [`Version`] implementation that hashes a file at a given path and returns diff --git a/crates/turbopack-dev-server/src/source/mod.rs b/crates/turbopack-dev-server/src/source/mod.rs index 809b94c764363..1e090be366330 100644 --- a/crates/turbopack-dev-server/src/source/mod.rs +++ b/crates/turbopack-dev-server/src/source/mod.rs @@ -16,12 +16,13 @@ pub mod wrapping_source; use std::collections::BTreeSet; use anyhow::Result; -use futures::stream::Stream as StreamTrait; +use futures::{stream::Stream as StreamTrait, TryStreamExt}; use serde::{Deserialize, Serialize}; -use turbo_tasks::{trace::TraceRawVcs, util::SharedError, Value}; +use turbo_tasks::{primitives::StringVc, trace::TraceRawVcs, util::SharedError, Value}; use turbo_tasks_bytes::{Bytes, Stream, StreamRead}; use turbo_tasks_fs::FileSystemPathVc; -use turbopack_core::version::VersionedContentVc; +use turbo_tasks_hash::{DeterministicHash, DeterministicHasher, Xxh3Hash64Hasher}; +use turbopack_core::version::{Version, VersionVc, VersionedContentVc}; use self::{ headers::Headers, issue_context::IssueContextContentSourceVc, query::Query, @@ -39,6 +40,24 @@ pub struct ProxyResult { pub body: Body, } +#[turbo_tasks::value_impl] +impl Version for ProxyResult { + #[turbo_tasks::function] + async fn id(&self) -> Result { + let mut hash = Xxh3Hash64Hasher::new(); + hash.write_u16(self.status); + for (name, value) in &self.headers { + name.deterministic_hash(&mut hash); + value.deterministic_hash(&mut hash); + } + let mut read = self.body.read(); + while let Some(chunk) = read.try_next().await? { + hash.write_bytes(&chunk); + } + Ok(StringVc::cell(hash.finish().to_string())) + } +} + /// The return value of a content source when getting a path. A specificity is /// attached and when combining results this specificity should be used to order /// results. diff --git a/crates/turbopack-dev-server/src/update/protocol.rs b/crates/turbopack-dev-server/src/update/protocol.rs index 880fb4d6ee9d3..ebdb95cbdb1f7 100644 --- a/crates/turbopack-dev-server/src/update/protocol.rs +++ b/crates/turbopack-dev-server/src/update/protocol.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, ops::Deref, path::PathBuf}; +use std::{collections::BTreeMap, fmt::Display, ops::Deref, path::PathBuf}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -15,6 +15,18 @@ pub struct ResourceIdentifier { pub headers: Option>, } +impl Display for ResourceIdentifier { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.path)?; + if let Some(headers) = &self.headers { + for (key, value) in headers.iter() { + write!(f, " [{}: {}]", key, value)?; + } + } + Ok(()) + } +} + #[derive(Deserialize)] #[serde(tag = "type", rename_all = "camelCase")] pub enum ClientMessage { diff --git a/crates/turbopack-dev-server/src/update/server.rs b/crates/turbopack-dev-server/src/update/server.rs index e5da3430de5c1..1055466c4c6a3 100644 --- a/crates/turbopack-dev-server/src/update/server.rs +++ b/crates/turbopack-dev-server/src/update/server.rs @@ -12,7 +12,7 @@ use tokio::select; use tokio_stream::StreamMap; use turbo_tasks::{TransientInstance, TurboTasksApi}; use turbo_tasks_fs::json::parse_json_with_source_context; -use turbopack_core::{issue::IssueReporterVc, version::Update}; +use turbopack_core::{error::PrettyPrintError, issue::IssueReporterVc, version::Update}; use super::{ protocol::{ClientMessage, ClientUpdateInstruction, Issue, ResourceIdentifier}, @@ -72,8 +72,17 @@ impl UpdateServer

{ ) } }; - let stream = UpdateStream::new(TransientInstance::new(Box::new(get_content))).await?; - streams.insert(resource, stream); + match UpdateStream::new(TransientInstance::new(Box::new(get_content))).await { + Ok(stream) => { + streams.insert(resource, stream); + } + Err(err) => { + eprintln!("Failed to create update stream for {resource}: {}", PrettyPrintError(&err)); + client + .send(ClientUpdateInstruction::not_found(&resource)) + .await?; + } + } } Some(ClientMessage::Unsubscribe { resource }) => { streams.remove(&resource); @@ -116,11 +125,11 @@ impl UpdateServer

{ .collect::>>(); match &**update { Update::Partial(partial) => { - let partial_instruction = partial.instruction.await?; + let partial_instruction = &partial.instruction; client .send(ClientUpdateInstruction::partial( &resource, - &partial_instruction, + &**partial_instruction, &issues, )) .await?; diff --git a/crates/turbopack-dev-server/src/update/stream.rs b/crates/turbopack-dev-server/src/update/stream.rs index 977348bc9a5f4..fdfd7bbf41e10 100644 --- a/crates/turbopack-dev-server/src/update/stream.rs +++ b/crates/turbopack-dev-server/src/update/stream.rs @@ -13,7 +13,10 @@ use turbopack_core::{ }, }; -use crate::source::resolve::{ResolveSourceRequestResult, ResolveSourceRequestResultVc}; +use crate::source::{ + resolve::{ResolveSourceRequestResult, ResolveSourceRequestResultVc}, + ProxyResultVc, +}; type GetContentFn = Box ResolveSourceRequestResultVc + Send + Sync>; @@ -40,7 +43,7 @@ async fn get_update_stream_item( ) -> Result { let content = get_content(); - match &*content.await? { + match *content.await? { ResolveSourceRequestResult::Static(static_content_vc, _) => { let static_content = static_content_vc.await?; @@ -64,6 +67,36 @@ async fn get_update_stream_item( } .cell()) } + ResolveSourceRequestResult::HttpProxy(proxy_result) => { + let proxy_result_value = proxy_result.await?; + + if proxy_result_value.status == 404 { + return Ok(UpdateStreamItem::NotFound.cell()); + } + + let plain_issues = peek_issues(proxy_result).await?; + + let from = from.get(); + if let Some(from) = ProxyResultVc::resolve_from(from).await? { + if from.await? == proxy_result_value { + return Ok(UpdateStreamItem::Found { + update: Update::None.cell().await?, + issues: plain_issues, + } + .cell()); + } + } + + Ok(UpdateStreamItem::Found { + update: Update::Total(TotalUpdate { + to: proxy_result.as_version().into_trait_ref().await?, + }) + .cell() + .await?, + issues: plain_issues, + } + .cell()) + } _ => { let plain_issues = peek_issues(content).await?; @@ -73,7 +106,10 @@ async fn get_update_stream_item( // TODO add special instructions for removed assets to handled it in a better // way Update::Total(TotalUpdate { - to: NotFoundVersionVc::new().into(), + to: NotFoundVersionVc::new() + .as_version() + .into_trait_ref() + .await?, }) .cell() } else { @@ -123,15 +159,15 @@ impl VersionStateVc { } impl VersionStateVc { - async fn new(version: VersionVc) -> Result { + async fn new(version: TraitRef) -> Result { Ok(Self::cell(VersionState { - version: State::new(version.into_trait_ref().await?), + version: State::new(version), })) } - async fn set(&self, new_version: VersionVc) -> Result<()> { + async fn set(&self, new_version: TraitRef) -> Result<()> { let this = self.await?; - this.version.set(new_version.into_trait_ref().await?); + this.version.set(new_version); Ok(()) } } @@ -145,13 +181,14 @@ impl UpdateStream { let content = get_content(); // We can ignore issues reported in content here since [compute_update_stream] // will handle them - let version = match &*content.await? { + let version = match *content.await? { ResolveSourceRequestResult::Static(static_content, _) => { static_content.await?.content.version() } + ResolveSourceRequestResult::HttpProxy(proxy_result) => proxy_result.into(), _ => NotFoundVersionVc::new().into(), }; - let version_state = VersionStateVc::new(version).await?; + let version_state = VersionStateVc::new(version.into_trait_ref().await?).await?; compute_update_stream(version_state, get_content, TransientInstance::new(sx)); @@ -179,7 +216,7 @@ impl UpdateStream { Update::Partial(PartialUpdate { to, .. }) | Update::Total(TotalUpdate { to }) => { version_state - .set(*to) + .set(to.clone()) .await .expect("failed to update version"); diff --git a/crates/turbopack-dev/src/ecmascript/merged/update.rs b/crates/turbopack-dev/src/ecmascript/merged/update.rs index b49aa525d9ab7..d350e933c0b50 100644 --- a/crates/turbopack-dev/src/ecmascript/merged/update.rs +++ b/crates/turbopack-dev/src/ecmascript/merged/update.rs @@ -1,7 +1,9 @@ +use std::sync::Arc; + use anyhow::Result; use indexmap::{IndexMap, IndexSet}; use serde::Serialize; -use turbo_tasks::{primitives::JsonValueVc, TryJoinIterExt}; +use turbo_tasks::{IntoTraitRef, TryJoinIterExt}; use turbo_tasks_fs::rope::Rope; use turbopack_core::{ chunk::{Chunk, ChunkingContext, ModuleId, ModuleIdReadRef}, @@ -131,7 +133,7 @@ pub(super) async fn update_ecmascript_merged_chunk( } else { // It's likely `from_version` is `NotFoundVersion`. return Ok(Update::Total(TotalUpdate { - to: to_merged_version.into(), + to: to_merged_version.as_version().into_trait_ref().await?, })); }; @@ -248,8 +250,8 @@ pub(super) async fn update_ecmascript_merged_chunk( Update::None } else { Update::Partial(PartialUpdate { - to: to_merged_version.into(), - instruction: JsonValueVc::cell(serde_json::to_value(&merged_update)?), + to: to_merged_version.as_version().into_trait_ref().await?, + instruction: Arc::new(serde_json::to_value(&merged_update)?), }) };