这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use turbo_tasks::{
};

use crate::{
aggregation_tree::{aggregation_info, ensure_thresholds},
aggregation_tree::{aggregation_info, ensure_thresholds, AggregationInfoGuard},
cell::Cell,
gc::{to_exp_u8, GcPriority, GcStats, GcTaskState},
output::{Output, OutputContent},
Expand Down Expand Up @@ -417,7 +417,7 @@ enum TaskStateType {
use TaskStateType::*;

use self::{
aggregation::{RootInfoType, RootType, TaskAggregationTreeLeaf, TaskGuard},
aggregation::{Aggregated, RootInfoType, RootType, TaskAggregationTreeLeaf, TaskGuard},
meta_state::{
FullTaskWriteGuard, TaskMetaState, TaskMetaStateReadGuard, TaskMetaStateWriteGuard,
},
Expand Down Expand Up @@ -490,7 +490,11 @@ impl Task {
) {
let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
{
aggregation_context.aggregation_info(id).lock().root_type = Some(RootType::Root);
Self::set_root_type(
&aggregation_context,
&mut aggregation_context.aggregation_info(id).lock(),
RootType::Root,
);
}
aggregation_context.apply_queued_updates();
}
Expand All @@ -502,11 +506,32 @@ impl Task {
) {
let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
{
aggregation_context.aggregation_info(id).lock().root_type = Some(RootType::Once);
let aggregation_info = &aggregation_context.aggregation_info(id);
Self::set_root_type(
&aggregation_context,
&mut aggregation_info.lock(),
RootType::Once,
);
}
aggregation_context.apply_queued_updates();
}

/// Marks `aggregation` as a root of the given `root_type` and queues every
/// task it currently records as dirty for scheduling.
///
/// Only entries of `dirty_tasks` with a count greater than zero are queued.
/// The ids are added to `aggregation_context.dirty_tasks_to_schedule`; the
/// set behind that lock is created on demand if it does not exist yet.
fn set_root_type(
    aggregation_context: &TaskAggregationContext,
    aggregation: &mut AggregationInfoGuard<Aggregated>,
    root_type: RootType,
) {
    aggregation.root_type = Some(root_type);

    // Lazily select the ids of all tasks with a positive dirty count.
    let newly_schedulable = aggregation
        .dirty_tasks
        .iter()
        .filter(|&(_, &count)| count > 0)
        .map(|(&task, _)| task);

    // Take the schedule lock only for the duration of the bulk insert.
    let mut schedule_guard = aggregation_context.dirty_tasks_to_schedule.lock();
    let pending = schedule_guard.get_or_insert_default();
    pending.extend(newly_schedulable);
}

pub(crate) fn unset_root(
id: TaskId,
backend: &MemoryBackend,
Expand Down Expand Up @@ -601,7 +626,7 @@ impl Task {
});
}
TaskDependency::Collectibles(task, trait_type) => {
let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
let aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
let aggregation = aggregation_context.aggregation_info(task);
aggregation
.lock()
Expand Down Expand Up @@ -1518,14 +1543,26 @@ impl Task {
let mut state = self.full_state_mut();
if let Some(aggregation) = aggregation_when_strongly_consistent {
{
let aggregation = aggregation.lock();
let mut aggregation = aggregation.lock();
if aggregation.unfinished > 0 {
if aggregation.root_type.is_none() {
Self::set_root_type(
&aggregation_context,
&mut aggregation,
RootType::ReadingStronglyConsistent,
);
}
let listener = aggregation.unfinished_event.listen_with_note(note);
drop(aggregation);
drop(state);
aggregation_context.apply_queued_updates();

return Ok(Err(listener));
} else if matches!(
aggregation.root_type,
Some(RootType::ReadingStronglyConsistent)
) {
aggregation.root_type = None;
}
}
}
Expand Down Expand Up @@ -1576,7 +1613,7 @@ impl Task {
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> AutoMap<RawVc, i32> {
let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
let aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
aggregation_context
.aggregation_info(id)
.lock()
Expand Down
16 changes: 8 additions & 8 deletions crates/turbo-tasks-memory/src/task/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
/// The kind of aggregation root a task currently is.
///
/// Stored as `Option<RootType>` in the aggregated info; `None` there means
/// the task is not treated as a root.
pub enum RootType {
/// Root type used for once tasks.
Once,
/// Root type used for root tasks.
Root,
/// Temporary root type set while a task is being read strongly consistent
/// and still has unfinished work; it is reset to `None` once a strongly
/// consistent read observes no unfinished work.
ReadingStronglyConsistent,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -59,8 +60,9 @@ pub struct Aggregated {

/// Only used for the aggregation root. Which kind of root is this?
/// [RootType::Once] for OnceTasks or [RootType::Root] for Root Tasks.
/// It's set to None for other tasks, when the once task is done or when the
/// root task is disposed.
/// [RootType::ReadingStronglyConsistent] while currently reading a task
/// strongly consistent. It's set to None for other tasks, when the once
/// task is done or when the root task is disposed.
pub root_type: Option<RootType>,
}

Expand Down Expand Up @@ -189,7 +191,7 @@ impl<'a> TaskAggregationContext<'a> {
}
}

pub fn aggregation_info(&mut self, id: TaskId) -> AggregationInfoReference<Aggregated> {
pub fn aggregation_info(&self, id: TaskId) -> AggregationInfoReference<Aggregated> {
aggregation_info(self, &id)
}
}
Expand Down Expand Up @@ -236,7 +238,7 @@ impl<'a> AggregationContext for TaskAggregationContext<'a> {
let mut unfinished = 0;
if info.unfinished > 0 {
info.unfinished += change.unfinished;
if info.unfinished == 0 {
if info.unfinished <= 0 {
info.unfinished_event.notify(usize::MAX);
unfinished = -1;
}
Expand All @@ -250,12 +252,10 @@ impl<'a> AggregationContext for TaskAggregationContext<'a> {
for &(task, count) in change.unfinished_tasks_update.iter() {
update_count_entry(info.unfinished_tasks.entry(task), count);
}
let is_root = info.root_type.is_some();
for &(task, count) in change.dirty_tasks_update.iter() {
let value = update_count_entry(info.dirty_tasks.entry(task), count);
if value > 0
&& value <= count
&& matches!(info.root_type, Some(RootType::Root) | Some(RootType::Once))
{
if is_root && value > 0 && value <= count {
let mut tasks_to_schedule = self.dirty_tasks_to_schedule.lock();
tasks_to_schedule.get_or_insert_default().insert(task);
}
Expand Down