From e5af73bc505f9e6a63718582be849e331f1dc29a Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 14 Nov 2023 16:31:41 +0800 Subject: [PATCH] Use HashShard distribution on the row id column for the StreamRowIdGen built by LogicalSource --- src/frontend/src/optimizer/plan_node/logical_source.rs | 3 ++- src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 542178a830b7..30c4ee19002a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -45,6 +45,7 @@ use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup, ToStreamContext, }; +use crate::optimizer::property::Distribution::HashShard; use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::utils::{ColIndexMapping, Condition, IndexRewriter}; @@ -574,7 +575,7 @@ impl ToStream for LogicalSource { if let Some(row_id_index) = self.core.row_id_index && self.core.gen_row_id { - plan = StreamRowIdGen::new(plan, row_id_index).into(); + plan = StreamRowIdGen::new_with_dist(plan, row_id_index, HashShard(vec![row_id_index])).into(); } Ok(plan) } diff --git a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs index b7634c652a34..1f5be56477ca 100644 --- a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs +++ b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs @@ -30,6 +30,10 @@ pub struct StreamRowIdGen { impl StreamRowIdGen { pub fn new(input: PlanRef, row_id_index: usize) -> Self { + // if input.append_only() { + // return Self::new_with_dist(input, row_id_index, Distribution::HashShard(vec![row_id_index])); + // } + let distribution = input.distribution().clone(); 
Self::new_with_dist(input, row_id_index, distribution) }