Skip to content

Commit

Permalink
Add stubs for RawVc::TaskOutput
Browse files Browse the repository at this point in the history
  • Loading branch information
bgw committed Aug 29, 2024
1 parent 7ae6813 commit bcd07cf
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 49 deletions.
20 changes: 19 additions & 1 deletion turbopack/crates/turbo-tasks-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use turbo_tasks::{
registry,
test_helpers::with_turbo_tasks_for_testing,
util::{SharedError, StaticOrArc},
CellId, ExecutionId, InvalidationReason, MagicAny, RawVc, ReadConsistency, TaskId,
CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadConsistency, TaskId,
TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
};

Expand Down Expand Up @@ -242,6 +242,24 @@ impl TurboTasksApi for VcStorage {
self.read_own_task_cell(current_task, index)
}

fn try_read_local_output(
&self,
parent_task_id: TaskId,
local_task_id: LocalTaskId,
consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
self.try_read_local_output_untracked(parent_task_id, local_task_id, consistency)
}

fn try_read_local_output_untracked(
&self,
_parent_task_id: TaskId,
_local_task_id: LocalTaskId,
_consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
unimplemented!()
}

fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
unimplemented!()
}
Expand Down
60 changes: 22 additions & 38 deletions turbopack/crates/turbo-tasks/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@ use serde::{de::Visitor, Deserialize, Serialize};
use crate::{registry, TaskPersistence};

macro_rules! define_id {
($name:ident : $primitive:ty $(,derive($($derive:ty),*))?) => {
(
$name:ident : $primitive:ty
$(,derive($($derive:ty),*))?
$(,serde($serde:tt))?
$(,doc = $doc:literal)*
$(,)?
) => {
$(#[doc = $doc])*
#[derive(Hash, Clone, Copy, PartialEq, Eq, PartialOrd, Ord $($(,$derive)*)? )]
$(#[serde($serde)])?
pub struct $name {
id: NonZero<$primitive>,
}
Expand Down Expand Up @@ -64,13 +72,24 @@ macro_rules! define_id {
};
}

define_id!(TaskId: u32);
define_id!(TaskId: u32, derive(Serialize, Deserialize), serde(transparent));
define_id!(FunctionId: u32);
define_id!(ValueTypeId: u32);
define_id!(TraitTypeId: u32);
define_id!(BackendJobId: u32);
define_id!(ExecutionId: u64, derive(Debug));
define_id!(LocalCellId: u32, derive(Debug));
define_id!(
LocalCellId: u32,
derive(Debug),
doc = "Represents the nth call to `Vc::cell()` with `local_cells` inside of the parent ",
doc = "non-local task.",
);
define_id!(
LocalTaskId: u32,
derive(Debug, Serialize, Deserialize),
serde(transparent),
doc = "Represents the nth `local_cells` function call inside a task.",
);

impl Debug for TaskId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down Expand Up @@ -160,38 +179,3 @@ make_serializable!(
registry::get_trait_type_id_by_global_name,
TraitTypeVisitor
);

impl Serialize for TaskId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_u32(**self)
}
}

impl<'de> Deserialize<'de> for TaskId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct V;

impl Visitor<'_> for V {
type Value = TaskId;

fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "task id")
}

fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(TaskId::from(v))
}
}

deserializer.deserialize_u32(V)
}
}
4 changes: 3 additions & 1 deletion turbopack/crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ use auto_hash_map::AutoSet;
pub use collectibles::CollectiblesSource;
pub use completion::{Completion, Completions};
pub use display::ValueToString;
pub use id::{ExecutionId, FunctionId, TaskId, TraitTypeId, ValueTypeId, TRANSIENT_TASK_BIT};
pub use id::{
ExecutionId, FunctionId, LocalTaskId, TaskId, TraitTypeId, ValueTypeId, TRANSIENT_TASK_BIT,
};
pub use invalidation::{
get_invalidator, DynamicEqHash, InvalidationReason, InvalidationReasonKind,
InvalidationReasonSet, Invalidator,
Expand Down
50 changes: 49 additions & 1 deletion turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use crate::{
},
capture_future::{self, CaptureFuture},
event::{Event, EventListener},
id::{BackendJobId, ExecutionId, FunctionId, LocalCellId, TraitTypeId, TRANSIENT_TASK_BIT},
id::{
BackendJobId, ExecutionId, FunctionId, LocalCellId, LocalTaskId, TraitTypeId,
TRANSIENT_TASK_BIT,
},
id_factory::{IdFactory, IdFactoryWithReuse},
magic_any::MagicAny,
raw_vc::{CellId, RawVc},
Expand Down Expand Up @@ -144,6 +147,22 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
index: CellId,
) -> Result<Result<TypedCellContent, EventListener>>;

