diff --git a/turbopack/crates/turbo-tasks-testing/tests/detached.rs b/turbopack/crates/turbo-tasks-testing/tests/detached.rs index e738d43ae76f0..b8210b269b912 100644 --- a/turbopack/crates/turbo-tasks-testing/tests/detached.rs +++ b/turbopack/crates/turbo-tasks-testing/tests/detached.rs @@ -2,9 +2,9 @@ use tokio::{ sync::{watch, Notify}, - time::{timeout, Duration}, + time::{sleep, timeout, Duration}, }; -use turbo_tasks::{turbo_tasks, Completion, TransientInstance, Vc}; +use turbo_tasks::{turbo_tasks, State, TransientInstance, Vc}; use turbo_tasks_testing::{register, run, Registration}; static REGISTRATION: Registration = register!(); @@ -12,37 +12,41 @@ static REGISTRATION: Registration = register!(); #[tokio::test] async fn test_spawns_detached() -> anyhow::Result<()> { run(®ISTRATION, || async { - let notify = TransientInstance::new(Notify::new()); - let (tx, mut rx) = watch::channel(None); + timeout(Duration::from_secs(5), async { + let notify = TransientInstance::new(Notify::new()); + let (tx, mut rx) = watch::channel(None); + let tx = TransientInstance::new(tx); - // create the task - let out_vc = spawns_detached(notify.clone(), TransientInstance::new(tx)); + // create the task + let out_vc = spawns_detached(notify.clone(), tx.clone()); - // see that the task does not exit yet - timeout(Duration::from_millis(100), out_vc.strongly_consistent()) - .await - .expect_err("should wait on the detached task"); + // see that the task does not exit yet + timeout(Duration::from_millis(100), out_vc.strongly_consistent()) + .await + .expect_err("should wait on the detached task"); - // let the detached future exit - notify.notify_waiters(); + // let the detached future exit + notify.notify_waiters(); - // it should send us back a cell - let detached_vc: Vc = rx.wait_for(|opt| opt.is_some()).await.unwrap().unwrap(); - assert_eq!(*detached_vc.await.unwrap(), 42); + // it should send us back a cell + let detached_vc: Vc = rx.wait_for(|opt| opt.is_some()).await?.unwrap(); + assert_eq!(*detached_vc.strongly_consistent().await?, 42); - // the parent task should now be able to exit - out_vc.strongly_consistent().await.unwrap(); + // the parent task should now be able to exit + out_vc.strongly_consistent().await?; - Ok(()) + Ok(()) + }) + .await? }) .await } #[turbo_tasks::function] -fn spawns_detached( +async fn spawns_detached( notify: TransientInstance, sender: TransientInstance>>>, -) -> Vc { +) -> Vc<()> { tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(async move { notify.notified().await; // creating cells after the normal lifetime of the task should be okay, as the parent task @@ -50,5 +54,88 @@ fn spawns_detached( sender.send(Some(Vc::cell(42))).unwrap(); Ok(()) }))); - Completion::new() + Vc::cell(()) +} + +#[tokio::test] +async fn test_spawns_detached_changing() -> anyhow::Result<()> { + run(®ISTRATION, || async { + timeout(Duration::from_secs(5), async { + let (tx, mut rx) = watch::channel(None); + let tx = TransientInstance::new(tx); + + // state that's read by the detached future + let changing_input_detached = ChangingInput { + state: State::new(42), + } + .cell(); + + // state that's read by the outer task + let changing_input_outer = ChangingInput { + state: State::new(0), + } + .cell(); + + // create the task + let out_vc = + spawns_detached_changing(tx.clone(), changing_input_detached, changing_input_outer); + + // it should send us back a cell + let detached_vc: Vc = rx.wait_for(|opt| opt.is_some()).await.unwrap().unwrap(); + assert_eq!(*detached_vc.strongly_consistent().await.unwrap(), 42); + + // the parent task should now be able to exit + out_vc.strongly_consistent().await.unwrap(); + + // changing either input should invalidate the vc and cause it to run again + changing_input_detached.await.unwrap().state.set(43); + out_vc.strongly_consistent().await.unwrap(); + assert_eq!(*detached_vc.strongly_consistent().await.unwrap(), 43); + + changing_input_outer.await.unwrap().state.set(44); + assert_eq!(*out_vc.strongly_consistent().await.unwrap(), 44); + + Ok(()) + }) + .await? + }) + .await +} + +#[turbo_tasks::value] +struct ChangingInput { + state: State, +} + +#[turbo_tasks::function] +async fn spawns_detached_changing( + sender: TransientInstance>>>, + changing_input_detached: Vc, + changing_input_outer: Vc, +) -> Vc { + let tt = turbo_tasks(); + tokio::spawn(tt.clone().detached_for_testing(Box::pin(async move { + sleep(Duration::from_millis(100)).await; + // nested detached_for_testing calls should work + tokio::spawn(tt.clone().detached_for_testing(Box::pin(async move { + sleep(Duration::from_millis(100)).await; + // creating cells after the normal lifetime of the task should be okay, as the parent + // task is waiting on us before exiting! + sender + .send(Some(Vc::cell( + *read_changing_input(changing_input_detached).await.unwrap(), + ))) + .unwrap(); + Ok(()) + }))); + Ok(()) + }))); + Vc::cell(*read_changing_input(changing_input_outer).await.unwrap()) +} + +// spawns_detached should take a dependency on this function for each input +#[turbo_tasks::function] +async fn read_changing_input(changing_input: Vc) -> Vc { + // when changing_input.set is called, it will trigger an invalidator for this task + Vc::cell(*changing_input.await.unwrap().state.get()) }