Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): support output indices in dispatchers #8094

Merged
merged 11 commits into from
Feb 22, 2023
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 40 additions & 14 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,12 @@ enum DispatcherType {
NO_SHUFFLE = 4;
}

// The property of an edge in the fragment graph.
// This is essientially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details.
message DispatchStrategy {
DispatcherType type = 1;
repeated uint32 column_indices = 2;
repeated uint32 dist_key_indices = 2;
repeated uint32 output_indices = 3;
}

// A dispatcher redistribute messages.
Expand All @@ -564,7 +567,11 @@ message Dispatcher {
DispatcherType type = 1;
// Indices of the columns to be used for hashing.
// For dispatcher types other than HASH, this is ignored.
repeated uint32 column_indices = 2;
repeated uint32 dist_key_indices = 2;
// Indices of the columns to output.
// In most cases, this contains all columns in the input. But for some cases like MV on MV or
// schema change, we may only output a subset of the columns.
repeated uint32 output_indices = 6;
// The hash mapping for consistent hash.
// For dispatcher types other than HASH, this is ignored.
ActorMapping hash_mapping = 3;
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,11 @@ pub fn to_stream_prost_body(
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
column_indices: match &base.dist {
dist_key_indices: match &base.dist {
Distribution::HashShard(keys) => keys.iter().map(|&num| num as u32).collect(),
_ => vec![],
},
output_indices: (0..base.schema().len() as u32).collect(),
}),
}),
Node::DynamicFilter(me) => {
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ impl StreamNode for StreamExchange {
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
column_indices: match &self.base.dist {
dist_key_indices: match &self.base.dist {
Distribution::HashShard(keys) => keys.iter().map(|num| *num as u32).collect(),
_ => vec![],
},
output_indices: (0..self.schema().len() as u32).collect(),
}),
})
}
Expand Down
14 changes: 10 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,33 +85,39 @@ impl StreamNode for StreamShare {
impl StreamShare {
pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> ProstStreamPlan {
let operator_id = self.base.id.0 as u32;
let output_indices = (0..self.schema().len() as u32).collect_vec();

match state.get_share_stream_node(operator_id) {
None => {
let dispatch_strategy = match &self.base.dist {
Distribution::HashShard(keys) | Distribution::UpstreamHashShard(keys, _) => {
DispatchStrategy {
r#type: DispatcherType::Hash as i32,
column_indices: keys.iter().map(|x| *x as u32).collect_vec(),
dist_key_indices: keys.iter().map(|x| *x as u32).collect_vec(),
output_indices,
}
}
Distribution::Single => DispatchStrategy {
r#type: DispatcherType::Simple as i32,
column_indices: vec![],
dist_key_indices: vec![],
output_indices,
},
Distribution::Broadcast => DispatchStrategy {
r#type: DispatcherType::Broadcast as i32,
column_indices: vec![],
dist_key_indices: vec![],
output_indices,
},
Distribution::SomeShard => {
// FIXME: use another DispatcherType?
DispatchStrategy {
r#type: DispatcherType::Hash as i32,
column_indices: self
dist_key_indices: self
.base
.logical_pk
.iter()
.map(|x| *x as u32)
.collect_vec(),
output_indices,
}
}
};
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ fn rewrite_stream_node(

let strategy = DispatchStrategy {
r#type: DispatcherType::NoShuffle.into(),
column_indices: vec![], // TODO: use distribution key
dist_key_indices: vec![], // TODO: use distribution key
output_indices: (0..(child_node.fields.len() as u32)).collect(),
};
Ok(StreamNode {
stream_key: child_node.stream_key.clone(),
Expand Down
43 changes: 32 additions & 11 deletions src/frontend/src/stream_fragmenter/rewrite/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ fn build_no_shuffle_exchange_for_delta_join(
fields: upstream.fields.clone(),
stream_key: upstream.stream_key.clone(),
node_body: Some(NodeBody::Exchange(ExchangeNode {
strategy: Some(dispatch_no_shuffle()),
strategy: Some(dispatch_no_shuffle(
(0..(upstream.fields.len() as u32)).collect(),
)),
})),
input: vec![],
append_only: upstream.append_only,
Expand All @@ -46,33 +48,41 @@ fn build_no_shuffle_exchange_for_delta_join(
fn build_consistent_hash_shuffle_exchange_for_delta_join(
state: &mut BuildFragmentGraphState,
upstream: &StreamNode,
column_indices: Vec<u32>,
dist_key_indices: Vec<u32>,
) -> StreamNode {
StreamNode {
operator_id: state.gen_operator_id() as u64,
identity: "HASH Exchange (Lookup and Merge)".into(),
fields: upstream.fields.clone(),
stream_key: upstream.stream_key.clone(),
node_body: Some(NodeBody::Exchange(ExchangeNode {
strategy: Some(dispatch_consistent_hash_shuffle(column_indices)),
strategy: Some(dispatch_consistent_hash_shuffle(
dist_key_indices,
(0..(upstream.fields.len() as u32)).collect(),
)),
})),
input: vec![],
append_only: upstream.append_only,
}
}

fn dispatch_no_shuffle() -> DispatchStrategy {
fn dispatch_no_shuffle(output_indices: Vec<u32>) -> DispatchStrategy {
DispatchStrategy {
r#type: DispatcherType::NoShuffle.into(),
column_indices: vec![],
dist_key_indices: vec![],
output_indices,
}
}

fn dispatch_consistent_hash_shuffle(column_indices: Vec<u32>) -> DispatchStrategy {
fn dispatch_consistent_hash_shuffle(
dist_key_indices: Vec<u32>,
output_indices: Vec<u32>,
) -> DispatchStrategy {
// Actually Hash shuffle is consistent hash shuffle now.
DispatchStrategy {
r#type: DispatcherType::Hash.into(),
column_indices,
dist_key_indices,
output_indices,
}
}

Expand Down Expand Up @@ -136,6 +146,9 @@ fn build_delta_join_inner(
let i0_length = arrange_0.fields.len();
let i1_length = arrange_1.fields.len();

let i0_output_indices = (0..i0_length as u32).collect_vec();
let i1_output_indices = (0..i1_length as u32).collect_vec();

let lookup_0_column_reordering = {
let tmp: Vec<i32> = (i1_length..i1_length + i0_length)
.chain(0..i1_length)
Expand Down Expand Up @@ -204,7 +217,7 @@ fn build_delta_join_inner(
arrange_0_frag.fragment_id,
lookup_0_frag.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_no_shuffle(),
dispatch_strategy: dispatch_no_shuffle(i0_output_indices.clone()),
link_id: exchange_a0l0.operator_id,
},
);
Expand All @@ -221,6 +234,7 @@ fn build_delta_join_inner(
.iter()
.map(|x| *x as u32)
.collect_vec(),
i0_output_indices,
),
link_id: exchange_a0l1.operator_id,
},
Expand All @@ -238,6 +252,7 @@ fn build_delta_join_inner(
.iter()
.map(|x| *x as u32)
.collect_vec(),
i1_output_indices.clone(),
),
link_id: exchange_a1l0.operator_id,
},
Expand All @@ -249,7 +264,7 @@ fn build_delta_join_inner(
arrange_1_frag.fragment_id,
lookup_1_frag.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_no_shuffle(),
dispatch_strategy: dispatch_no_shuffle(i1_output_indices),
link_id: exchange_a1l1.operator_id,
},
);
Expand All @@ -275,7 +290,10 @@ fn build_delta_join_inner(
lookup_0_frag.fragment_id,
current_fragment.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()),
dispatch_strategy: dispatch_consistent_hash_shuffle(
node.stream_key.clone(),
(0..node.fields.len() as u32).collect(),
),
link_id: exchange_l0m.operator_id,
},
);
Expand All @@ -284,7 +302,10 @@ fn build_delta_join_inner(
lookup_1_frag.fragment_id,
current_fragment.fragment_id,
StreamFragmentEdge {
dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()),
dispatch_strategy: dispatch_consistent_hash_shuffle(
node.stream_key.clone(),
(0..node.fields.len() as u32).collect(),
),
link_id: exchange_l1m.operator_id,
},
);
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/utils/stream_graph_formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl StreamGraphFormatter {
"StreamExchange {} from {}",
match dist.r#type() {
DispatcherType::Unspecified => unreachable!(),
DispatcherType::Hash => format!("Hash({:?})", dist.column_indices),
DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices),
DispatcherType::Broadcast => "Broadcast".to_string(),
DispatcherType::Simple => "Single".to_string(),
DispatcherType::NoShuffle => "NoShuffle".to_string(),
Expand Down
Loading