Skip to content

Commit

Permalink
add cell/output reading and creating cached tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Aug 7, 2024
1 parent a26cc54 commit c7b49b3
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 83 deletions.
223 changes: 153 additions & 70 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use self::{
};
use crate::{
data::{
CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate, InProgressState,
OutputValue,
CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate, CellRef,
InProgressState, OutputValue,
},
get, remove,
utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc},
Expand Down Expand Up @@ -131,7 +131,7 @@ impl TurboTasksBackend {
snapshot_request
.suspended_operations
.insert(operation.clone().into());
let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel);
let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
if value == SNAPSHOT_REQUESTED_BIT {
self.operations_suspended.notify_all();
Expand All @@ -147,11 +147,46 @@ impl TurboTasksBackend {
}
}
}

pub(crate) fn start_operation(&self) -> OperationGuard<'_> {
let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
let mut snapshot_request = self.snapshot_request.lock();
if snapshot_request.snapshot_requested {
let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
if value == SNAPSHOT_REQUESTED_BIT {
self.operations_suspended.notify_all();
}
self.snapshot_completed
.wait_while(&mut snapshot_request, |snapshot_request| {
snapshot_request.snapshot_requested
});
self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
}
}
OperationGuard { backend: self }
}
}

pub(crate) struct OperationGuard<'a> {
backend: &'a TurboTasksBackend,
}

impl<'a> Drop for OperationGuard<'a> {
fn drop(&mut self) {
let fetch_sub = self
.backend
.in_progress_operations
.fetch_sub(1, Ordering::AcqRel);
if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
self.backend.operations_suspended.notify_all();
}
}
}

