Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(turbo-tasks) Add a higher-level task-local state API for the Backend trait #68996

Merged
merged 6 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions turbopack/crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
borrow::{Borrow, Cow},
cell::RefCell,
future::Future,
hash::{BuildHasher, BuildHasherDefault, Hash},
num::NonZeroU32,
Expand All @@ -16,7 +15,6 @@ use anyhow::{anyhow, bail, Result};
use auto_hash_map::AutoMap;
use dashmap::{mapref::entry::Entry, DashMap};
use rustc_hash::FxHasher;
use tokio::task::futures::TaskLocalFuture;
use tracing::trace_span;
use turbo_prehash::{BuildHasherExt, PassThroughHash, PreHashed};
use turbo_tasks::{
Expand All @@ -37,14 +35,20 @@ use crate::{
PERCENTAGE_MIN_IDLE_TARGET_MEMORY, PERCENTAGE_MIN_TARGET_MEMORY,
},
output::Output,
task::{ReadCellError, Task, TaskType, DEPENDENCIES_TO_TRACK},
task::{ReadCellError, Task, TaskType},
task_statistics::TaskStatisticsApi,
};

fn prehash_task_type(task_type: CachedTaskType) -> PreHashed<CachedTaskType> {
BuildHasherDefault::<FxHasher>::prehash(&Default::default(), task_type)
}

pub struct TaskState {
/// Cells/Outputs/Collectibles that are read during task execution. These will be stored as
/// dependencies when the execution has finished.
pub dependencies_to_track: TaskEdgesSet,
}

