From 32e03b007199ee5239d6f27d7865ac3203d3b900 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 29 Jul 2024 10:52:22 +0200 Subject: [PATCH] new backend --- Cargo.lock | 1 + .../crates/turbo-tasks-backend/Cargo.toml | 1 + .../turbo-tasks-backend/src/backend/mod.rs | 318 +++++++++++++++++- .../src/backend/operation/connect_child.rs | 52 +++ .../src/backend/operation/mod.rs | 176 +++++++++- .../src/backend/storage.rs | 85 ++++- .../crates/turbo-tasks-backend/src/data.rs | 53 ++- .../crates/turbo-tasks-backend/src/lib.rs | 2 + .../src/utils/external_locks.rs | 61 ++++ .../turbo-tasks-backend/src/utils/mod.rs | 2 + .../src/utils/ptr_eq_arc.rs | 47 +++ .../key_value_pair_macro.rs} | 19 +- .../turbo-tasks-macros/src/derive/mod.rs | 2 + .../crates/turbo-tasks-macros/src/lib.rs | 12 +- .../src/{keyed.rs => key_value_pair.rs} | 5 +- turbopack/crates/turbo-tasks/src/lib.rs | 6 +- 16 files changed, 797 insertions(+), 45 deletions(-) create mode 100644 turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs create mode 100644 turbopack/crates/turbo-tasks-backend/src/utils/external_locks.rs create mode 100644 turbopack/crates/turbo-tasks-backend/src/utils/ptr_eq_arc.rs rename turbopack/crates/turbo-tasks-macros/src/{with_key_macro.rs => derive/key_value_pair_macro.rs} (90%) rename turbopack/crates/turbo-tasks/src/{keyed.rs => key_value_pair.rs} (53%) diff --git a/Cargo.lock b/Cargo.lock index eeaa124ca7c02..e55a78eb2cf12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8356,6 +8356,7 @@ name = "turbo-tasks-backend" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "auto-hash-map", "dashmap", "once_cell", diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml index 8b21d629dfd3d..cd38a6bb31794 100644 --- a/turbopack/crates/turbo-tasks-backend/Cargo.toml +++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] anyhow = { workspace = true } +async-trait = { workspace = true } auto-hash-map = { workspace = true } dashmap = { workspace = true } once_cell = { workspace = true } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index c4b918a446dee..cb6e5ae0d02ab 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -1,37 +1,337 @@ mod operation; mod storage; -use std::{collections::VecDeque, sync::Arc}; +use std::{ + borrow::Cow, + collections::HashSet, + future::Future, + hash::BuildHasherDefault, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; -use parking_lot::Mutex; -use turbo_tasks::{backend::CachedTaskType, TaskId}; +use anyhow::Result; +use auto_hash_map::{AutoMap, AutoSet}; +use parking_lot::{Condvar, Mutex}; +use rustc_hash::FxHasher; +use turbo_tasks::{ + backend::{ + Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType, + TypedCellContent, + }, + event::EventListener, + util::IdFactoryWithReuse, + CellId, RawVc, TaskId, TraitTypeId, TurboTasksBackendApi, ValueTypeId, TRANSIENT_TASK_BIT, +}; -use self::{operation::Operation, storage::Storage}; +use self::{ + operation::{AnyOperation, ExecuteContext, Operation}, + storage::Storage, +}; use crate::{ data::{CachedDataItem, CachedDataUpdate}, - utils::{bi_map::BiMap, chunked_vec::ChunkedVec}, + utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc}, }; +const SNAPSHOT_REQUESTED_BIT: usize = 1 << 63; + +struct SnapshotRequest { + snapshot_requested: bool, + suspended_operations: HashSet>, +} + +impl SnapshotRequest { + fn new() -> Self { + Self { + snapshot_requested: false, + suspended_operations: HashSet::new(), + } + } +} + pub struct TurboTasksBackend { + persisted_task_id_factory: IdFactoryWithReuse, + transient_task_id_factory: IdFactoryWithReuse, + persisted_task_cache_log: Mutex, TaskId)>>, task_cache: BiMap, TaskId>, + persisted_storage_log: Mutex>, storage: Storage, - operations: Mutex>>, + + /// Number of executing operations + Highest bit is set when snapshot is + /// requested. When that bit is set, operations should pause until the + /// snapshot is completed. When the bit is set and in progress counter + /// reaches zero, `operations_completed_when_snapshot_requested` is + /// triggered. + in_progress_operations: AtomicUsize, + + snapshot_request: Mutex, + /// Condition Variable that is triggered when `in_progress_operations` + /// reaches zero while snapshot is requested. All operations are either + /// completed or suspended. + operations_suspended: Condvar, + /// Condition Variable that is triggered when a snapshot is completed and + /// operations can continue. + snapshot_completed: Condvar, } impl TurboTasksBackend { pub fn new() -> Self { Self { + persisted_task_id_factory: IdFactoryWithReuse::new_with_range( + 1, + (TRANSIENT_TASK_BIT - 1) as u64, + ), + transient_task_id_factory: IdFactoryWithReuse::new_with_range( + TRANSIENT_TASK_BIT as u64, + u32::MAX as u64, + ), persisted_task_cache_log: Mutex::new(ChunkedVec::new()), task_cache: BiMap::new(), persisted_storage_log: Mutex::new(ChunkedVec::new()), storage: Storage::new(), - operations: Mutex::new(VecDeque::new()), + in_progress_operations: AtomicUsize::new(0), + snapshot_request: Mutex::new(SnapshotRequest::new()), + operations_suspended: Condvar::new(), + snapshot_completed: Condvar::new(), + } + } + + fn run_operation( + &self, + operation: impl Operation, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + operation.execute(ExecuteContext::new(self, turbo_tasks)); + } + + fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) { + if (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0 { + let operation = Arc::new(suspend()); + let mut snapshot_request = self.snapshot_request.lock(); + if snapshot_request.snapshot_requested { + snapshot_request + .suspended_operations + .insert(operation.clone().into()); + let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel); + assert!((value & SNAPSHOT_REQUESTED_BIT) != 0); + if value == SNAPSHOT_REQUESTED_BIT { + self.operations_suspended.notify_all(); + } + self.snapshot_completed + .wait_while(&mut snapshot_request, |snapshot_request| { + snapshot_request.snapshot_requested + }); + self.in_progress_operations.fetch_add(1, Ordering::AcqRel); + snapshot_request + .suspended_operations + .remove(&operation.into()); + } + } + } +} + +// Operations +impl TurboTasksBackend { + pub fn connect_child( + &self, + parent_task: TaskId, + child_task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + self.run_operation( + operation::ConnectChildOperation::new(parent_task, child_task), + turbo_tasks, + ); + } +} + +impl Backend for TurboTasksBackend { + fn get_or_create_persistent_task( + &self, + task_type: CachedTaskType, + parent_task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskId { + if let Some(task_id) = self.task_cache.lookup_forward(&task_type) { + self.connect_child(parent_task, task_id, turbo_tasks); + return task_id; + } + + let task_type = Arc::new(task_type); + let task_id = self.persisted_task_id_factory.get(); + if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) { + // Safety: We just created the id and failed to insert it. + unsafe { + self.persisted_task_id_factory.reuse(task_id); + } + self.connect_child(parent_task, existing_task_id, turbo_tasks); + return existing_task_id; } + self.persisted_task_cache_log + .lock() + .push((task_type, task_id)); + + self.connect_child(parent_task, task_id, turbo_tasks); + + task_id } - fn run_operation(&self, operation: Box) { - self.operations.lock().push_back(operation); + fn invalidate_task(&self, _: TaskId, _: &dyn TurboTasksBackendApi) { + todo!() + } + fn invalidate_tasks(&self, _: &[TaskId], _: &dyn TurboTasksBackendApi) { + todo!() + } + fn invalidate_tasks_set( + &self, + _: &AutoSet, 2>, + _: &dyn TurboTasksBackendApi, + ) { + todo!() + } + fn get_task_description(&self, _: TaskId) -> std::string::String { + todo!() + } + type ExecutionScopeFuture>> = Pin> + Send + Sync>> where T: Send + 'static; + fn execution_scope>>( + &self, + _: TaskId, + _: T, + ) -> Self::ExecutionScopeFuture + where + T: Send + 'static, + { + todo!() + } + fn try_start_task_execution( + &self, + _: TaskId, + _: &dyn TurboTasksBackendApi, + ) -> std::option::Option> { + todo!() + } + fn task_execution_result( + &self, + _: TaskId, + _: Result, std::option::Option>>, + _: &dyn TurboTasksBackendApi, + ) { + todo!() + } + fn task_execution_completed( + &self, + _: TaskId, + _: Duration, + _: usize, + _: AutoMap, 8>, + _: bool, + _: &dyn TurboTasksBackendApi, + ) -> bool { + todo!() + } + fn run_backend_job( + &self, + _: BackendJobId, + _: &dyn TurboTasksBackendApi, + ) -> Pin + Send + 'static)>> { + todo!() + } + fn try_read_task_output( + &self, + _: TaskId, + _: TaskId, + _: bool, + _: &dyn TurboTasksBackendApi, + ) -> Result, turbo_tasks::Error> { + todo!() + } + fn try_read_task_output_untracked( + &self, + _: TaskId, + _: bool, + _: &dyn TurboTasksBackendApi, + ) -> Result, turbo_tasks::Error> { + todo!() + } + fn try_read_task_cell( + &self, + _: TaskId, + _: CellId, + _: TaskId, + _: &dyn TurboTasksBackendApi, + ) -> Result, turbo_tasks::Error> { + todo!() + } + fn try_read_task_cell_untracked( + &self, + _: TaskId, + _: CellId, + _: &dyn TurboTasksBackendApi, + ) -> Result, turbo_tasks::Error> { + todo!() + } + fn read_task_collectibles( + &self, + _: TaskId, + _: TraitTypeId, + _: TaskId, + _: &dyn TurboTasksBackendApi, + ) -> AutoMap, 1> { + todo!() + } + fn emit_collectible( + &self, + _: TraitTypeId, + _: RawVc, + _: TaskId, + _: &dyn TurboTasksBackendApi, + ) { + todo!() + } + fn unemit_collectible( + &self, + _: TraitTypeId, + _: RawVc, + _: u32, + _: TaskId, + _: &dyn TurboTasksBackendApi, + ) { + todo!() + } + fn update_task_cell( + &self, + _: TaskId, + _: CellId, + _: CellContent, + _: &dyn TurboTasksBackendApi, + ) { + todo!() + } + fn get_or_create_transient_task( + &self, + _: CachedTaskType, + _: TaskId, + _: &dyn TurboTasksBackendApi, + ) -> TaskId { + todo!() + } + fn connect_task(&self, _: TaskId, _: TaskId, _: &dyn TurboTasksBackendApi) { + todo!() + } + fn create_transient_task( + &self, + _: TransientTaskType, + _: &dyn TurboTasksBackendApi, + ) -> TaskId { + todo!() + } + fn dispose_root_task(&self, _: TaskId, _: &dyn TurboTasksBackendApi) { + todo!() } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs new file mode 100644 index 0000000000000..11f5f5c7ed4a6 --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs @@ -0,0 +1,52 @@ +use serde::{Deserialize, Serialize}; +use turbo_tasks::TaskId; + +use super::{ExecuteContext, Operation}; +use crate::{data::CachedDataItem, suspend_point}; + +#[derive(Serialize, Deserialize, Clone, Default)] +pub enum ConnectChildOperation { + AddChild { + parent_task: TaskId, + child_task: TaskId, + }, + #[default] + Done, + // TODO Add aggregated edge +} + +impl ConnectChildOperation { + pub fn new(parent_task: TaskId, child_task: TaskId) -> Self { + ConnectChildOperation::AddChild { + parent_task, + child_task, + } + } +} + +impl Operation for ConnectChildOperation { + fn execute(mut self, ctx: ExecuteContext<'_>) { + loop { + match self { + ConnectChildOperation::AddChild { + parent_task, + child_task, + } => { + let mut parent_task = ctx.task(parent_task); + if parent_task.add(CachedDataItem::Child { + task: child_task, + value: (), + }) { + // TODO add aggregated edge + suspend_point!(self, ctx, ConnectChildOperation::Done); + } else { + suspend_point!(self, ctx, ConnectChildOperation::Done); + } + } + ConnectChildOperation::Done => { + return; + } + } + } + } +} diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index b3b434511bf3f..5701cf7c4be2d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -1 +1,175 @@ -pub trait Operation {} +mod connect_child; + +use serde::{Deserialize, Serialize}; +use turbo_tasks::{KeyValuePair, TaskId, TurboTasksBackendApi}; + +use super::{storage::StorageWriteGuard, TurboTasksBackend}; +use crate::data::{CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate}; + +pub trait Operation: Serialize + for<'de> Deserialize<'de> { + fn execute(self, ctx: ExecuteContext<'_>); +} + +pub struct ExecuteContext<'a> { + backend: &'a TurboTasksBackend, + turbo_tasks: &'a dyn TurboTasksBackendApi, +} + +impl<'a> ExecuteContext<'a> { + pub fn new( + backend: &'a TurboTasksBackend, + turbo_tasks: &'a dyn TurboTasksBackendApi, + ) -> Self { + Self { + backend, + turbo_tasks, + } + } + + pub fn task(&self, task_id: TaskId) -> TaskGuard<'a> { + TaskGuard { + task: self.backend.storage.access_mut(task_id), + task_id, + backend: self.backend, + } + } + + pub fn operation_suspend_point(&self, f: impl FnOnce() -> AnyOperation) { + self.backend.operation_suspend_point(f) + } +} + +pub struct TaskGuard<'a> { + task_id: TaskId, + task: StorageWriteGuard<'a, TaskId, CachedDataItem>, + backend: &'a TurboTasksBackend, +} + +impl<'a> TaskGuard<'a> { + fn new( + task_id: TaskId, + task: StorageWriteGuard<'a, TaskId, CachedDataItem>, + backend: &'a TurboTasksBackend, + ) -> Self { + Self { + task_id, + task, + backend, + } + } + + pub fn add(&mut self, item: CachedDataItem) -> bool { + if !item.is_persistent() { + self.task.add(item) + } else { + if self.task.add(item.clone()) { + let (key, value) = item.into_key_and_value(); + self.backend + .persisted_storage_log + .lock() + .push(CachedDataUpdate { + key, + task: self.task_id, + value: Some(value), + }); + true + } else { + false + } + } + } + + pub fn upsert(&mut self, item: CachedDataItem) -> Option { + let (key, value) = item.into_key_and_value(); + if !key.is_persistent() { + self.task + .upsert(CachedDataItem::from_key_and_value(key, value)) + } else if value.is_persistent() { + let old = self.task.upsert(CachedDataItem::from_key_and_value( + key.clone(), + value.clone(), + )); + self.backend + .persisted_storage_log + .lock() + .push(CachedDataUpdate { + key, + task: self.task_id, + value: Some(value), + }); + old + } else { + let item = CachedDataItem::from_key_and_value(key.clone(), value); + if let Some(old) = self.task.upsert(item) { + if old.is_persistent() { + self.backend + .persisted_storage_log + .lock() + .push(CachedDataUpdate { + key, + task: self.task_id, + value: None, + }); + } + Some(old) + } else { + None + } + } + } + + pub fn remove(&mut self, key: &CachedDataItemKey) -> Option { + let old_value = self.task.remove(&key); + if let Some(value) = old_value { + if key.is_persistent() && value.is_persistent() { + let key = key.clone(); + self.backend + .persisted_storage_log + .lock() + .push(CachedDataUpdate { + key, + task: self.task_id, + value: None, + }); + } + Some(value) + } else { + None + } + } +} + +macro_rules! impl_operation { + ($name:ident $type_path:path) => { + impl From<$type_path> for AnyOperation { + fn from(op: $type_path) -> Self { + AnyOperation::$name(op) + } + } + + pub use $type_path; + }; +} + +#[derive(Serialize, Deserialize)] +pub enum AnyOperation { + ConnectChild(connect_child::ConnectChildOperation), +} + +impl Operation for AnyOperation { + fn execute(self, ctx: ExecuteContext<'_>) { + match self { + AnyOperation::ConnectChild(op) => op.execute(ctx), + } + } +} + +impl_operation!(ConnectChild connect_child::ConnectChildOperation); + +#[macro_export(local_inner_macros)] +macro_rules! suspend_point { + ($self:expr, $ctx:expr, $op:expr) => { + $self = $op; + $ctx.operation_suspend_point(|| $self.clone().into()); + }; +} diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index 957dabd2db994..e609a386351de 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -1,6 +1,13 @@ -use auto_hash_map::AutoMap; -use dashmap::DashMap; -use turbo_tasks::Keyed; +use std::{ + hash::{BuildHasherDefault, Hash}, + ops::{Deref, DerefMut}, + thread::available_parallelism, +}; + +use auto_hash_map::{map::Entry, AutoMap}; +use dashmap::{mapref::one::RefMut, DashMap}; +use rustc_hash::FxHasher; +use turbo_tasks::KeyValuePair; enum PersistanceState { /// We know that all state of the object is only in the cache and nothing is @@ -12,23 +19,83 @@ enum PersistanceState { Unknown, } -struct InnerStorage { +pub struct InnerStorage { + // TODO consider adding some inline storage map: AutoMap, persistance_state: PersistanceState, } -pub struct Storage { - map: DashMap>, +impl InnerStorage { + fn new() -> Self { + Self { + map: AutoMap::new(), + persistance_state: PersistanceState::Unknown, + } + } + + pub fn add(&mut self, item: T) -> bool { + let (key, value) = item.into_key_and_value(); + match self.map.entry(key) { + Entry::Occupied(_) => false, + Entry::Vacant(e) => { + e.insert(value); + true + } + } + } + + pub fn upsert(&mut self, item: T) -> Option { + let (key, value) = item.into_key_and_value(); + self.map.insert(key, value) + } + + pub fn remove(&mut self, key: &T::Key) -> Option { + self.map.remove(key) + } +} + +pub struct Storage { + map: DashMap, BuildHasherDefault>, } -impl Storage +impl Storage where K: Eq + std::hash::Hash + Clone, - T: Keyed, + T: KeyValuePair, { pub fn new() -> Self { Self { - map: DashMap::new(), + map: DashMap::with_capacity_and_hasher_and_shard_amount( + 1024 * 1024, + Default::default(), + available_parallelism().map_or(4, |v| v.get()) * 64, + ), } } + + pub fn access_mut(&self, key: K) -> StorageWriteGuard<'_, K, T> { + let inner = match self.map.entry(key) { + dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(), + dashmap::mapref::entry::Entry::Vacant(e) => e.insert(InnerStorage::new()), + }; + StorageWriteGuard { inner } + } +} + +pub struct StorageWriteGuard<'a, K, T: KeyValuePair> { + inner: RefMut<'a, K, InnerStorage, BuildHasherDefault>, +} + +impl<'a, K: Eq + Hash, T: KeyValuePair> Deref for StorageWriteGuard<'a, K, T> { + type Target = InnerStorage; + + fn deref(&self) -> &Self::Target { + &*self.inner + } +} + +impl<'a, K: Eq + Hash, T: KeyValuePair> DerefMut for StorageWriteGuard<'a, K, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut *self.inner + } } diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index d4a032e966627..76e7df4f9e64e 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -1,12 +1,12 @@ -use turbo_tasks::{util::SharedError, CellId, SharedReference, TaskId}; +use turbo_tasks::{util::SharedError, CellId, KeyValuePair, SharedReference, TaskId}; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct CellRef { task: TaskId, cell: CellId, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum OutputValue { Cell(CellRef), Output(TaskId), @@ -22,21 +22,20 @@ impl OutputValue { } } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum RootType { RootTask, OnceTask, ReadingStronglyConsistent, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum InProgressState { Scheduled { clean: bool }, InProgress { clean: bool, stale: bool }, } -#[turbo_tasks::with_key] -#[derive(Debug, Clone)] +#[derive(Debug, Clone, KeyValuePair)] pub enum CachedDataItem { // Output Output { @@ -173,6 +172,46 @@ impl CachedDataItem { } } +impl CachedDataItemKey { + pub fn is_persistent(&self) -> bool { + match self { + CachedDataItemKey::Output { .. } => true, + CachedDataItemKey::Collectible { collectible, .. } => !collectible.task.is_transient(), + CachedDataItemKey::Dirty { .. } => true, + CachedDataItemKey::DirtyWhenPersisted { .. } => true, + CachedDataItemKey::Child { task, .. } => !task.is_transient(), + CachedDataItemKey::CellData { .. } => true, + CachedDataItemKey::OutputDependency { target, .. } => !target.is_transient(), + CachedDataItemKey::CellDependency { target, .. } => !target.task.is_transient(), + CachedDataItemKey::OutputDependent { task, .. } => !task.is_transient(), + CachedDataItemKey::CellDependent { task, .. } => !task.is_transient(), + CachedDataItemKey::AggregationNumber { .. } => true, + CachedDataItemKey::Follower { task, .. } => !task.is_transient(), + CachedDataItemKey::Upper { task, .. } => !task.is_transient(), + CachedDataItemKey::AggregatedDirtyTask { task, .. } => !task.is_transient(), + CachedDataItemKey::AggregatedCollectible { collectible, .. } => { + !collectible.task.is_transient() + } + CachedDataItemKey::AggregatedUnfinishedTasks { .. } => true, + CachedDataItemKey::AggregateRootType { .. } => false, + CachedDataItemKey::InProgress { .. } => false, + CachedDataItemKey::OutdatedCollectible { .. } => false, + CachedDataItemKey::OutdatedOutputDependency { .. } => false, + CachedDataItemKey::OutdatedCellDependency { .. } => false, + CachedDataItemKey::Error { .. } => false, + } + } +} + +impl CachedDataItemValue { + pub fn is_persistent(&self) -> bool { + match self { + CachedDataItemValue::Output { value } => !value.is_transient(), + _ => true, + } + } +} + trait IsDefault { fn is_default(&self) -> bool; } diff --git a/turbopack/crates/turbo-tasks-backend/src/lib.rs b/turbopack/crates/turbo-tasks-backend/src/lib.rs index 5f67a0c86f697..5fc95b21a44d9 100644 --- a/turbopack/crates/turbo-tasks-backend/src/lib.rs +++ b/turbopack/crates/turbo-tasks-backend/src/lib.rs @@ -1,3 +1,5 @@ mod backend; mod data; mod utils; + +pub use backend::TurboTasksBackend; diff --git a/turbopack/crates/turbo-tasks-backend/src/utils/external_locks.rs b/turbopack/crates/turbo-tasks-backend/src/utils/external_locks.rs new file mode 100644 index 0000000000000..ddbfa54a75479 --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/src/utils/external_locks.rs @@ -0,0 +1,61 @@ +use std::{ + collections::{hash_map::Entry, HashMap}, + hash::Hash, + mem::transmute, + sync::Arc, +}; + +use parking_lot::{Mutex, MutexGuard}; + +pub struct ExternalLocks { + locks: HashMap>>, +} + +impl ExternalLocks { + pub fn new() -> Self { + Self { + locks: HashMap::new(), + } + } + + pub fn lock(&mut self, key: K) -> ExternalLockGuard { + let mutex = match self.locks.entry(key) { + Entry::Occupied(e) => e.get().clone(), + Entry::Vacant(e) => { + let lock = Arc::new(Mutex::new(())); + e.insert(lock.clone()); + if self.locks.len().is_power_of_two() { + let to_remove = self + .locks + .iter() + .filter_map(|(k, v)| { + if Arc::strong_count(v) == 1 { + Some(k.clone()) + } else { + None + } + }) + .collect::>(); + to_remove.into_iter().for_each(|k| { + self.locks.remove(&k); + }); + if self.locks.capacity() > self.locks.len() * 3 { + self.locks.shrink_to_fit(); + } + } + lock + } + }; + let guard = mutex.lock(); + // Safety: We know that the guard is valid for the lifetime of the lock as we + // keep the lock + let guard = unsafe { transmute::, MutexGuard<'static, _>>(guard) }; + ExternalLockGuard { lock: mutex, guard } + } +} + +pub struct ExternalLockGuard { + // Safety: guard must be before lock as it is dropped first + guard: MutexGuard<'static, ()>, + lock: Arc>, +} diff --git a/turbopack/crates/turbo-tasks-backend/src/utils/mod.rs b/turbopack/crates/turbo-tasks-backend/src/utils/mod.rs index 54b7d067e843b..84e4a29fdb11a 100644 --- a/turbopack/crates/turbo-tasks-backend/src/utils/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/utils/mod.rs @@ -1,2 +1,4 @@ pub mod bi_map; pub mod chunked_vec; +pub mod external_locks; +pub mod ptr_eq_arc; diff --git a/turbopack/crates/turbo-tasks-backend/src/utils/ptr_eq_arc.rs b/turbopack/crates/turbo-tasks-backend/src/utils/ptr_eq_arc.rs new file mode 100644 index 0000000000000..87543c0399031 --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/src/utils/ptr_eq_arc.rs @@ -0,0 +1,47 @@ +use std::{ + hash::{Hash, Hasher}, + ops::Deref, + sync::Arc, +}; + +pub struct PtrEqArc(Arc); + +impl PtrEqArc { + pub fn new(value: T) -> Self { + Self(Arc::new(value)) + } +} + +impl From> for PtrEqArc { + fn from(value: Arc) -> Self { + Self(value) + } +} + +impl Deref for PtrEqArc { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Clone for PtrEqArc { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl PartialEq for PtrEqArc { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.0, &other.0) + } +} + +impl Eq for PtrEqArc {} + +impl Hash for PtrEqArc { + fn hash(&self, state: &mut H) { + Arc::as_ptr(&self.0).hash(state) + } +} diff --git a/turbopack/crates/turbo-tasks-macros/src/with_key_macro.rs b/turbopack/crates/turbo-tasks-macros/src/derive/key_value_pair_macro.rs similarity index 90% rename from turbopack/crates/turbo-tasks-macros/src/with_key_macro.rs rename to turbopack/crates/turbo-tasks-macros/src/derive/key_value_pair_macro.rs index 996ad6b8dc3c7..66f602491a083 100644 --- a/turbopack/crates/turbo-tasks-macros/src/with_key_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/derive/key_value_pair_macro.rs @@ -2,10 +2,9 @@ use proc_macro::TokenStream; use quote::quote; use syn::{parse_macro_input, Ident, ItemEnum}; -pub fn with_key(input: TokenStream) -> TokenStream { +pub fn derive_key_value_pair(input: TokenStream) -> TokenStream { let input = parse_macro_input!(input as ItemEnum); - let attrs = &input.attrs; let ident = &input.ident; let vis = &input.vis; let key_name = Ident::new(&format!("{}Key", input.ident), input.ident.span()); @@ -60,9 +59,7 @@ pub fn with_key(input: TokenStream) -> TokenStream { let value_clone_fields = clone_fields(&value_fields); quote! { - #input - - impl turbo_tasks::Keyed for #ident { + impl turbo_tasks::KeyValuePair for #ident { type Key = #key_name; type Value = #value_name; @@ -90,9 +87,17 @@ pub fn with_key(input: TokenStream) -> TokenStream { _ => panic!("Invalid key and value combination"), } } + + fn into_key_and_value(self) -> (#key_name, #value_name) { + match self { + #( + #ident::#variant_names { #key_pat #value_pat } => (#key_name::#variant_names { #key_pat }, #value_name::#variant_names { #value_pat }), + )* + } + } } - #(#attrs)* + #[derive(Debug, Clone, PartialEq, Eq, Hash)] #vis enum #key_name { #( #variant_names { @@ -101,7 +106,7 @@ pub fn with_key(input: TokenStream) -> TokenStream { )* } - #(#attrs)* + #[derive(Debug, Clone)] #vis enum #value_name { #( #variant_names { diff --git a/turbopack/crates/turbo-tasks-macros/src/derive/mod.rs b/turbopack/crates/turbo-tasks-macros/src/derive/mod.rs index d48323309096e..d8c507574ab3b 100644 --- a/turbopack/crates/turbo-tasks-macros/src/derive/mod.rs +++ b/turbopack/crates/turbo-tasks-macros/src/derive/mod.rs @@ -1,4 +1,5 @@ mod deterministic_hash_macro; +mod key_value_pair_macro; mod resolved_value_macro; mod task_input_macro; mod trace_raw_vcs_macro; @@ -6,6 +7,7 @@ mod value_debug_format_macro; mod value_debug_macro; pub use deterministic_hash_macro::derive_deterministic_hash; +pub use key_value_pair_macro::derive_key_value_pair; pub use resolved_value_macro::derive_resolved_value; use syn::{spanned::Spanned, Attribute, Meta, MetaList, NestedMeta}; pub use task_input_macro::derive_task_input; diff --git a/turbopack/crates/turbo-tasks-macros/src/lib.rs b/turbopack/crates/turbo-tasks-macros/src/lib.rs index 217c4dd54c299..7b415f3691aea 100644 --- a/turbopack/crates/turbo-tasks-macros/src/lib.rs +++ b/turbopack/crates/turbo-tasks-macros/src/lib.rs @@ -11,7 +11,6 @@ mod primitive_macro; mod value_impl_macro; mod value_macro; mod value_trait_macro; -mod with_key_macro; extern crate proc_macro; @@ -48,6 +47,11 @@ pub fn derive_task_input(input: TokenStream) -> TokenStream { derive::derive_task_input(input) } +#[proc_macro_derive(KeyValuePair)] +pub fn derive_key_value_pair(input: TokenStream) -> TokenStream { + derive::derive_key_value_pair(input) +} + /// Creates a Vc struct for a `struct` or `enum` that represent /// that type placed into a cell in a Task. /// @@ -178,12 +182,6 @@ pub fn value_impl(args: TokenStream, input: TokenStream) -> TokenStream { value_impl_macro::value_impl(args, input) } -#[proc_macro_error] -#[proc_macro_attribute] -pub fn with_key(_args: TokenStream, input: TokenStream) -> TokenStream { - with_key_macro::with_key(input) -} - #[allow_internal_unstable(min_specialization, into_future, trivial_bounds)] #[proc_macro_error] #[proc_macro] diff --git a/turbopack/crates/turbo-tasks/src/keyed.rs b/turbopack/crates/turbo-tasks/src/key_value_pair.rs similarity index 53% rename from turbopack/crates/turbo-tasks/src/keyed.rs rename to turbopack/crates/turbo-tasks/src/key_value_pair.rs index 46a1cf7f6b9bf..6aceaea04d4f7 100644 --- a/turbopack/crates/turbo-tasks/src/keyed.rs +++ b/turbopack/crates/turbo-tasks/src/key_value_pair.rs @@ -1,7 +1,8 @@ -pub trait Keyed { - type Key; +pub trait KeyValuePair { + type Key: PartialEq + Eq + std::hash::Hash; type Value; fn key(&self) -> Self::Key; fn value(&self) -> Self::Value; fn from_key_and_value(key: Self::Key, value: Self::Value) -> Self; + fn into_key_and_value(self) -> (Self::Key, Self::Value); } diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index 70fb61a46c8ee..e3b7b3829e0b0 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -49,7 +49,7 @@ mod id; mod id_factory; mod invalidation; mod join_iter_ext; -mod keyed; +mod key_value_pair; #[doc(hidden)] pub mod macro_helpers; mod magic_any; @@ -87,7 +87,7 @@ pub use invalidation::{ DynamicEqHash, InvalidationReason, InvalidationReasonKind, InvalidationReasonSet, }; pub use join_iter_ext::{JoinIterExt, TryFlatJoinIterExt, TryJoinIterExt}; -pub use keyed::Keyed; +pub use key_value_pair::KeyValuePair; pub use magic_any::MagicAny; pub use manager::{ dynamic_call, dynamic_this_call, emit, get_invalidator, mark_finished, mark_stateful, @@ -103,7 +103,7 @@ use rustc_hash::FxHasher; pub use state::State; pub use task::{task_input::TaskInput, SharedReference}; pub use trait_ref::{IntoTraitRef, TraitRef}; -pub use turbo_tasks_macros::{function, value, value_impl, value_trait, with_key, TaskInput}; +pub use turbo_tasks_macros::{function, value, value_impl, value_trait, KeyValuePair, TaskInput}; pub use value::{TransientInstance, TransientValue, Value}; pub use value_type::{TraitMethod, TraitType, ValueType}; pub use vc::{