这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions crates/turbopack-core/src/chunk/list/update.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::sync::Arc;

use anyhow::Result;
use indexmap::IndexMap;
use serde::Serialize;
use turbo_tasks::{
primitives::{JsonValueReadRef, JsonValueVc},
TraitRef,
};
use turbo_tasks::{IntoTraitRef, TraitRef};

use super::{content::ChunkListContentVc, version::ChunkListVersionVc};
use crate::version::{
Expand All @@ -22,7 +21,7 @@ struct ChunkListUpdate<'a> {
chunks: IndexMap<&'a str, ChunkUpdate>,
/// List of merged updates since the last version.
#[serde(skip_serializing_if = "Vec::is_empty")]
merged: Vec<JsonValueReadRef>,
merged: Vec<Arc<serde_json::Value>>,
}

/// Update of a chunk from one version to another.
Expand All @@ -33,7 +32,7 @@ enum ChunkUpdate {
/// The chunk was updated and must be reloaded.
Total,
/// The chunk was updated and can be merged with the previous version.
Partial { instruction: JsonValueReadRef },
Partial { instruction: Arc<serde_json::Value> },
/// The chunk was added.
Added,
/// The chunk was deleted.
Expand All @@ -60,7 +59,7 @@ pub(super) async fn update_chunk_list(
} else {
// It's likely `from_version` is `NotFoundVersion`.
return Ok(Update::Total(TotalUpdate {
to: to_version.into(),
to: to_version.as_version().into_trait_ref().await?,
})
.cell());
};
Expand Down Expand Up @@ -112,7 +111,7 @@ pub(super) async fn update_chunk_list(
chunks.insert(
chunk_path.as_ref(),
ChunkUpdate::Partial {
instruction: partial.instruction.await?,
instruction: partial.instruction.clone(),
},
);
}
Expand Down Expand Up @@ -141,12 +140,12 @@ pub(super) async fn update_chunk_list(
// the update.
Update::Total(_) => {
return Ok(Update::Total(TotalUpdate {
to: to_version.into(),
to: to_version.as_version().into_trait_ref().await?,
})
.cell());
}
Update::Partial(partial) => {
merged.push(partial.instruction.await?);
merged.push(partial.instruction.clone());
}
Update::None => {}
}
Expand All @@ -158,8 +157,8 @@ pub(super) async fn update_chunk_list(
Update::None
} else {
Update::Partial(PartialUpdate {
to: to_version.into(),
instruction: JsonValueVc::cell(serde_json::to_value(&update)?),
to: to_version.as_version().into_trait_ref().await?,
instruction: Arc::new(serde_json::to_value(&update)?),
})
};

Expand Down
18 changes: 10 additions & 8 deletions crates/turbopack-core/src/version.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::sync::Arc;

use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use turbo_tasks::{
debug::ValueDebugFormat,
primitives::{JsonValueVc, StringVc},
trace::TraceRawVcs,
IntoTraitRef,
debug::ValueDebugFormat, primitives::StringVc, trace::TraceRawVcs, IntoTraitRef, TraitRef,
};
use turbo_tasks_fs::{FileContent, FileContentReadRef, LinkType};
use turbo_tasks_hash::{encode_hex, hash_xxh3_hash64};
Expand Down Expand Up @@ -46,7 +45,7 @@ pub trait VersionedContent {
Ok(if *from_id == *to_id {
Update::None.into()
} else {
Update::Total(TotalUpdate { to }).into()
Update::Total(TotalUpdate { to: to_ref }).into()
})
}
}
Expand Down Expand Up @@ -186,17 +185,20 @@ pub enum Update {
#[derive(PartialEq, Eq, Debug, Clone, TraceRawVcs, ValueDebugFormat, Serialize, Deserialize)]
pub struct TotalUpdate {
/// The version this update will bring the object to.
pub to: VersionVc,
#[turbo_tasks(trace_ignore)]
pub to: TraitRef<VersionVc>,
}

/// A partial update to a versioned object.
#[derive(PartialEq, Eq, Debug, Clone, TraceRawVcs, ValueDebugFormat, Serialize, Deserialize)]
pub struct PartialUpdate {
/// The version this update will bring the object to.
pub to: VersionVc,
#[turbo_tasks(trace_ignore)]
pub to: TraitRef<VersionVc>,
/// The instructions to be passed to a remote system in order to update the
/// versioned object.
pub instruction: JsonValueVc,
#[turbo_tasks(trace_ignore)]
pub instruction: Arc<serde_json::Value>,
}

/// [`Version`] implementation that hashes a file at a given path and returns
Expand Down
25 changes: 22 additions & 3 deletions crates/turbopack-dev-server/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ pub mod wrapping_source;
use std::collections::BTreeSet;

use anyhow::Result;
use futures::stream::Stream as StreamTrait;
use futures::{stream::Stream as StreamTrait, TryStreamExt};
use serde::{Deserialize, Serialize};
use turbo_tasks::{trace::TraceRawVcs, util::SharedError, Value};
use turbo_tasks::{primitives::StringVc, trace::TraceRawVcs, util::SharedError, Value};
use turbo_tasks_bytes::{Bytes, Stream, StreamRead};
use turbo_tasks_fs::FileSystemPathVc;
use turbopack_core::version::VersionedContentVc;
use turbo_tasks_hash::{DeterministicHash, DeterministicHasher, Xxh3Hash64Hasher};
use turbopack_core::version::{Version, VersionVc, VersionedContentVc};

use self::{
headers::Headers, issue_context::IssueContextContentSourceVc, query::Query,
Expand All @@ -39,6 +40,24 @@ pub struct ProxyResult {
pub body: Body,
}

#[turbo_tasks::value_impl]
impl Version for ProxyResult {
#[turbo_tasks::function]
async fn id(&self) -> Result<StringVc> {
let mut hash = Xxh3Hash64Hasher::new();
hash.write_u16(self.status);
for (name, value) in &self.headers {
name.deterministic_hash(&mut hash);
value.deterministic_hash(&mut hash);
}
let mut read = self.body.read();
while let Some(chunk) = read.try_next().await? {
hash.write_bytes(&chunk);
}
Ok(StringVc::cell(hash.finish().to_string()))
}
}

/// The return value of a content source when getting a path. A specificity is
/// attached and when combining results this specificity should be used to order
/// results.
Expand Down
14 changes: 13 additions & 1 deletion crates/turbopack-dev-server/src/update/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, ops::Deref, path::PathBuf};
use std::{collections::BTreeMap, fmt::Display, ops::Deref, path::PathBuf};

use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -15,6 +15,18 @@ pub struct ResourceIdentifier {
pub headers: Option<BTreeMap<String, String>>,
}

impl Display for ResourceIdentifier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.path)?;
if let Some(headers) = &self.headers {
for (key, value) in headers.iter() {
write!(f, " [{}: {}]", key, value)?;
}
}
Ok(())
}
}