pub struct MemoryBackend {
persistent_tasks: NoMoveVec<Task, 13>,
transient_tasks: NoMoveVec<Task, 10>,
Expand Down Expand Up @@ -436,14 +440,11 @@ impl Backend for MemoryBackend {
self.with_task(task, |task| task.get_description())
}

type ExecutionScopeFuture<T: Future<Output = Result<()>> + Send + 'static> =
TaskLocalFuture<RefCell<TaskEdgesSet>, T>;
fn execution_scope<T: Future<Output = Result<()>> + Send + 'static>(
&self,
_task: TaskId,
future: T,
) -> Self::ExecutionScopeFuture<T> {
DEPENDENCIES_TO_TRACK.scope(RefCell::new(TaskEdgesSet::new()), future)
type TaskState = TaskState;
fn new_task_state(&self, _task: TaskId) -> Self::TaskState {
TaskState {
dependencies_to_track: TaskEdgesSet::new(),
}
}

fn try_start_task_execution<'a>(
Expand Down Expand Up @@ -529,7 +530,7 @@ impl Backend for MemoryBackend {
move || format!("reading task output from {reader}"),
turbo_tasks,
|output| {
Task::add_dependency_to_current(TaskEdge::Output(task));
Task::add_dependency_to_current(TaskEdge::Output(task), turbo_tasks);
output.read(reader)
},
)
Expand Down Expand Up @@ -564,7 +565,7 @@ impl Backend for MemoryBackend {
})
.into_typed(index.type_id)))
} else {
Task::add_dependency_to_current(TaskEdge::Cell(task_id, index));
Task::add_dependency_to_current(TaskEdge::Cell(task_id, index), turbo_tasks);
self.with_task(task_id, |task| {
match task.read_cell(
index,
Expand Down Expand Up @@ -623,7 +624,7 @@ impl Backend for MemoryBackend {
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> TaskCollectiblesMap {
Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id));
Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id), turbo_tasks);
Task::read_collectibles(id, trait_id, reader, self, turbo_tasks)
}

Expand Down
25 changes: 10 additions & 15 deletions turbopack/crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
borrow::Cow,
cell::RefCell,
fmt::{self, Debug, Display, Formatter},
future::Future,
hash::{BuildHasherDefault, Hash},
Expand All @@ -17,14 +16,13 @@ use either::Either;
use parking_lot::{Mutex, RwLock};
use rustc_hash::FxHasher;
use smallvec::SmallVec;
use tokio::task_local;
use tracing::Span;
use turbo_prehash::PreHashed;
use turbo_tasks::{
backend::{CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec},
event::{Event, EventListener},
get_invalidator, registry, CellId, Invalidator, RawVc, ReadConsistency, TaskId, TaskIdSet,
TraitTypeId, TurboTasksBackendApi, ValueTypeId,
TraitTypeId, TurboTasksBackendApi, TurboTasksBackendApiExt, ValueTypeId,
};

use crate::{
Expand All @@ -45,12 +43,6 @@ pub type NativeTaskFn = Box<dyn Fn() -> NativeTaskFuture + Send + Sync>;
mod aggregation;
mod meta_state;

task_local! {
/// Cells/Outputs/Collectibles that are read during task execution
/// These will be stored as dependencies when the execution has finished
pub(crate) static DEPENDENCIES_TO_TRACK: RefCell<TaskEdgesSet>;
}

type OnceTaskFn = Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;

/// Different Task types
Expand Down Expand Up @@ -966,7 +958,8 @@ impl Task {
let mut change_job = None;
let mut remove_job = None;
let mut drained_cells = SmallVec::<[Cell; 8]>::new();
let dependencies = DEPENDENCIES_TO_TRACK.with(|deps| deps.take());
let dependencies = turbo_tasks
.write_task_state(|deps| std::mem::take(&mut deps.dependencies_to_track));
{
let mut state = self.full_state_mut();

Expand Down Expand Up @@ -1343,11 +1336,13 @@ impl Task {
}
}

pub(crate) fn add_dependency_to_current(dep: TaskEdge) {
DEPENDENCIES_TO_TRACK.with(|list| {
let mut list = list.borrow_mut();
list.insert(dep);
})
pub(crate) fn add_dependency_to_current(
dep: TaskEdge,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) {
turbo_tasks.write_task_state(|ts| {
ts.dependencies_to_track.insert(dep);
});
}

/// Get an [Invalidator] that can be used to invalidate the current [Task]
Expand Down
131 changes: 110 additions & 21 deletions turbopack/crates/turbo-tasks-testing/tests/detached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,142 @@

use tokio::{
sync::{watch, Notify},
time::{timeout, Duration},
time::{sleep, timeout, Duration},
};
use turbo_tasks::{turbo_tasks, Completion, TransientInstance, Vc};
use turbo_tasks::{turbo_tasks, State, TransientInstance, Vc};
use turbo_tasks_testing::{register, run, Registration};

static REGISTRATION: Registration = register!();

#[tokio::test]
async fn test_spawns_detached() -> anyhow::Result<()> {
run(&REGISTRATION, || async {
let notify = TransientInstance::new(Notify::new());
let (tx, mut rx) = watch::channel(None);
// timeout: prevent the test from hanging, and fail instead if this is broken
timeout(Duration::from_secs(5), async {
bgw marked this conversation as resolved.
Show resolved Hide resolved
let notify = TransientInstance::new(Notify::new());
let (tx, mut rx) = watch::channel(None);
let tx = TransientInstance::new(tx);

// create the task
let out_vc = spawns_detached(notify.clone(), TransientInstance::new(tx));
// create the task
let out_vc = spawns_detached(notify.clone(), tx.clone());

// see that the task does not exit yet
timeout(Duration::from_millis(100), out_vc.strongly_consistent())
.await
.expect_err("should wait on the detached task");
// see that the task does not exit yet
timeout(Duration::from_millis(100), out_vc.strongly_consistent())
.await
.expect_err("should wait on the detached task");

// let the detached future exit
notify.notify_waiters();
// let the detached future exit
notify.notify_waiters();

// it should send us back a cell
let detached_vc: Vc<u32> = rx.wait_for(|opt| opt.is_some()).await.unwrap().unwrap();
assert_eq!(*detached_vc.await.unwrap(), 42);
// it should send us back a cell
let detached_vc: Vc<u32> = rx.wait_for(|opt| opt.is_some()).await?.unwrap();
assert_eq!(*detached_vc.strongly_consistent().await?, 42);

// the parent task should now be able to exit
out_vc.strongly_consistent().await.unwrap();
// the parent task should now be able to exit
out_vc.strongly_consistent().await?;

Ok(())
Ok(())
})
.await?
})
.await
}

#[turbo_tasks::function]
fn spawns_detached(
async fn spawns_detached(
notify: TransientInstance<Notify>,
sender: TransientInstance<watch::Sender<Option<Vc<u32>>>>,
) -> Vc<Completion> {
) -> Vc<()> {
tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(async move {
notify.notified().await;
// creating cells after the normal lifetime of the task should be okay, as the parent task
// is waiting on us before exiting!
sender.send(Some(Vc::cell(42))).unwrap();
Ok(())
})));
Completion::new()
Vc::cell(())
}

#[tokio::test]
async fn test_spawns_detached_changing() -> anyhow::Result<()> {
run(&REGISTRATION, || async {
// timeout: prevent the test from hanging, and fail instead if this is broken
timeout(Duration::from_secs(5), async {
let (tx, mut rx) = watch::channel(None);
let tx = TransientInstance::new(tx);

// state that's read by the detached future
let changing_input_detached = ChangingInput {
state: State::new(42),
}
.cell();

// state that's read by the outer task
let changing_input_outer = ChangingInput {
state: State::new(0),
}
.cell();

// create the task
let out_vc =
spawns_detached_changing(tx.clone(), changing_input_detached, changing_input_outer);

// it should send us back a cell
let detached_vc: Vc<u32> = rx.wait_for(|opt| opt.is_some()).await.unwrap().unwrap();
assert_eq!(*detached_vc.strongly_consistent().await.unwrap(), 42);

// the parent task should now be able to exit
out_vc.strongly_consistent().await.unwrap();

// changing either input should invalidate the vc and cause it to run again
changing_input_detached.await.unwrap().state.set(43);
out_vc.strongly_consistent().await.unwrap();
assert_eq!(*detached_vc.strongly_consistent().await.unwrap(), 43);

changing_input_outer.await.unwrap().state.set(44);
assert_eq!(*out_vc.strongly_consistent().await.unwrap(), 44);

Ok(())
})
.await?
})
.await
}

#[turbo_tasks::value]
struct ChangingInput {
state: State<u32>,
}

#[turbo_tasks::function]
async fn spawns_detached_changing(
sender: TransientInstance<watch::Sender<Option<Vc<u32>>>>,
changing_input_detached: Vc<ChangingInput>,
changing_input_outer: Vc<ChangingInput>,
) -> Vc<u32> {
let tt = turbo_tasks();
tokio::spawn(tt.clone().detached_for_testing(Box::pin(async move {
sleep(Duration::from_millis(100)).await;
// nested detached_for_testing calls should work
tokio::spawn(tt.clone().detached_for_testing(Box::pin(async move {
sleep(Duration::from_millis(100)).await;
// creating cells after the normal lifetime of the task should be okay, as the parent
// task is waiting on us before exiting!
sender
.send(Some(Vc::cell(
*read_changing_input(changing_input_detached).await.unwrap(),
)))
.unwrap();
Ok(())
})));
Ok(())
})));
Vc::cell(*read_changing_input(changing_input_outer).await.unwrap())
}

// spawns_detached should take a dependency on this function for each input
#[turbo_tasks::function]
async fn read_changing_input(changing_input: Vc<ChangingInput>) -> Vc<u32> {
// when changing_input.set is called, it will trigger an invalidator for this task
Vc::cell(*changing_input.await.unwrap().state.get())
}
29 changes: 21 additions & 8 deletions turbopack/crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,28 @@ pub trait Backend: Sync + Send {

fn get_task_description(&self, task: TaskId) -> String;

type ExecutionScopeFuture<T: Future<Output = Result<()>> + Send + 'static>: Future<Output = Result<()>>
+ Send
+ 'static;
/// Task-local state that stored inside of [`TurboTasksBackendApi`]. Constructed with
/// [`Self::new_task_state`].
///
/// This value that can later be written to or read from using
/// [`crate::TurboTasksBackendApiExt::write_task_state`] or
/// [`crate::TurboTasksBackendApiExt::read_task_state`]
///
/// This data may be shared across multiple threads (must be `Sync`) in order to support
/// detached futures ([`crate::TurboTasksApi::detached_for_testing`]) and [pseudo-tasks using
/// `local_cells`][crate::function]. A [`RwLock`][std::sync::RwLock] is used to provide
/// concurrent access.
type TaskState: Send + Sync + 'static;

fn execution_scope<T: Future<Output = Result<()>> + Send + 'static>(
&self,
task: TaskId,
future: T,
) -> Self::ExecutionScopeFuture<T>;
/// Constructs a new task-local [`Self::TaskState`] for the given `task_id`.
///
/// If a task is re-executed (e.g. because it is invalidated), this function will be called
/// again with the same [`TaskId`].
///
/// This value can be written to or read from using
/// [`crate::TurboTasksBackendApiExt::write_task_state`] and
/// [`crate::TurboTasksBackendApiExt::read_task_state`]
fn new_task_state(&self, task: TaskId) -> Self::TaskState;

fn try_start_task_execution<'a>(
&'a self,
Expand Down
3 changes: 2 additions & 1 deletion turbopack/crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ pub use manager::{
dynamic_call, dynamic_this_call, emit, get_invalidator, mark_finished, mark_stateful,
prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call,
turbo_tasks, CurrentCellRef, Invalidator, ReadConsistency, TaskPersistence, TurboTasks,
TurboTasksApi, TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo,
TurboTasksApi, TurboTasksBackendApi, TurboTasksBackendApiExt, TurboTasksCallApi, Unused,
UpdateInfo,
};
pub use native_function::{FunctionMeta, NativeFunction};
pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError};
Expand Down
Loading
Loading