Skip to content

Commit

Permalink
Added StreamExchange operator with HashShard distribution strategy to…
Browse files Browse the repository at this point in the history
… optimize data exchange between fragments.
  • Loading branch information
shanicky committed Nov 14, 2023
1 parent aeb2ace commit e5af73b
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup,
ToStreamContext,
};
use crate::optimizer::property::Distribution::HashShard;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::utils::{ColIndexMapping, Condition, IndexRewriter};

Expand Down Expand Up @@ -574,7 +575,7 @@ impl ToStream for LogicalSource {
if let Some(row_id_index) = self.core.row_id_index
&& self.core.gen_row_id
{
plan = StreamRowIdGen::new(plan, row_id_index).into();
plan = StreamRowIdGen::new_with_dist(plan, row_id_index, HashShard(vec![row_id_index])).into();
}
Ok(plan)
}
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ pub struct StreamRowIdGen {

impl StreamRowIdGen {
pub fn new(input: PlanRef, row_id_index: usize) -> Self {
// if input.append_only() {
// return Self::new_with_dist(input, row_id_index, Distribution::HashShard(vec![row_id_index]));
// }

let distribution = input.distribution().clone();
Self::new_with_dist(input, row_id_index, distribution)
}
Expand Down

0 comments on commit e5af73b

Please sign in to comment.