#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ClientMessage {
Expand Down
19 changes: 14 additions & 5 deletions crates/turbopack-dev-server/src/update/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::select;
use tokio_stream::StreamMap;
use turbo_tasks::{TransientInstance, TurboTasksApi};
use turbo_tasks_fs::json::parse_json_with_source_context;
use turbopack_core::{issue::IssueReporterVc, version::Update};
use turbopack_core::{error::PrettyPrintError, issue::IssueReporterVc, version::Update};

use super::{
protocol::{ClientMessage, ClientUpdateInstruction, Issue, ResourceIdentifier},
Expand Down Expand Up @@ -72,8 +72,17 @@ impl<P: SourceProvider + Clone + Send + Sync> UpdateServer<P> {
)
}
};
let stream = UpdateStream::new(TransientInstance::new(Box::new(get_content))).await?;
streams.insert(resource, stream);
match UpdateStream::new(TransientInstance::new(Box::new(get_content))).await {
Ok(stream) => {
streams.insert(resource, stream);
}
Err(err) => {
eprintln!("Failed to create update stream for {resource}: {}", PrettyPrintError(&err));
client
.send(ClientUpdateInstruction::not_found(&resource))
.await?;
}
}
}
Some(ClientMessage::Unsubscribe { resource }) => {
streams.remove(&resource);
Expand Down Expand Up @@ -116,11 +125,11 @@ impl<P: SourceProvider + Clone + Send + Sync> UpdateServer<P> {
.collect::<Vec<Issue<'_>>>();
match &**update {
Update::Partial(partial) => {
let partial_instruction = partial.instruction.await?;
let partial_instruction = &partial.instruction;
client
.send(ClientUpdateInstruction::partial(
&resource,
&partial_instruction,
&**partial_instruction,
&issues,
))
.await?;
Expand Down
57 changes: 47 additions & 10 deletions crates/turbopack-dev-server/src/update/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use turbopack_core::{
},
};

use crate::source::resolve::{ResolveSourceRequestResult, ResolveSourceRequestResultVc};
use crate::source::{
resolve::{ResolveSourceRequestResult, ResolveSourceRequestResultVc},
ProxyResultVc,
};

type GetContentFn = Box<dyn Fn() -> ResolveSourceRequestResultVc + Send + Sync>;

Expand All @@ -40,7 +43,7 @@ async fn get_update_stream_item(
) -> Result<UpdateStreamItemVc> {
let content = get_content();

match &*content.await? {
match *content.await? {
ResolveSourceRequestResult::Static(static_content_vc, _) => {
let static_content = static_content_vc.await?;

Expand All @@ -64,6 +67,36 @@ async fn get_update_stream_item(
}
.cell())
}
ResolveSourceRequestResult::HttpProxy(proxy_result) => {
let proxy_result_value = proxy_result.await?;

if proxy_result_value.status == 404 {
return Ok(UpdateStreamItem::NotFound.cell());
}

let plain_issues = peek_issues(proxy_result).await?;

let from = from.get();
if let Some(from) = ProxyResultVc::resolve_from(from).await? {
if from.await? == proxy_result_value {
return Ok(UpdateStreamItem::Found {
update: Update::None.cell().await?,
issues: plain_issues,
}
.cell());
}
}

Ok(UpdateStreamItem::Found {
update: Update::Total(TotalUpdate {
to: proxy_result.as_version().into_trait_ref().await?,
})
.cell()
.await?,
issues: plain_issues,
}
.cell())
}
_ => {
let plain_issues = peek_issues(content).await?;

Expand All @@ -73,7 +106,10 @@ async fn get_update_stream_item(
// TODO add special instructions for removed assets to handled it in a better
// way
Update::Total(TotalUpdate {
to: NotFoundVersionVc::new().into(),
to: NotFoundVersionVc::new()
.as_version()
.into_trait_ref()
.await?,
})
.cell()
} else {
Expand Down Expand Up @@ -123,15 +159,15 @@ impl VersionStateVc {
}

impl VersionStateVc {
async fn new(version: VersionVc) -> Result<Self> {
async fn new(version: TraitRef<VersionVc>) -> Result<Self> {
Ok(Self::cell(VersionState {
version: State::new(version.into_trait_ref().await?),
version: State::new(version),
}))
}

async fn set(&self, new_version: VersionVc) -> Result<()> {
async fn set(&self, new_version: TraitRef<VersionVc>) -> Result<()> {
let this = self.await?;
this.version.set(new_version.into_trait_ref().await?);
this.version.set(new_version);
Ok(())
}
}
Expand All @@ -145,13 +181,14 @@ impl UpdateStream {
let content = get_content();
// We can ignore issues reported in content here since [compute_update_stream]
// will handle them
let version = match &*content.await? {
let version = match *content.await? {
ResolveSourceRequestResult::Static(static_content, _) => {
static_content.await?.content.version()
}
ResolveSourceRequestResult::HttpProxy(proxy_result) => proxy_result.into(),
_ => NotFoundVersionVc::new().into(),
};
let version_state = VersionStateVc::new(version).await?;
let version_state = VersionStateVc::new(version.into_trait_ref().await?).await?;

compute_update_stream(version_state, get_content, TransientInstance::new(sx));

Expand Down Expand Up @@ -179,7 +216,7 @@ impl UpdateStream {
Update::Partial(PartialUpdate { to, .. })
| Update::Total(TotalUpdate { to }) => {
version_state
.set(*to)
.set(to.clone())
.await
.expect("failed to update version");

Expand Down
10 changes: 6 additions & 4 deletions crates/turbopack-dev/src/ecmascript/merged/update.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::sync::Arc;

use anyhow::Result;
use indexmap::{IndexMap, IndexSet};
use serde::Serialize;
use turbo_tasks::{primitives::JsonValueVc, TryJoinIterExt};
use turbo_tasks::{IntoTraitRef, TryJoinIterExt};
use turbo_tasks_fs::rope::Rope;
use turbopack_core::{
chunk::{Chunk, ChunkingContext, ModuleId, ModuleIdReadRef},
Expand Down Expand Up @@ -131,7 +133,7 @@ pub(super) async fn update_ecmascript_merged_chunk(
} else {
// It's likely `from_version` is `NotFoundVersion`.
return Ok(Update::Total(TotalUpdate {
to: to_merged_version.into(),
to: to_merged_version.as_version().into_trait_ref().await?,
}));
};

Expand Down Expand Up @@ -248,8 +250,8 @@ pub(super) async fn update_ecmascript_merged_chunk(
Update::None
} else {
Update::Partial(PartialUpdate {
to: to_merged_version.into(),
instruction: JsonValueVc::cell(serde_json::to_value(&merged_update)?),
to: to_merged_version.as_version().into_trait_ref().await?,
instruction: Arc::new(serde_json::to_value(&merged_update)?),
})
};

Expand Down