// Operations
impl TurboTasksBackend {
pub fn connect_child(
fn connect_child(
&self,
parent_task: TaskId,
child_task: TaskId,
Expand All @@ -164,23 +199,95 @@ impl TurboTasksBackend {
);
}

pub fn update_cell(
fn try_read_task_output(
&self,
task_id: TaskId,
cell: CellId,
content: CellContent,
reader: Option<TaskId>,
strongy_consistent: bool,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
operation::UpdateCellOperation::run(
task_id,
cell,
content,
self.execute_context(turbo_tasks),
);
) -> Result<Result<RawVc, EventListener>> {
let ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id);

if let Some(in_progress) = get!(task, InProgress) {
match in_progress {
InProgressState::Scheduled { done_event, .. }
| InProgressState::InProgress { done_event, .. } => {
let listener = done_event.listen();
return Ok(Err(listener));
}
}
}

if strongy_consistent {
todo!("Handle strongly consistent read: {task:#?}");
}

if let Some(output) = get!(task, Output) {
let result = match output {
OutputValue::Cell(cell) => Some(Ok(Ok(RawVc::TaskCell(cell.task, cell.cell)))),
OutputValue::Output(task) => Some(Ok(Ok(RawVc::TaskOutput(*task)))),
OutputValue::Error | OutputValue::Panic => {
if let Some(error) = get!(task, Error) {
Some(Err(error.clone().into()))
} else {
None
}
}
};
if let Some(result) = result {
if let Some(reader) = reader {
task.add(CachedDataItem::OutputDependent {
task: reader,
value: (),
});
drop(task);

let mut reader_task = ctx.task(reader);
reader_task.add(CachedDataItem::OutputDependency {
target: task_id,
value: (),
});
}

return result;
}
}

todo!("Output of is not available, recompute task: {task:#?}");
}

pub fn invalidate(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
operation::InvalidateOperation::run(smallvec![task_id], self.execute_context(turbo_tasks));
fn try_read_task_cell(
&self,
task_id: TaskId,
reader: Option<TaskId>,
cell: CellId,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<TypedCellContent, EventListener>> {
let ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id);
if let Some(content) = get!(task, CellData { cell }) {
let content = content.clone();
if let Some(reader) = reader {
task.add(CachedDataItem::CellDependent {
cell,
task: reader,
value: (),
});
drop(task);
let mut reader_task = ctx.task(reader);
reader_task.add(CachedDataItem::CellDependency {
target: CellRef {
task: task_id,
cell,
},
value: (),
});
}
return Ok(Ok(CellContent(Some(content)).into_typed(cell.type_id)));
}

todo!("Cell {cell:?} is not available, recompute task or error: {task:#?}");
}
}

Expand Down Expand Up @@ -243,7 +350,7 @@ impl Backend for TurboTasksBackend {
}

fn invalidate_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
self.invalidate(task_id, turbo_tasks);
operation::InvalidateOperation::run(smallvec![task_id], self.execute_context(turbo_tasks));
}

fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
Expand Down Expand Up @@ -292,12 +399,16 @@ impl Backend for TurboTasksBackend {
{
let ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id);
let Some(InProgressState::Scheduled {
let Some(in_progress) = remove!(task, InProgress) else {
return None;
};
let InProgressState::Scheduled {
clean,
done_event,
start_event,
}) = remove!(task, InProgress)
} = in_progress
else {
task.add(CachedDataItem::InProgress { value: in_progress });
return None;
};
task.add(CachedDataItem::InProgress {
Expand Down Expand Up @@ -405,15 +516,15 @@ impl Backend for TurboTasksBackend {
let Some(CachedDataItemValue::InProgress { value: in_progress }) =
task.remove(&CachedDataItemKey::InProgress {})
else {
panic!("Task execution completed, but task is not in progress");
panic!("Task execution completed, but task is not in progress: {task:#?}");
};
let InProgressState::InProgress {
done_event,
clean,

Check warning on line 523 in turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

View workflow job for this annotation

GitHub Actions / test cargo unit / build

unused variable: `clean`
stale,
} = in_progress
else {
panic!("Task execution completed, but task is not in progress");
panic!("Task execution completed, but task is not in progress: {task:#?}");
};

// TODO handle cell counters
Expand All @@ -440,60 +551,34 @@ impl Backend for TurboTasksBackend {
) -> Pin<Box<(dyn Future<Output = ()> + Send + 'static)>> {
todo!()
}

fn try_read_task_output(
&self,
_: TaskId,
_: TaskId,
_: bool,
_: &dyn TurboTasksBackendApi<Self>,
task_id: TaskId,
reader: TaskId,
strongy_consistent: bool,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<RawVc, EventListener>> {
todo!()
self.try_read_task_output(task_id, Some(reader), strongy_consistent, turbo_tasks)
}

fn try_read_task_output_untracked(
&self,
task_id: TaskId,
strongy_consistent: bool,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<RawVc, EventListener>> {
let ctx = self.execute_context(turbo_tasks);
let task = ctx.task(task_id);

if let Some(in_progress) = get!(task, InProgress) {
match in_progress {
InProgressState::Scheduled { done_event, .. }
| InProgressState::InProgress { done_event, .. } => {
let listener = done_event.listen();
return Ok(Err(listener));
}
}
}

if strongy_consistent {
todo!("Handle strongly consistent read");
}

if let Some(output) = get!(task, Output) {
match output {
OutputValue::Cell(cell) => return Ok(Ok(RawVc::TaskCell(cell.task, cell.cell))),
OutputValue::Output(task) => return Ok(Ok(RawVc::TaskOutput(*task))),
OutputValue::Error | OutputValue::Panic => {
if let Some(error) = get!(task, Error) {
return Err(error.clone().into());
}
}
}
}

todo!("Output is not available, recompute task");
self.try_read_task_output(task_id, None, strongy_consistent, turbo_tasks)
}

fn try_read_task_cell(
&self,
_: TaskId,
_: CellId,
_: TaskId,
_: &dyn TurboTasksBackendApi<Self>,
task_id: TaskId,
cell: CellId,
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<TypedCellContent, EventListener>> {
todo!()
self.try_read_task_cell(task_id, Some(reader), cell, turbo_tasks)
}

fn try_read_task_cell_untracked(
Expand All @@ -502,14 +587,7 @@ impl Backend for TurboTasksBackend {
cell: CellId,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<TypedCellContent, EventListener>> {
let ctx = self.execute_context(turbo_tasks);
let task = ctx.task(task_id);
if let Some(content) = get!(task, CellData { cell }) {
return Ok(Ok(
CellContent(Some(content.clone())).into_typed(cell.type_id)
));
}
todo!("Cell {cell:?} from {task_id:?} is not available, recompute task or error");
self.try_read_task_cell(task_id, None, cell, turbo_tasks)
}

fn try_read_own_task_cell_untracked(
Expand Down Expand Up @@ -565,7 +643,12 @@ impl Backend for TurboTasksBackend {
content: CellContent,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
self.update_cell(task_id, cell, content, turbo_tasks);
operation::UpdateCellOperation::run(
task_id,
cell,
content,
self.execute_context(turbo_tasks),
);
}

fn connect_task(&self, _: TaskId, _: TaskId, _: &dyn TurboTasksBackendApi<Self>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::data::CachedDataItem;

#[derive(Serialize, Deserialize, Clone, Default)]
pub enum ConnectChildOperation {
Todo,
ScheduleTask {
task_id: TaskId,
},
#[default]
Done,
// TODO Add aggregated edge
Expand All @@ -20,19 +22,28 @@ impl ConnectChildOperation {
value: (),
}) {
// TODO add aggregated edge
ConnectChildOperation::Todo.execute(ctx);
// TODO check for active
ConnectChildOperation::ScheduleTask {
task_id: child_task,
}
.execute(&ctx);
}
}
}

impl Operation for ConnectChildOperation {
fn execute(mut self, ctx: ExecuteContext<'_>) {
fn execute(mut self, ctx: &ExecuteContext<'_>) {

Check warning on line 35 in turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs

View workflow job for this annotation

GitHub Actions / test cargo unit / build

variable does not need to be mutable
loop {
ctx.operation_suspend_point(&self);
match self {
ConnectChildOperation::Todo => {
self = ConnectChildOperation::Done;
continue;
ConnectChildOperation::ScheduleTask { task_id } => {
{
let mut task = ctx.task(task_id);
task.add(CachedDataItem::new_scheduled(task_id));
}
ctx.schedule(task_id);

return;
}
ConnectChildOperation::Done => {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ pub enum InvalidateOperation {

impl InvalidateOperation {
pub fn run(task_ids: SmallVec<[TaskId; 4]>, ctx: ExecuteContext<'_>) {
InvalidateOperation::MakeDirty { task_ids }.execute(ctx)
InvalidateOperation::MakeDirty { task_ids }.execute(&ctx)
}
}

impl Operation for InvalidateOperation {
fn execute(self, ctx: ExecuteContext<'_>) {
fn execute(self, ctx: &ExecuteContext<'_>) {
loop {
ctx.operation_suspend_point(&self);
match self {
Expand Down
Loading

0 comments on commit c7b49b3

Please sign in to comment.