Skip to content

Commit

Permalink
fix collectibles counting (vercel/turborepo#8778)
Browse files Browse the repository at this point in the history
### Description

fix a bunch of cases where collectibles were not correctly counted

### Testing Instructions

<!--
  Give a quick description of steps to test your changes.
-->
  • Loading branch information
sokra authored Jul 17, 2024
1 parent 0339244 commit cf49da0
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 49 deletions.
7 changes: 3 additions & 4 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ use std::{
};

use anyhow::{bail, Result};
use auto_hash_map::AutoMap;
use dashmap::{mapref::entry::Entry, DashMap};
use rustc_hash::FxHasher;
use tokio::task::futures::TaskLocalFuture;
use tracing::trace_span;
use turbo_prehash::{BuildHasherExt, PassThroughHash, PreHashed};
use turbo_tasks::{
backend::{
Backend, BackendJobId, CellContent, PersistentTaskType, TaskExecutionSpec,
TransientTaskType,
Backend, BackendJobId, CellContent, PersistentTaskType, TaskCollectiblesMap,
TaskExecutionSpec, TransientTaskType,
},
event::EventListener,
util::{IdFactoryWithReuse, NoMoveVec},
Expand Down Expand Up @@ -470,7 +469,7 @@ impl Backend for MemoryBackend {
trait_id: TraitTypeId,
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> AutoMap<RawVc, i32> {
) -> TaskCollectiblesMap {
Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id));
Task::read_collectibles(id, trait_id, reader, self, turbo_tasks)
}
Expand Down
8 changes: 4 additions & 4 deletions crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use std::{
};

use anyhow::{anyhow, Result};
use auto_hash_map::{AutoMap, AutoSet};
use auto_hash_map::AutoSet;
use concurrent_queue::ConcurrentQueue;
use dashmap::{mapref::entry::Entry, DashMap, DashSet};
use turbo_tasks::{
backend::{
Backend, BackendJobId, CellContent, PersistentTaskType, TaskExecutionSpec,
TransientTaskType,
Backend, BackendJobId, CellContent, PersistentTaskType, TaskCollectiblesMap,
TaskExecutionSpec, TransientTaskType,
},
event::{Event, EventListener},
persisted_graph::{
Expand Down Expand Up @@ -1446,7 +1446,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
_trait_id: TraitTypeId,
_reader: TaskId,
_turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackendWithPersistedGraph<P>>,
) -> AutoMap<RawVc, i32> {
) -> TaskCollectiblesMap {
todo!()
}

Expand Down
4 changes: 2 additions & 2 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::task_local;
use tracing::Span;
use turbo_prehash::PreHashed;
use turbo_tasks::{
backend::{PersistentTaskType, TaskExecutionSpec},
backend::{PersistentTaskType, TaskCollectiblesMap, TaskExecutionSpec},
event::{Event, EventListener},
get_invalidator, registry, CellId, Invalidator, RawVc, TaskId, TaskIdSet, TraitTypeId,
TurboTasksBackendApi, ValueTypeId,
Expand Down Expand Up @@ -1573,7 +1573,7 @@ impl Task {
reader: TaskId,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> AutoMap<RawVc, i32> {
) -> TaskCollectiblesMap {
let aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
let mut aggregation_data = aggregation_context.aggregation_data(id);
aggregation_data.read_collectibles(trait_type, reader)
Expand Down
59 changes: 32 additions & 27 deletions crates/turbo-tasks-memory/src/task/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use auto_hash_map::{map::Entry, AutoMap};
use either::Either;
use parking_lot::Mutex;
use rustc_hash::FxHasher;
use turbo_tasks::{event::Event, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi};
use turbo_tasks::{
backend::TaskCollectiblesMap, event::Event, RawVc, TaskId, TaskIdSet, TraitTypeId,
TurboTasksBackendApi,
};

use super::{
meta_state::{FullTaskWriteGuard, TaskMetaStateWriteGuard},
Expand All @@ -32,7 +35,7 @@ pub enum RootType {

#[derive(Debug, Default)]
pub struct CollectiblesInfo {
collectibles: AutoMap<RawVc, i32>,
collectibles: TaskCollectiblesMap,
dependent_tasks: TaskIdSet,
}

Expand Down Expand Up @@ -90,8 +93,8 @@ impl Aggregated {
) {
if let Entry::Occupied(mut entry) = self.collectibles.entry(trait_type) {
let info = entry.get_mut();
info.dependent_tasks.remove(&reader);
if info.is_unset() {
let removed = info.dependent_tasks.remove(&reader);
if removed && info.is_unset() {
entry.remove();
}
}
Expand All @@ -101,7 +104,7 @@ impl Aggregated {
&mut self,
trait_type: TraitTypeId,
reader: TaskId,
) -> AutoMap<RawVc, i32> {
) -> TaskCollectiblesMap {
match self.collectibles.entry(trait_type) {
Entry::Occupied(mut e) => {
let info = e.get_mut();
Expand Down Expand Up @@ -293,7 +296,7 @@ impl<'a> AggregationContext for TaskAggregationContext<'a> {
}
Entry::Vacant(e) => {
let mut collectibles_info = CollectiblesInfo::default();
update_count_entry(collectibles_info.collectibles.entry(collectible), count);
collectibles_info.collectibles.insert(collectible, count);
e.insert(collectibles_info);
}
}
Expand Down Expand Up @@ -489,8 +492,10 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> {
change.dirty_tasks_update.push((self.id, 1));
}
if let Some(collectibles) = guard.collectibles.as_ref() {
for (&(trait_type_id, collectible), _) in collectibles.iter() {
change.collectibles.push((trait_type_id, collectible, 1));
for (&(trait_type_id, collectible), count) in collectibles.iter() {
change
.collectibles
.push((trait_type_id, collectible, *count));
}
}
if let TaskStateType::InProgress(box InProgressState {
Expand All @@ -499,8 +504,10 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> {
}) = &guard.state_type
{
if let Some(collectibles) = outdated_collectibles.as_ref() {
for (&(trait_type_id, collectible), _) in collectibles.iter() {
change.collectibles.push((trait_type_id, collectible, 1));
for (&(trait_type_id, collectible), count) in collectibles.iter() {
change
.collectibles
.push((trait_type_id, collectible, *count));
}
}
}
Expand Down Expand Up @@ -541,8 +548,10 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> {
change.dirty_tasks_update.push((self.id, -1));
}
if let Some(collectibles) = guard.collectibles.as_ref() {
for (&(trait_type_id, collectible), _) in collectibles.iter() {
change.collectibles.push((trait_type_id, collectible, -1));
for (&(trait_type_id, collectible), count) in collectibles.iter() {
change
.collectibles
.push((trait_type_id, collectible, -count));
}
}
if let TaskStateType::InProgress(box InProgressState {
Expand All @@ -551,8 +560,10 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> {
}) = &guard.state_type
{
if let Some(collectibles) = outdated_collectibles.as_ref() {
for (&(trait_type_id, collectible), _) in collectibles.iter() {
change.collectibles.push((trait_type_id, collectible, -1));
for (&(trait_type_id, collectible), count) in collectibles.iter() {
change
.collectibles
.push((trait_type_id, collectible, -*count));
}
}
}
Expand Down Expand Up @@ -588,19 +599,13 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> {
{
data.unfinished_tasks = unfinished_tasks_update.into_iter().collect();
}
data.dirty_tasks = dirty_tasks_update.into_iter().collect();
data.collectibles = collectibles
.into_iter()
.map(|(trait_type_id, collectible, count)| {
(
trait_type_id,
CollectiblesInfo {
collectibles: [(collectible, count)].iter().cloned().collect(),
dependent_tasks: TaskIdSet::default(),
},
)
})
.collect();
for (t, n) in dirty_tasks_update.into_iter() {
data.dirty_tasks.insert(t, n);
}
for (trait_type_id, collectible, count) in collectibles.into_iter() {
let info = data.collectibles.entry(trait_type_id).or_default();
update_count_entry(info.collectibles.entry(collectible), count);
}
}
data
}
Expand Down
92 changes: 92 additions & 0 deletions crates/turbo-tasks-memory/tests/recompute_collectibles.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#![feature(arbitrary_self_types)]

use anyhow::{bail, Result};
use turbo_tasks::{emit, CollectiblesSource, RcStr, State, ValueToString, Vc};
use turbo_tasks_testing::{register, run};

register!();

#[tokio::test]
async fn recompute() {
run! {
let input = ChangingInput {
state: State::new(1),
}.cell();
let output = compute(input, 100);
let read = output.await?;
assert_eq!(read.value, 42);
assert_eq!(read.collectible, "1");

for i in 2..100 {
input.await?.state.set(i);
let read = output.strongly_consistent().await?;
assert_eq!(read.value, 42);
assert_eq!(read.collectible, i.to_string());
}
}
}

#[turbo_tasks::value]
struct ChangingInput {
state: State<u32>,
}

#[turbo_tasks::value]
struct Output {
value: u32,
collectible: String,
}

#[turbo_tasks::value]
struct Collectible {
value: u32,
}

#[turbo_tasks::value_impl]
impl ValueToString for Collectible {
#[turbo_tasks::function]
fn to_string(&self) -> Vc<RcStr> {
Vc::cell(self.value.to_string().into())
}
}

#[turbo_tasks::function]
fn inner_compute(input: Vc<ChangingInput>) -> Vc<u32> {
inner_compute2(input, 1000)
}

#[turbo_tasks::function]
async fn inner_compute2(input: Vc<ChangingInput>, innerness: u32) -> Result<Vc<u32>> {
if innerness > 0 {
return Ok(inner_compute2(input, innerness - 1));
}
let collectible: Vc<Box<dyn ValueToString>> = Vc::upcast(
Collectible {
value: *input.await?.state.get(),
}
.cell(),
);
emit(collectible);

Ok(Vc::cell(42))
}

#[turbo_tasks::function]
async fn compute(input: Vc<ChangingInput>, innerness: u32) -> Result<Vc<Output>> {
if innerness > 0 {
return Ok(compute(input, innerness - 1));
}
let operation = inner_compute(input);
let value = *operation.await?;
let collectibles = operation.peek_collectibles::<Box<dyn ValueToString>>();
if collectibles.len() != 1 {
bail!("expected 1 collectible, found {}", collectibles.len());
}
let first = *collectibles.iter().next().unwrap();
let collectible = first.to_string().await?;
Ok(Output {
value,
collectible: collectible.to_string(),
}
.cell())
}
1 change: 1 addition & 0 deletions crates/turbo-tasks-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ anyhow = { workspace = true }
auto-hash-map = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
rustc-hash = { workspace = true }
tokio = { workspace = true }
turbo-tasks = { workspace = true }
7 changes: 3 additions & 4 deletions crates/turbo-tasks-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ use std::{
};

use anyhow::{anyhow, Result};
use auto_hash_map::AutoMap;
use futures::FutureExt;
use turbo_tasks::{
backend::CellContent,
backend::{CellContent, TaskCollectiblesMap},
event::{Event, EventListener},
registry,
test_helpers::with_turbo_tasks_for_testing,
Expand Down Expand Up @@ -244,12 +243,12 @@ impl TurboTasksApi for VcStorage {
fn unemit_collectibles(
&self,
_trait_type: turbo_tasks::TraitTypeId,
_collectibles: &AutoMap<RawVc, i32>,
_collectibles: &TaskCollectiblesMap,
) {
unimplemented!()
}

fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> AutoMap<RawVc, i32> {
fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
unimplemented!()
}

Expand Down
9 changes: 6 additions & 3 deletions crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{
any::Any,
borrow::Cow,
fmt,
fmt::{Debug, Display, Write},
fmt::{self, Debug, Display, Write},
future::Future,
hash::BuildHasherDefault,
mem::take,
pin::Pin,
sync::Arc,
Expand All @@ -12,6 +12,7 @@ use std::{

use anyhow::{anyhow, bail, Result};
use auto_hash_map::AutoMap;
use rustc_hash::FxHasher;
use serde::{Deserialize, Serialize};
use tracing::Span;

Expand Down Expand Up @@ -276,6 +277,8 @@ impl CellContent {
}
}

pub type TaskCollectiblesMap = AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1>;

pub trait Backend: Sync + Send {
#[allow(unused_variables)]
fn initialize(&mut self, task_id_provider: &dyn TaskIdProvider) {}
Expand Down Expand Up @@ -388,7 +391,7 @@ pub trait Backend: Sync + Send {
trait_id: TraitTypeId,
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> AutoMap<RawVc, i32>;
) -> TaskCollectiblesMap;

fn emit_collectible(
&self,
Expand Down
Loading

0 comments on commit cf49da0

Please sign in to comment.