fn try_read_local_output(
&self,
parent_task_id: TaskId,
local_task_id: LocalTaskId,
consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>>;

/// INVALIDATION: Be careful with this, it will not track dependencies, so
/// using it could break cache invalidation.
fn try_read_local_output_untracked(
&self,
parent_task_id: TaskId,
local_task_id: LocalTaskId,
consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>>;

fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap;

fn emit_collectible(&self, trait_type: TraitTypeId, collectible: RawVc);
Expand Down Expand Up @@ -1294,6 +1313,26 @@ impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
.try_read_own_task_cell_untracked(current_task, index, self)
}

fn try_read_local_output(
&self,
_parent_task_id: TaskId,
_local_task_id: LocalTaskId,
_consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
todo!("bgw: local outputs");
}

/// INVALIDATION: Be careful with this, it will not track dependencies, so
/// using it could break cache invalidation.
fn try_read_local_output_untracked(
&self,
_parent_task_id: TaskId,
_local_task_id: LocalTaskId,
_consistency: ReadConsistency,
) -> Result<Result<RawVc, EventListener>> {
todo!("bgw: local outputs");
}

fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap {
self.backend.read_task_collectibles(
task,
Expand Down Expand Up @@ -1952,6 +1991,15 @@ pub(crate) fn read_local_cell(
})
}

pub(crate) async fn read_local_output(
_this: &dyn TurboTasksApi,
_task_id: TaskId,
_local_output_id: LocalTaskId,
_consistency: ReadConsistency,
) -> Result<RawVc> {
todo!("bgw: local outputs");
}

/// Panics if the [`ExecutionId`] does not match the current task's
/// `execution_id`.
pub(crate) fn assert_execution_id(execution_id: ExecutionId) {
Expand Down
56 changes: 48 additions & 8 deletions turbopack/crates/turbo-tasks/src/raw_vc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use thiserror::Error;
use crate::{
backend::{CellContent, TypedCellContent},
event::EventListener,
id::{ExecutionId, LocalCellId},
id::{ExecutionId, LocalCellId, LocalTaskId},
manager::{
assert_execution_id, current_task, read_local_cell, read_task_cell, read_task_output,
TurboTasksApi,
assert_execution_id, current_task, read_local_cell, read_local_output, read_task_cell,
read_task_output, TurboTasksApi,
},
registry::{self, get_value_type},
turbo_tasks, CollectiblesSource, ReadConsistency, TaskId, TraitTypeId, ValueType, ValueTypeId,
Expand Down Expand Up @@ -58,6 +58,7 @@ impl Display for CellId {
pub enum RawVc {
TaskOutput(TaskId),
TaskCell(TaskId, CellId),
LocalOutput(TaskId, LocalTaskId),
#[serde(skip)]
LocalCell(ExecutionId, LocalCellId),
}
Expand All @@ -67,6 +68,7 @@ impl RawVc {
match self {
RawVc::TaskOutput(_) => false,
RawVc::TaskCell(_, _) => true,
RawVc::LocalOutput(_, _) => false,
RawVc::LocalCell(_, _) => false,
}
}
Expand All @@ -75,6 +77,7 @@ impl RawVc {
match self {
RawVc::TaskOutput(_) => false,
RawVc::TaskCell(_, _) => false,
RawVc::LocalOutput(_, _) => true,
RawVc::LocalCell(_, _) => true,
}
}
Expand Down Expand Up @@ -162,6 +165,12 @@ impl RawVc {
return Err(ResolveTypeError::NoContent);
}
}
RawVc::LocalOutput(task_id, local_cell_id) => {
current =
read_local_output(&*tt, task_id, local_cell_id, ReadConsistency::Eventual)
.await
.map_err(|source| ResolveTypeError::TaskError { source })?;
}
RawVc::LocalCell(execution_id, local_cell_id) => {
let shared_reference = read_local_cell(execution_id, local_cell_id);
return Ok(
Expand Down Expand Up @@ -193,16 +202,23 @@ impl RawVc {
let tt = turbo_tasks();
let mut current = self;
let mut notified = false;
let mut lazily_notify = || {
if !notified {
tt.notify_scheduled_tasks();
notified = true;
}
};
loop {
match current {
RawVc::TaskOutput(task) => {
if !notified {
tt.notify_scheduled_tasks();
notified = true;
}
lazily_notify();
current = read_task_output(&*tt, task, consistency).await?;
}
RawVc::TaskCell(_, _) => return Ok(current),
RawVc::LocalOutput(task_id, local_cell_id) => {
lazily_notify();
current = read_local_output(&*tt, task_id, local_cell_id, consistency).await?;
}
RawVc::LocalCell(execution_id, local_cell_id) => {
let shared_reference = read_local_cell(execution_id, local_cell_id);
let value_type = get_value_type(shared_reference.0);
Expand All @@ -219,7 +235,7 @@ impl RawVc {

pub fn get_task_id(&self) -> TaskId {
match self {
RawVc::TaskOutput(t) | RawVc::TaskCell(t, _) => *t,
RawVc::TaskOutput(t) | RawVc::TaskCell(t, _) | RawVc::LocalOutput(t, _) => *t,
RawVc::LocalCell(execution_id, _) => {
assert_execution_id(*execution_id);
current_task("RawVc::get_task_id")
Expand Down Expand Up @@ -367,6 +383,30 @@ impl Future for ReadRawVcFuture {
Err(err) => return Poll::Ready(Err(err)),
}
}
RawVc::LocalOutput(task_id, local_output_id) => {
let read_result = if this.untracked {
this.turbo_tasks.try_read_local_output_untracked(
task_id,
local_output_id,
this.consistency,
)
} else {
this.turbo_tasks.try_read_local_output(
task_id,
local_output_id,
this.consistency,
)
};
match read_result {
Ok(Ok(vc)) => {
this.consistency = ReadConsistency::Eventual;
this.current = vc;
continue 'outer;
}
Ok(Err(listener)) => listener,
Err(err) => return Poll::Ready(Err(err)),
}
}
RawVc::LocalCell(execution_id, local_cell_id) => {
return Poll::Ready(Ok(read_local_cell(execution_id, local_cell_id).into()));
}
Expand Down

0 comments on commit bcd07cf

Please sign in to comment.