From e996eb7ebd1650c40d71082a4160a28dec796875 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Feb 2023 15:48:52 +0800 Subject: [PATCH 1/9] rename to dist key indices Signed-off-by: Bugen Zhao --- proto/stream_plan.proto | 4 ++-- src/frontend/src/optimizer/plan_node/stream.rs | 2 +- .../src/optimizer/plan_node/stream_exchange.rs | 2 +- src/frontend/src/optimizer/plan_node/stream_share.rs | 8 ++++---- src/frontend/src/stream_fragmenter/mod.rs | 2 +- .../src/stream_fragmenter/rewrite/delta_join.rs | 10 +++++----- src/frontend/src/utils/stream_graph_formatter.rs | 2 +- src/meta/src/stream/stream_graph/actor.rs | 8 ++++---- src/meta/src/stream/test_fragmenter.rs | 6 +++--- src/stream/src/executor/dispatch.rs | 6 +++--- 10 files changed, 25 insertions(+), 25 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index bd60dcd19ffce..de24699e9291e 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -555,7 +555,7 @@ enum DispatcherType { message DispatchStrategy { DispatcherType type = 1; - repeated uint32 column_indices = 2; + repeated uint32 dist_key_indices = 2; } // A dispatcher redistribute messages. @@ -564,7 +564,7 @@ message Dispatcher { DispatcherType type = 1; // Indices of the columns to be used for hashing. // For dispatcher types other than HASH, this is ignored. - repeated uint32 column_indices = 2; + repeated uint32 dist_key_indices = 2; // The hash mapping for consistent hash. // For dispatcher types other than HASH, this is ignored. 
ActorMapping hash_mapping = 3; diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index afa2bd78607ed..80d8acc3ff7d8 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -425,7 +425,7 @@ pub fn to_stream_prost_body( Distribution::Broadcast => DispatcherType::Broadcast, _ => panic!("Do not allow Any or AnyShard in serialization process"), } as i32, - column_indices: match &base.dist { + dist_key_indices: match &base.dist { Distribution::HashShard(keys) => keys.iter().map(|&num| num as u32).collect(), _ => vec![], }, diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index e8f5d36cf53ed..9e9a6a89ed97d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -83,7 +83,7 @@ impl StreamNode for StreamExchange { Distribution::Broadcast => DispatcherType::Broadcast, _ => panic!("Do not allow Any or AnyShard in serialization process"), } as i32, - column_indices: match &self.base.dist { + dist_key_indices: match &self.base.dist { Distribution::HashShard(keys) => keys.iter().map(|num| *num as u32).collect(), _ => vec![], }, diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 66361b3b89174..f39ebf82d81c7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -91,22 +91,22 @@ impl StreamShare { Distribution::HashShard(keys) | Distribution::UpstreamHashShard(keys, _) => { DispatchStrategy { r#type: DispatcherType::Hash as i32, - column_indices: keys.iter().map(|x| *x as u32).collect_vec(), + dist_key_indices: keys.iter().map(|x| *x as u32).collect_vec(), } } Distribution::Single => DispatchStrategy { r#type: DispatcherType::Simple as i32, - column_indices: 
vec![], + dist_key_indices: vec![], }, Distribution::Broadcast => DispatchStrategy { r#type: DispatcherType::Broadcast as i32, - column_indices: vec![], + dist_key_indices: vec![], }, Distribution::SomeShard => { // FIXME: use another DispatcherType? DispatchStrategy { r#type: DispatcherType::Hash as i32, - column_indices: self + dist_key_indices: self .base .logical_pk .iter() diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 35eae9dc34d77..333dc79e0b377 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -139,7 +139,7 @@ fn rewrite_stream_node( let strategy = DispatchStrategy { r#type: DispatcherType::NoShuffle.into(), - column_indices: vec![], // TODO: use distribution key + dist_key_indices: vec![], // TODO: use distribution key }; Ok(StreamNode { stream_key: child_node.stream_key.clone(), diff --git a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs index fec3424c2ac00..334c2d9564973 100644 --- a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs +++ b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs @@ -46,7 +46,7 @@ fn build_no_shuffle_exchange_for_delta_join( fn build_consistent_hash_shuffle_exchange_for_delta_join( state: &mut BuildFragmentGraphState, upstream: &StreamNode, - column_indices: Vec, + dist_key_indices: Vec, ) -> StreamNode { StreamNode { operator_id: state.gen_operator_id() as u64, @@ -54,7 +54,7 @@ fn build_consistent_hash_shuffle_exchange_for_delta_join( fields: upstream.fields.clone(), stream_key: upstream.stream_key.clone(), node_body: Some(NodeBody::Exchange(ExchangeNode { - strategy: Some(dispatch_consistent_hash_shuffle(column_indices)), + strategy: Some(dispatch_consistent_hash_shuffle(dist_key_indices)), })), input: vec![], append_only: upstream.append_only, @@ -64,15 +64,15 @@ fn build_consistent_hash_shuffle_exchange_for_delta_join( fn 
dispatch_no_shuffle() -> DispatchStrategy { DispatchStrategy { r#type: DispatcherType::NoShuffle.into(), - column_indices: vec![], + dist_key_indices: vec![], } } -fn dispatch_consistent_hash_shuffle(column_indices: Vec) -> DispatchStrategy { +fn dispatch_consistent_hash_shuffle(dist_key_indices: Vec) -> DispatchStrategy { // Actually Hash shuffle is consistent hash shuffle now. DispatchStrategy { r#type: DispatcherType::Hash.into(), - column_indices, + dist_key_indices, } } diff --git a/src/frontend/src/utils/stream_graph_formatter.rs b/src/frontend/src/utils/stream_graph_formatter.rs index 017c76593a740..54c8140ae4d8e 100644 --- a/src/frontend/src/utils/stream_graph_formatter.rs +++ b/src/frontend/src/utils/stream_graph_formatter.rs @@ -129,7 +129,7 @@ impl StreamGraphFormatter { "StreamExchange {} from {}", match dist.r#type() { DispatcherType::Unspecified => unreachable!(), - DispatcherType::Hash => format!("Hash({:?})", dist.column_indices), + DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices), DispatcherType::Broadcast => "Broadcast".to_string(), DispatcherType::Simple => "Single".to_string(), DispatcherType::NoShuffle => "NoShuffle".to_string(), diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index ebcfcbaee8bc8..fae13519ad5dd 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -339,14 +339,14 @@ impl ActorGraphBuildStateInner { /// Create a new hash dispatcher. 
fn new_hash_dispatcher( - column_indices: &[u32], + dist_key_indices: &[u32], downstream_fragment_id: GlobalFragmentId, downstream_actors: &[GlobalActorId], downstream_actor_mapping: ActorMapping, ) -> Dispatcher { Dispatcher { r#type: DispatcherType::Hash as _, - column_indices: column_indices.to_vec(), + dist_key_indices: dist_key_indices.to_vec(), hash_mapping: Some(downstream_actor_mapping.to_protobuf()), dispatcher_id: downstream_fragment_id.as_global_id() as u64, downstream_actor_id: downstream_actors.as_global_ids(), @@ -362,7 +362,7 @@ impl ActorGraphBuildStateInner { assert_ne!(dispatcher_type, DispatcherType::Hash); Dispatcher { r#type: dispatcher_type as _, - column_indices: Vec::new(), + dist_key_indices: Vec::new(), hash_mapping: None, dispatcher_id: downstream_fragment_id.as_global_id() as u64, downstream_actor_id: downstream_actors.as_global_ids(), @@ -474,7 +474,7 @@ impl ActorGraphBuildStateInner { .to_actor(&downstream_locations); Self::new_hash_dispatcher( - &edge.dispatch_strategy.column_indices, + &edge.dispatch_strategy.dist_key_indices, downstream.fragment_id, downstream.actor_ids, actor_mapping, diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 9dca3500cf8b9..b07e4c7da5e33 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -213,7 +213,7 @@ fn make_stream_fragments() -> Vec { node_body: Some(NodeBody::Exchange(ExchangeNode { strategy: Some(DispatchStrategy { r#type: DispatcherType::Hash as i32, - column_indices: vec![0], + dist_key_indices: vec![0], }), })), fields: vec![ @@ -374,7 +374,7 @@ fn make_fragment_edges() -> Vec { StreamFragmentEdge { dispatch_strategy: Some(DispatchStrategy { r#type: DispatcherType::Simple as i32, - column_indices: vec![], + dist_key_indices: vec![], }), link_id: 4, upstream_id: 1, @@ -383,7 +383,7 @@ fn make_fragment_edges() -> Vec { StreamFragmentEdge { dispatch_strategy: Some(DispatchStrategy { r#type: 
DispatcherType::Hash as i32, - column_indices: vec![0], + dist_key_indices: vec![0], }), link_id: 1, upstream_id: 2, diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index b89c9fa9f6409..4dea7276513c4 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -309,8 +309,8 @@ impl DispatcherImpl { let dispatcher_impl = match dispatcher.get_type()? { Hash => { assert!(!outputs.is_empty()); - let column_indices = dispatcher - .column_indices + let dist_key_indices = dispatcher + .dist_key_indices .iter() .map(|i| *i as usize) .collect(); @@ -320,7 +320,7 @@ impl DispatcherImpl { DispatcherImpl::Hash(HashDataDispatcher::new( outputs, - column_indices, + dist_key_indices, hash_mapping, dispatcher.dispatcher_id, )) From 4ab022ec1f347b4ff43a7bf905a5abf266decb5a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Feb 2023 16:00:22 +0800 Subject: [PATCH 2/9] create dispatcher with strategy Signed-off-by: Bugen Zhao --- src/meta/src/stream/stream_graph/actor.rs | 31 ++++++++++++++++------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index fae13519ad5dd..687cc89b6761d 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -25,7 +25,9 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::meta::table_fragments::Fragment; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; -use risingwave_pb::stream_plan::{Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode}; +use risingwave_pb::stream_plan::{ + DispatchStrategy, Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode, +}; use super::id::GlobalFragmentIdsExt; use super::Locations; @@ -339,14 +341,16 @@ impl ActorGraphBuildStateInner { /// Create a new hash dispatcher. 
fn new_hash_dispatcher( - dist_key_indices: &[u32], + strategy: &DispatchStrategy, downstream_fragment_id: GlobalFragmentId, downstream_actors: &[GlobalActorId], downstream_actor_mapping: ActorMapping, ) -> Dispatcher { + assert_eq!(strategy.r#type(), DispatcherType::Hash); + Dispatcher { r#type: DispatcherType::Hash as _, - dist_key_indices: dist_key_indices.to_vec(), + dist_key_indices: strategy.dist_key_indices.clone(), hash_mapping: Some(downstream_actor_mapping.to_protobuf()), dispatcher_id: downstream_fragment_id.as_global_id() as u64, downstream_actor_id: downstream_actors.as_global_ids(), @@ -355,13 +359,14 @@ impl ActorGraphBuildStateInner { /// Create a new dispatcher for non-hash types. fn new_normal_dispatcher( - dispatcher_type: DispatcherType, + strategy: &DispatchStrategy, downstream_fragment_id: GlobalFragmentId, downstream_actors: &[GlobalActorId], ) -> Dispatcher { - assert_ne!(dispatcher_type, DispatcherType::Hash); + assert_ne!(strategy.r#type(), DispatcherType::Hash); + Dispatcher { - r#type: dispatcher_type as _, + r#type: strategy.r#type, dist_key_indices: Vec::new(), hash_mapping: None, dispatcher_id: downstream_fragment_id.as_global_id() as u64, @@ -441,7 +446,11 @@ impl ActorGraphBuildStateInner { // Create a new dispatcher just between these two actors. self.add_dispatcher( *upstream_id, - Self::new_normal_dispatcher(dt, downstream.fragment_id, &[*downstream_id]), + Self::new_normal_dispatcher( + &edge.dispatch_strategy, + downstream.fragment_id, + &[*downstream_id], + ), ); // Also record the upstream for the downstream actor. 
@@ -474,13 +483,17 @@ impl ActorGraphBuildStateInner { .to_actor(&downstream_locations); Self::new_hash_dispatcher( - &edge.dispatch_strategy.dist_key_indices, + &edge.dispatch_strategy, downstream.fragment_id, downstream.actor_ids, actor_mapping, ) } else { - Self::new_normal_dispatcher(dt, downstream.fragment_id, downstream.actor_ids) + Self::new_normal_dispatcher( + &edge.dispatch_strategy, + downstream.fragment_id, + downstream.actor_ids, + ) }; for upstream_id in upstream.actor_ids { self.add_dispatcher(*upstream_id, dispatcher.clone()); From 670ef5103a7c5b19655a81f88140adb2faa81e3f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Feb 2023 16:26:12 +0800 Subject: [PATCH 3/9] default to empty output indices Signed-off-by: Bugen Zhao --- proto/stream_plan.proto | 3 +++ src/frontend/src/optimizer/plan_node/stream.rs | 1 + src/frontend/src/optimizer/plan_node/stream_exchange.rs | 1 + src/frontend/src/optimizer/plan_node/stream_share.rs | 4 ++++ src/frontend/src/stream_fragmenter/mod.rs | 1 + src/frontend/src/stream_fragmenter/rewrite/delta_join.rs | 2 ++ src/meta/src/stream/stream_graph/actor.rs | 5 ++++- src/meta/src/stream/test_fragmenter.rs | 3 +++ 8 files changed, 19 insertions(+), 1 deletion(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index de24699e9291e..b53cac6808123 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -556,6 +556,7 @@ enum DispatcherType { message DispatchStrategy { DispatcherType type = 1; repeated uint32 dist_key_indices = 2; + repeated uint32 output_indices = 3; } // A dispatcher redistribute messages. @@ -565,6 +566,8 @@ message Dispatcher { // Indices of the columns to be used for hashing. // For dispatcher types other than HASH, this is ignored. repeated uint32 dist_key_indices = 2; + + repeated uint32 output_indices = 6; // The hash mapping for consistent hash. // For dispatcher types other than HASH, this is ignored. 
ActorMapping hash_mapping = 3; diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 80d8acc3ff7d8..c6df4f486fb44 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -429,6 +429,7 @@ pub fn to_stream_prost_body( Distribution::HashShard(keys) => keys.iter().map(|&num| num as u32).collect(), _ => vec![], }, + output_indices: vec![], }), }), Node::DynamicFilter(me) => { diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 9e9a6a89ed97d..790fc7fca2b08 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -87,6 +87,7 @@ impl StreamNode for StreamExchange { Distribution::HashShard(keys) => keys.iter().map(|num| *num as u32).collect(), _ => vec![], }, + output_indices: vec![], }), }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index f39ebf82d81c7..9b19e5657d24b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -92,15 +92,18 @@ impl StreamShare { DispatchStrategy { r#type: DispatcherType::Hash as i32, dist_key_indices: keys.iter().map(|x| *x as u32).collect_vec(), + output_indices: vec![], } } Distribution::Single => DispatchStrategy { r#type: DispatcherType::Simple as i32, dist_key_indices: vec![], + output_indices: vec![], }, Distribution::Broadcast => DispatchStrategy { r#type: DispatcherType::Broadcast as i32, dist_key_indices: vec![], + output_indices: vec![], }, Distribution::SomeShard => { // FIXME: use another DispatcherType? 
@@ -112,6 +115,7 @@ impl StreamShare { .iter() .map(|x| *x as u32) .collect_vec(), + output_indices: vec![], } } }; diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 333dc79e0b377..628ba0d486f25 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -140,6 +140,7 @@ fn rewrite_stream_node( let strategy = DispatchStrategy { r#type: DispatcherType::NoShuffle.into(), dist_key_indices: vec![], // TODO: use distribution key + output_indices: (0..(child_node.fields.len() as u32)).collect(), }; Ok(StreamNode { stream_key: child_node.stream_key.clone(), diff --git a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs index 334c2d9564973..f7372eb905380 100644 --- a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs +++ b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs @@ -65,6 +65,7 @@ fn dispatch_no_shuffle() -> DispatchStrategy { DispatchStrategy { r#type: DispatcherType::NoShuffle.into(), dist_key_indices: vec![], + output_indices: vec![], } } @@ -73,6 +74,7 @@ fn dispatch_consistent_hash_shuffle(dist_key_indices: Vec) -> DispatchStrat DispatchStrategy { r#type: DispatcherType::Hash.into(), dist_key_indices, + output_indices: vec![], } } diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 687cc89b6761d..41315fb78060f 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -351,6 +351,7 @@ impl ActorGraphBuildStateInner { Dispatcher { r#type: DispatcherType::Hash as _, dist_key_indices: strategy.dist_key_indices.clone(), + output_indices: strategy.output_indices.clone(), hash_mapping: Some(downstream_actor_mapping.to_protobuf()), dispatcher_id: downstream_fragment_id.as_global_id() as u64, downstream_actor_id: downstream_actors.as_global_ids(), @@ -364,10 +365,12 @@ impl 
ActorGraphBuildStateInner { downstream_actors: &[GlobalActorId], ) -> Dispatcher { assert_ne!(strategy.r#type(), DispatcherType::Hash); + assert!(strategy.dist_key_indices.is_empty()); Dispatcher { r#type: strategy.r#type, - dist_key_indices: Vec::new(), + dist_key_indices: vec![], + output_indices: strategy.output_indices.clone(), hash_mapping: None, dispatcher_id: downstream_fragment_id.as_global_id() as u64, downstream_actor_id: downstream_actors.as_global_ids(), diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index b07e4c7da5e33..f1da09e25beee 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -214,6 +214,7 @@ fn make_stream_fragments() -> Vec { strategy: Some(DispatchStrategy { r#type: DispatcherType::Hash as i32, dist_key_indices: vec![0], + output_indices: vec![], }), })), fields: vec![ @@ -375,6 +376,7 @@ fn make_fragment_edges() -> Vec { dispatch_strategy: Some(DispatchStrategy { r#type: DispatcherType::Simple as i32, dist_key_indices: vec![], + output_indices: vec![], }), link_id: 4, upstream_id: 1, @@ -384,6 +386,7 @@ fn make_fragment_edges() -> Vec { dispatch_strategy: Some(DispatchStrategy { r#type: DispatcherType::Hash as i32, dist_key_indices: vec![0], + output_indices: vec![], }), link_id: 1, upstream_id: 2, From b6062ad31ab3b1cf559eea8f6fa43f8a9935c1e8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Feb 2023 16:47:47 +0800 Subject: [PATCH 4/9] minor refactor to output Signed-off-by: Bugen Zhao --- Cargo.lock | 1 + src/stream/Cargo.toml | 1 + src/stream/src/executor/exchange/output.rs | 25 ++++++++-------------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b87392d090e6..4754aa189a4e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6403,6 +6403,7 @@ dependencies = [ "async-trait", "async_stack_trace", "bytes", + "derivative", "dyn-clone", "either", "enum-as-inner", diff --git a/src/stream/Cargo.toml 
b/src/stream/Cargo.toml index 2c92d2566066f..baca2796e6198 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -21,6 +21,7 @@ async-stream = "0.3" async-trait = "0.1" async_stack_trace = { path = "../utils/async_stack_trace" } bytes = "1" +derivative = "2" dyn-clone = "1" either = "1" enum-as-inner = "0.5" diff --git a/src/stream/src/executor/exchange/output.rs b/src/stream/src/executor/exchange/output.rs index c2954d6cd2dd1..737defdae9ada 100644 --- a/src/stream/src/executor/exchange/output.rs +++ b/src/stream/src/executor/exchange/output.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use anyhow::anyhow; use async_stack_trace::{SpanValue, StackTrace}; use async_trait::async_trait; +use derivative::Derivative; use risingwave_common::util::addr::is_local_address; use tokio::sync::mpsc::error::SendError; @@ -44,22 +45,18 @@ pub trait Output: Debug + Send + Sync + 'static { pub type BoxedOutput = Box; /// `LocalOutput` sends data to a local channel. +#[derive(Derivative)] +#[derivative(Debug)] pub struct LocalOutput { actor_id: ActorId, + #[derivative(Debug = "ignore")] span: SpanValue, + #[derivative(Debug = "ignore")] ch: Sender, } -impl Debug for LocalOutput { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LocalOutput") - .field("actor_id", &self.actor_id) - .finish() - } -} - impl LocalOutput { pub fn new(actor_id: ActorId, ch: Sender) -> Self { Self { @@ -97,22 +94,18 @@ impl Output for LocalOutput { /// /// [`ExchangeService`]: risingwave_pb::task_service::exchange_service_server::ExchangeService // FIXME: can we just use the same `Output` with local and compacts it in gRPC server? 
+#[derive(Derivative)] +#[derivative(Debug)] pub struct RemoteOutput { actor_id: ActorId, + #[derivative(Debug = "ignore")] span: SpanValue, + #[derivative(Debug = "ignore")] ch: Sender, } -impl Debug for RemoteOutput { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RemoteOutput") - .field("actor_id", &self.actor_id) - .finish() - } -} - impl RemoteOutput { pub fn new(actor_id: ActorId, ch: Sender) -> Self { Self { From b6e656e60e34caecf6839cc54da730b38d4f9a64 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Feb 2023 17:06:46 +0800 Subject: [PATCH 5/9] default to all indices Signed-off-by: Bugen Zhao --- .../src/optimizer/plan_node/stream.rs | 2 +- .../optimizer/plan_node/stream_exchange.rs | 2 +- .../src/optimizer/plan_node/stream_share.rs | 10 +++-- .../stream_fragmenter/graph/fragment_graph.rs | 1 + .../stream_fragmenter/rewrite/delta_join.rs | 39 ++++++++++++++----- src/meta/src/stream/stream_graph/fragment.rs | 11 +++++- src/meta/src/stream/test_fragmenter.rs | 2 +- 7 files changed, 48 insertions(+), 19 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index c6df4f486fb44..4ccf758a9c806 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -429,7 +429,7 @@ pub fn to_stream_prost_body( Distribution::HashShard(keys) => keys.iter().map(|&num| num as u32).collect(), _ => vec![], }, - output_indices: vec![], + output_indices: (0..base.schema().len() as u32).collect(), }), }), Node::DynamicFilter(me) => { diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 790fc7fca2b08..a7619b75cc597 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -87,7 +87,7 @@ impl StreamNode for StreamExchange { Distribution::HashShard(keys) => 
keys.iter().map(|num| *num as u32).collect(), _ => vec![], }, - output_indices: vec![], + output_indices: (0..self.schema().len() as u32).collect(), }), }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 9b19e5657d24b..3b6f24ff28796 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -85,6 +85,8 @@ impl StreamNode for StreamShare { impl StreamShare { pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> ProstStreamPlan { let operator_id = self.base.id.0 as u32; + let output_indices = (0..self.schema().len() as u32).collect_vec(); + match state.get_share_stream_node(operator_id) { None => { let dispatch_strategy = match &self.base.dist { @@ -92,18 +94,18 @@ impl StreamShare { DispatchStrategy { r#type: DispatcherType::Hash as i32, dist_key_indices: keys.iter().map(|x| *x as u32).collect_vec(), - output_indices: vec![], + output_indices, } } Distribution::Single => DispatchStrategy { r#type: DispatcherType::Simple as i32, dist_key_indices: vec![], - output_indices: vec![], + output_indices, }, Distribution::Broadcast => DispatchStrategy { r#type: DispatcherType::Broadcast as i32, dist_key_indices: vec![], - output_indices: vec![], + output_indices, }, Distribution::SomeShard => { // FIXME: use another DispatcherType? 
@@ -115,7 +117,7 @@ impl StreamShare { .iter() .map(|x| *x as u32) .collect_vec(), - output_indices: vec![], + output_indices, } } }; diff --git a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs index fca637d106864..818477d0e4220 100644 --- a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs +++ b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::rc::Rc; +use risingwave_pb::plan_common::Field; use risingwave_pb::stream_plan::stream_fragment_graph::{ StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto, }; diff --git a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs index f7372eb905380..6a56f22ba1c88 100644 --- a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs +++ b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs @@ -36,7 +36,9 @@ fn build_no_shuffle_exchange_for_delta_join( fields: upstream.fields.clone(), stream_key: upstream.stream_key.clone(), node_body: Some(NodeBody::Exchange(ExchangeNode { - strategy: Some(dispatch_no_shuffle()), + strategy: Some(dispatch_no_shuffle( + (0..(upstream.fields.len() as u32)).collect(), + )), })), input: vec![], append_only: upstream.append_only, @@ -54,27 +56,33 @@ fn build_consistent_hash_shuffle_exchange_for_delta_join( fields: upstream.fields.clone(), stream_key: upstream.stream_key.clone(), node_body: Some(NodeBody::Exchange(ExchangeNode { - strategy: Some(dispatch_consistent_hash_shuffle(dist_key_indices)), + strategy: Some(dispatch_consistent_hash_shuffle( + dist_key_indices, + (0..(upstream.fields.len() as u32)).collect(), + )), })), input: vec![], append_only: upstream.append_only, } } -fn dispatch_no_shuffle() -> DispatchStrategy { +fn dispatch_no_shuffle(output_indices: Vec) -> DispatchStrategy { DispatchStrategy { r#type: DispatcherType::NoShuffle.into(), 
dist_key_indices: vec![], - output_indices: vec![], + output_indices, } } -fn dispatch_consistent_hash_shuffle(dist_key_indices: Vec) -> DispatchStrategy { +fn dispatch_consistent_hash_shuffle( + dist_key_indices: Vec, + output_indices: Vec, +) -> DispatchStrategy { // Actually Hash shuffle is consistent hash shuffle now. DispatchStrategy { r#type: DispatcherType::Hash.into(), dist_key_indices, - output_indices: vec![], + output_indices, } } @@ -138,6 +146,9 @@ fn build_delta_join_inner( let i0_length = arrange_0.fields.len(); let i1_length = arrange_1.fields.len(); + let i0_output_indices = (0..i0_length as u32).collect_vec(); + let i1_output_indices = (0..i1_length as u32).collect_vec(); + let lookup_0_column_reordering = { let tmp: Vec = (i1_length..i1_length + i0_length) .chain(0..i1_length) @@ -206,7 +217,7 @@ fn build_delta_join_inner( arrange_0_frag.fragment_id, lookup_0_frag.fragment_id, StreamFragmentEdge { - dispatch_strategy: dispatch_no_shuffle(), + dispatch_strategy: dispatch_no_shuffle(i0_output_indices.clone()), link_id: exchange_a0l0.operator_id, }, ); @@ -223,6 +234,7 @@ fn build_delta_join_inner( .iter() .map(|x| *x as u32) .collect_vec(), + i0_output_indices.clone(), ), link_id: exchange_a0l1.operator_id, }, @@ -240,6 +252,7 @@ fn build_delta_join_inner( .iter() .map(|x| *x as u32) .collect_vec(), + i1_output_indices.clone(), ), link_id: exchange_a1l0.operator_id, }, @@ -251,7 +264,7 @@ fn build_delta_join_inner( arrange_1_frag.fragment_id, lookup_1_frag.fragment_id, StreamFragmentEdge { - dispatch_strategy: dispatch_no_shuffle(), + dispatch_strategy: dispatch_no_shuffle(i1_output_indices.clone()), link_id: exchange_a1l1.operator_id, }, ); @@ -277,7 +290,10 @@ fn build_delta_join_inner( lookup_0_frag.fragment_id, current_fragment.fragment_id, StreamFragmentEdge { - dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()), + dispatch_strategy: dispatch_consistent_hash_shuffle( + node.stream_key.clone(), + (0..node.fields.len() 
as u32).collect(), + ), link_id: exchange_l0m.operator_id, }, ); @@ -286,7 +302,10 @@ fn build_delta_join_inner( lookup_1_frag.fragment_id, current_fragment.fragment_id, StreamFragmentEdge { - dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()), + dispatch_strategy: dispatch_consistent_hash_shuffle( + node.stream_key.clone(), + (0..node.fields.len() as u32).collect(), + ), link_id: exchange_l1m.operator_id, }, ); diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 4820058f46c09..dbd153529b01f 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -417,6 +417,12 @@ impl CompleteStreamFragmentGraph { .context("upstream materialized view fragment not found")?; let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id); + // TODO: we can only output the fields that are used by the downstream `Chain` + let mview_output_indices = { + let nodes = mview_fragment.actors[0].nodes.as_ref().unwrap(); + (0..nodes.fields.len() as u32).collect() + }; + let edge = StreamFragmentEdge { id: EdgeId::UpstreamExternal { upstream_table_id, @@ -426,7 +432,8 @@ impl CompleteStreamFragmentGraph { // and the downstream `Chain` of the new materialized view. dispatch_strategy: DispatchStrategy { r#type: DispatcherType::NoShuffle as _, - ..Default::default() + dist_key_indices: vec![], // not used + output_indices: mview_output_indices, }, }; @@ -492,7 +499,7 @@ impl CompleteStreamFragmentGraph { // and the downstream `Chain` of the new materialized view. 
dispatch_strategy: DispatchStrategy { r#type: DispatcherType::NoShuffle as _, - ..Default::default() + ..Default::default() // FIXME: output indices }, }; diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index f1da09e25beee..a1c20fa8c8cbe 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -214,7 +214,7 @@ fn make_stream_fragments() -> Vec { strategy: Some(DispatchStrategy { r#type: DispatcherType::Hash as i32, dist_key_indices: vec![0], - output_indices: vec![], + output_indices: vec![0, 1, 2], }), })), fields: vec![ From 9d98996c36c04e08e56a90e03ae9defe1e4d667c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Feb 2023 17:13:35 +0800 Subject: [PATCH 6/9] support output indices in compute node Signed-off-by: Bugen Zhao --- .../stream_fragmenter/graph/fragment_graph.rs | 1 - src/stream/src/executor/dispatch.rs | 50 ++++++++++++++++--- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs index 818477d0e4220..fca637d106864 100644 --- a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs +++ b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::rc::Rc; -use risingwave_pb::plan_common::Field; use risingwave_pb::stream_plan::stream_fragment_graph::{ StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto, }; diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 4dea7276513c4..1b5f94113d49e 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -305,6 +305,12 @@ impl DispatcherImpl { .map(|&down_id| new_output(context, actor_id, down_id)) .collect::>>()?; + let output_indices = dispatcher + .output_indices + .iter() + .map(|&i| i as usize) + .collect_vec(); + use 
risingwave_pb::stream_plan::DispatcherType::*; let dispatcher_impl = match dispatcher.get_type()? { Hash => { @@ -321,17 +327,23 @@ impl DispatcherImpl { DispatcherImpl::Hash(HashDataDispatcher::new( outputs, dist_key_indices, + output_indices, hash_mapping, dispatcher.dispatcher_id, )) } Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new( outputs, + output_indices, dispatcher.dispatcher_id, )), Simple | NoShuffle => { let [output]: [_; 1] = outputs.try_into().unwrap(); - DispatcherImpl::Simple(SimpleDispatcher::new(output, dispatcher.dispatcher_id)) + DispatcherImpl::Simple(SimpleDispatcher::new( + output, + output_indices, + dispatcher.dispatcher_id, + )) } Unspecified => unreachable!(), }; @@ -509,6 +521,7 @@ impl Dispatcher for RoundRobinDataDispatcher { pub struct HashDataDispatcher { outputs: Vec, keys: Vec, + output_indices: Vec, /// Mapping from virtual node to actor id, used for hash data dispatcher to dispatch tasks to /// different downstream actors. hash_mapping: ExpandedActorMapping, @@ -529,12 +542,14 @@ impl HashDataDispatcher { pub fn new( outputs: Vec, keys: Vec, + output_indices: Vec, hash_mapping: ExpandedActorMapping, dispatcher_id: DispatcherId, ) -> Self { Self { outputs, keys, + output_indices, hash_mapping, dispatcher_id, } @@ -592,6 +607,8 @@ impl Dispatcher for HashDataDispatcher { let mut last_vnode_when_update_delete = None; let mut new_ops: Vec = Vec::with_capacity(chunk.capacity()); + // Apply output indices after calculating the vnode. + let chunk = chunk.reorder_columns(&self.output_indices); // TODO: refactor with `Vis`. 
let (ops, columns, visibility) = chunk.into_inner(); @@ -690,16 +707,19 @@ impl Dispatcher for HashDataDispatcher { #[derive(Debug)] pub struct BroadcastDispatcher { outputs: HashMap, + output_indices: Vec, dispatcher_id: DispatcherId, } impl BroadcastDispatcher { pub fn new( outputs: impl IntoIterator, + output_indices: Vec, dispatcher_id: DispatcherId, ) -> Self { Self { outputs: Self::into_pairs(outputs).collect(), + output_indices, dispatcher_id, } } @@ -718,6 +738,7 @@ impl Dispatcher for BroadcastDispatcher { fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> { async move { + let chunk = chunk.reorder_columns(&self.output_indices); for output in self.outputs.values_mut() { output.send(Message::Chunk(chunk.clone())).await?; } @@ -779,13 +800,19 @@ pub struct SimpleDispatcher { /// Therefore, when dispatching data, we assert that there's exactly one output by /// `Self::output`. output: SmallVec<[BoxedOutput; 2]>, + output_indices: Vec, dispatcher_id: DispatcherId, } impl SimpleDispatcher { - pub fn new(output: BoxedOutput, dispatcher_id: DispatcherId) -> Self { + pub fn new( + output: BoxedOutput, + output_indices: Vec, + dispatcher_id: DispatcherId, + ) -> Self { Self { output: smallvec![output], + output_indices, dispatcher_id, } } @@ -817,6 +844,7 @@ impl Dispatcher for SimpleDispatcher { .exactly_one() .expect("expect exactly one output"); + let chunk = chunk.reorder_columns(&self.output_indices); output.send(Message::Chunk(chunk)).await } } @@ -919,8 +947,13 @@ mod tests { .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) .collect_vec(); hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); - let mut hash_dispatcher = - HashDataDispatcher::new(outputs, key_indices.to_vec(), hash_mapping, 0); + let mut hash_dispatcher = HashDataDispatcher::new( + outputs, + key_indices.to_vec(), + vec![0, 1, 2], + hash_mapping, + 0, + ); let chunk = StreamChunk::from_pretty( " I I I @@ -1147,8 +1180,13 @@ mod tests { 
.flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) .collect_vec(); hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); - let mut hash_dispatcher = - HashDataDispatcher::new(outputs, key_indices.to_vec(), hash_mapping.clone(), 0); + let mut hash_dispatcher = HashDataDispatcher::new( + outputs, + key_indices.to_vec(), + (0..dimension).collect(), + hash_mapping.clone(), + 0, + ); let mut ops = Vec::new(); for idx in 0..cardinality { From 2a2dd4b6467466868f072daf094eb67c95d3aee4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Feb 2023 17:20:09 +0800 Subject: [PATCH 7/9] fix dashboard Signed-off-by: Bugen Zhao --- dashboard/proto/gen/stream_plan.ts | 45 ++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index c950828c55cef..e87f8e5a81ae4 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -829,7 +829,8 @@ export interface StreamNode { export interface DispatchStrategy { type: DispatcherType; - columnIndices: number[]; + distKeyIndices: number[]; + outputIndices: number[]; } /** @@ -842,7 +843,8 @@ export interface Dispatcher { * Indices of the columns to be used for hashing. * For dispatcher types other than HASH, this is ignored. */ - columnIndices: number[]; + distKeyIndices: number[]; + outputIndices: number[]; /** * The hash mapping for consistent hash. * For dispatcher types other than HASH, this is ignored. @@ -3870,24 +3872,30 @@ export const StreamNode = { }; function createBaseDispatchStrategy(): DispatchStrategy { - return { type: DispatcherType.UNSPECIFIED, columnIndices: [] }; + return { type: DispatcherType.UNSPECIFIED, distKeyIndices: [], outputIndices: [] }; } export const DispatchStrategy = { fromJSON(object: any): DispatchStrategy { return { type: isSet(object.type) ? 
dispatcherTypeFromJSON(object.type) : DispatcherType.UNSPECIFIED, - columnIndices: Array.isArray(object?.columnIndices) ? object.columnIndices.map((e: any) => Number(e)) : [], + distKeyIndices: Array.isArray(object?.distKeyIndices) ? object.distKeyIndices.map((e: any) => Number(e)) : [], + outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [], }; }, toJSON(message: DispatchStrategy): unknown { const obj: any = {}; message.type !== undefined && (obj.type = dispatcherTypeToJSON(message.type)); - if (message.columnIndices) { - obj.columnIndices = message.columnIndices.map((e) => Math.round(e)); + if (message.distKeyIndices) { + obj.distKeyIndices = message.distKeyIndices.map((e) => Math.round(e)); } else { - obj.columnIndices = []; + obj.distKeyIndices = []; + } + if (message.outputIndices) { + obj.outputIndices = message.outputIndices.map((e) => Math.round(e)); + } else { + obj.outputIndices = []; } return obj; }, @@ -3895,7 +3903,8 @@ export const DispatchStrategy = { fromPartial, I>>(object: I): DispatchStrategy { const message = createBaseDispatchStrategy(); message.type = object.type ?? DispatcherType.UNSPECIFIED; - message.columnIndices = object.columnIndices?.map((e) => e) || []; + message.distKeyIndices = object.distKeyIndices?.map((e) => e) || []; + message.outputIndices = object.outputIndices?.map((e) => e) || []; return message; }, }; @@ -3903,7 +3912,8 @@ export const DispatchStrategy = { function createBaseDispatcher(): Dispatcher { return { type: DispatcherType.UNSPECIFIED, - columnIndices: [], + distKeyIndices: [], + outputIndices: [], hashMapping: undefined, dispatcherId: 0, downstreamActorId: [], @@ -3914,7 +3924,8 @@ export const Dispatcher = { fromJSON(object: any): Dispatcher { return { type: isSet(object.type) ? dispatcherTypeFromJSON(object.type) : DispatcherType.UNSPECIFIED, - columnIndices: Array.isArray(object?.columnIndices) ? 
object.columnIndices.map((e: any) => Number(e)) : [], + distKeyIndices: Array.isArray(object?.distKeyIndices) ? object.distKeyIndices.map((e: any) => Number(e)) : [], + outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [], hashMapping: isSet(object.hashMapping) ? ActorMapping.fromJSON(object.hashMapping) : undefined, dispatcherId: isSet(object.dispatcherId) ? Number(object.dispatcherId) : 0, downstreamActorId: Array.isArray(object?.downstreamActorId) @@ -3926,10 +3937,15 @@ export const Dispatcher = { toJSON(message: Dispatcher): unknown { const obj: any = {}; message.type !== undefined && (obj.type = dispatcherTypeToJSON(message.type)); - if (message.columnIndices) { - obj.columnIndices = message.columnIndices.map((e) => Math.round(e)); + if (message.distKeyIndices) { + obj.distKeyIndices = message.distKeyIndices.map((e) => Math.round(e)); } else { - obj.columnIndices = []; + obj.distKeyIndices = []; + } + if (message.outputIndices) { + obj.outputIndices = message.outputIndices.map((e) => Math.round(e)); + } else { + obj.outputIndices = []; } message.hashMapping !== undefined && (obj.hashMapping = message.hashMapping ? ActorMapping.toJSON(message.hashMapping) : undefined); @@ -3945,7 +3961,8 @@ export const Dispatcher = { fromPartial, I>>(object: I): Dispatcher { const message = createBaseDispatcher(); message.type = object.type ?? DispatcherType.UNSPECIFIED; - message.columnIndices = object.columnIndices?.map((e) => e) || []; + message.distKeyIndices = object.distKeyIndices?.map((e) => e) || []; + message.outputIndices = object.outputIndices?.map((e) => e) || []; message.hashMapping = (object.hashMapping !== undefined && object.hashMapping !== null) ? 
ActorMapping.fromPartial(object.hashMapping) : undefined; From 98ea0d085209e118b6d37446cf373ee1771d7c86 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Feb 2023 17:38:58 +0800 Subject: [PATCH 8/9] refine docs Signed-off-by: Bugen Zhao --- proto/stream_plan.proto | 6 +++++- src/frontend/src/stream_fragmenter/rewrite/delta_join.rs | 4 ++-- src/meta/src/stream/stream_graph/fragment.rs | 6 ++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index b53cac6808123..36811ec46f3d6 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -553,6 +553,8 @@ enum DispatcherType { NO_SHUFFLE = 4; } +// The property of an edge in the fragment graph. +// This is essentially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details. message DispatchStrategy { DispatcherType type = 1; repeated uint32 dist_key_indices = 2; @@ -566,7 +568,9 @@ message Dispatcher { // Indices of the columns to be used for hashing. // For dispatcher types other than HASH, this is ignored. repeated uint32 dist_key_indices = 2; - + // Indices of the columns to output. + // In most cases, this contains all columns in the input. But for some cases like MV on MV or + // schema change, we may only output a subset of the columns. repeated uint32 output_indices = 6; // The hash mapping for consistent hash. // For dispatcher types other than HASH, this is ignored. 
diff --git a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs index 6a56f22ba1c88..1450b373baa24 100644 --- a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs +++ b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs @@ -234,7 +234,7 @@ fn build_delta_join_inner( .iter() .map(|x| *x as u32) .collect_vec(), - i0_output_indices.clone(), + i0_output_indices, ), link_id: exchange_a0l1.operator_id, }, @@ -264,7 +264,7 @@ fn build_delta_join_inner( arrange_1_frag.fragment_id, lookup_1_frag.fragment_id, StreamFragmentEdge { - dispatch_strategy: dispatch_no_shuffle(i1_output_indices.clone()), + dispatch_strategy: dispatch_no_shuffle(i1_output_indices), link_id: exchange_a1l1.operator_id, }, ); diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index dbd153529b01f..62d1001d5a359 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -417,7 +417,8 @@ impl CompleteStreamFragmentGraph { .context("upstream materialized view fragment not found")?; let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id); - // TODO: we can only output the fields that are used by the downstream `Chain` + // TODO: only output the fields that are used by the downstream `Chain`. + // https://github.com/risingwavelabs/risingwave/issues/4529 let mview_output_indices = { let nodes = mview_fragment.actors[0].nodes.as_ref().unwrap(); (0..nodes.fields.len() as u32).collect() @@ -499,7 +500,8 @@ impl CompleteStreamFragmentGraph { // and the downstream `Chain` of the new materialized view. 
dispatch_strategy: DispatchStrategy { r#type: DispatcherType::NoShuffle as _, - ..Default::default() // FIXME: output indices + dist_key_indices: vec![], // not used + output_indices: vec![], // FIXME: should erase the changes of the schema }, }; From 45decac1c95c6ba1bbd8570a183bc67351a8df2a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 21 Feb 2023 18:08:57 +0800 Subject: [PATCH 9/9] fix dashboard Signed-off-by: Bugen Zhao --- dashboard/proto/gen/stream_plan.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index e87f8e5a81ae4..697a3c5a870fb 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -827,6 +827,10 @@ export interface StreamNode { fields: Field[]; } +/** + * The property of an edge in the fragment graph. + * This is essentially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details. + */ export interface DispatchStrategy { type: DispatcherType; distKeyIndices: number[]; @@ -844,6 +848,11 @@ export interface Dispatcher { * For dispatcher types other than HASH, this is ignored. */ distKeyIndices: number[]; + /** + * Indices of the columns to output. + * In most cases, this contains all columns in the input. But for some cases like MV on MV or + * schema change, we may only output a subset of the columns. + */ outputIndices: number[]; /** * The hash mapping for consistent hash.