Skip to content

Commit

Permalink
refactor(rust): Fix topological sort in new streaming engine (pola-rs…
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Sep 18, 2024
1 parent 4f5c95d commit 536a2b9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
30 changes: 14 additions & 16 deletions crates/polars-stream/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,20 @@ fn run_subgraph(
for (input, input_pipe) in node.inputs.iter().zip(input_pipes.drain(..)) {
if let Some(pipe) = input_pipe {
physical_pipes.insert(*input, pipe);

// For all the receive ports we just initialized inside spawn(), decrement
// the num_send_ports_not_yet_ready for the node it was connected to and mark
// the node as ready to spawn if all its send ports are connected to
// initialized recv ports.
let sender = graph.pipes[*input].sender;
if let Some(count) = num_send_ports_not_yet_ready.get_mut(sender) {
if *count > 0 {
*count -= 1;
if *count == 0 {
ready.push(sender);
}
}
}
}
}
for (output, output_pipe) in node.outputs.iter().zip(output_pipes.drain(..)) {
Expand All @@ -197,22 +211,6 @@ fn run_subgraph(
// Reuse the pipe vectors, clearing the borrow it has for next iteration.
input_pipes = reuse_vec(input_pipes);
output_pipes = reuse_vec(output_pipes);

// For all the receive ports we just initialized inside spawn(), decrement
// the num_send_ports_not_yet_ready for the node it was connected to and mark
// the node as ready to spawn if all its send ports are connected to
// initialized recv ports.
for input in &node.inputs {
let sender = graph.pipes[*input].sender;
if let Some(count) = num_send_ports_not_yet_ready.get_mut(sender) {
if *count > 0 {
*count -= 1;
if *count == 0 {
ready.push(sender);
}
}
}
}
}

// Spawn tasks for all the physical pipes (no-op on most, but needed for
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-stream/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl PhysicalPipe {
pub fn send_port(&mut self) -> SendPort<'_> {
assert!(
matches!(self, Self::SerialReceiver(..) | Self::ParallelReceiver(..)),
"PhysicalPipe::send_port must be called on a pipe which only has its send port initialized"
"PhysicalPipe::send_port must be called on a pipe which only has its receive port initialized"
);
SendPort(self)
}
Expand Down

0 comments on commit 536a2b9

Please sign in to comment.