diff --git a/Cargo.lock b/Cargo.lock
index 71eef8ad0ea60..40c161022f6b3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6396,6 +6396,7 @@ dependencies = [
  "async-trait",
  "async_stack_trace",
  "bytes",
+ "derivative",
  "dyn-clone",
  "either",
  "enum-as-inner",
diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts
index c950828c55cef..697a3c5a870fb 100644
--- a/dashboard/proto/gen/stream_plan.ts
+++ b/dashboard/proto/gen/stream_plan.ts
@@ -827,9 +827,14 @@ export interface StreamNode {
   fields: Field[];
 }
 
+/**
+ * The property of an edge in the fragment graph.
+ * This is essentially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details.
+ */
 export interface DispatchStrategy {
   type: DispatcherType;
-  columnIndices: number[];
+  distKeyIndices: number[];
+  outputIndices: number[];
 }
 
 /**
@@ -842,7 +847,13 @@ export interface Dispatcher {
    * Indices of the columns to be used for hashing.
    * For dispatcher types other than HASH, this is ignored.
    */
-  columnIndices: number[];
+  distKeyIndices: number[];
+  /**
+   * Indices of the columns to output.
+   * In most cases, this contains all columns in the input. But for some cases like MV on MV or
+   * schema change, we may only output a subset of the columns.
+   */
+  outputIndices: number[];
   /**
    * The hash mapping for consistent hash.
    * For dispatcher types other than HASH, this is ignored.
@@ -3870,24 +3881,30 @@ export const StreamNode = {
 };
 
 function createBaseDispatchStrategy(): DispatchStrategy {
-  return { type: DispatcherType.UNSPECIFIED, columnIndices: [] };
+  return { type: DispatcherType.UNSPECIFIED, distKeyIndices: [], outputIndices: [] };
 }
 
 export const DispatchStrategy = {
   fromJSON(object: any): DispatchStrategy {
     return {
       type: isSet(object.type) ? dispatcherTypeFromJSON(object.type) : DispatcherType.UNSPECIFIED,
-      columnIndices: Array.isArray(object?.columnIndices) ? object.columnIndices.map((e: any) => Number(e)) : [],
+      distKeyIndices: Array.isArray(object?.distKeyIndices) ? object.distKeyIndices.map((e: any) => Number(e)) : [],
+      outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [],
     };
   },
 
   toJSON(message: DispatchStrategy): unknown {
     const obj: any = {};
     message.type !== undefined && (obj.type = dispatcherTypeToJSON(message.type));
-    if (message.columnIndices) {
-      obj.columnIndices = message.columnIndices.map((e) => Math.round(e));
+    if (message.distKeyIndices) {
+      obj.distKeyIndices = message.distKeyIndices.map((e) => Math.round(e));
     } else {
-      obj.columnIndices = [];
+      obj.distKeyIndices = [];
+    }
+    if (message.outputIndices) {
+      obj.outputIndices = message.outputIndices.map((e) => Math.round(e));
+    } else {
+      obj.outputIndices = [];
     }
     return obj;
   },
@@ -3895,7 +3912,8 @@ export const DispatchStrategy = {
   fromPartial<I extends Exact<DeepPartial<DispatchStrategy>, I>>(object: I): DispatchStrategy {
     const message = createBaseDispatchStrategy();
     message.type = object.type ?? DispatcherType.UNSPECIFIED;
-    message.columnIndices = object.columnIndices?.map((e) => e) || [];
+    message.distKeyIndices = object.distKeyIndices?.map((e) => e) || [];
+    message.outputIndices = object.outputIndices?.map((e) => e) || [];
     return message;
   },
 };
@@ -3903,7 +3921,8 @@ export const DispatchStrategy = {
 function createBaseDispatcher(): Dispatcher {
   return {
     type: DispatcherType.UNSPECIFIED,
-    columnIndices: [],
+    distKeyIndices: [],
+    outputIndices: [],
     hashMapping: undefined,
     dispatcherId: 0,
     downstreamActorId: [],
@@ -3914,7 +3933,8 @@ export const Dispatcher = {
   fromJSON(object: any): Dispatcher {
     return {
       type: isSet(object.type) ? dispatcherTypeFromJSON(object.type) : DispatcherType.UNSPECIFIED,
-      columnIndices: Array.isArray(object?.columnIndices) ? object.columnIndices.map((e: any) => Number(e)) : [],
+      distKeyIndices: Array.isArray(object?.distKeyIndices) ? object.distKeyIndices.map((e: any) => Number(e)) : [],
+      outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [],
       hashMapping: isSet(object.hashMapping) ? ActorMapping.fromJSON(object.hashMapping) : undefined,
       dispatcherId: isSet(object.dispatcherId) ? Number(object.dispatcherId) : 0,
       downstreamActorId: Array.isArray(object?.downstreamActorId)
@@ -3926,10 +3946,15 @@ export const Dispatcher = {
   toJSON(message: Dispatcher): unknown {
     const obj: any = {};
     message.type !== undefined && (obj.type = dispatcherTypeToJSON(message.type));
-    if (message.columnIndices) {
-      obj.columnIndices = message.columnIndices.map((e) => Math.round(e));
+    if (message.distKeyIndices) {
+      obj.distKeyIndices = message.distKeyIndices.map((e) => Math.round(e));
     } else {
-      obj.columnIndices = [];
+      obj.distKeyIndices = [];
+    }
+    if (message.outputIndices) {
+      obj.outputIndices = message.outputIndices.map((e) => Math.round(e));
+    } else {
+      obj.outputIndices = [];
     }
     message.hashMapping !== undefined &&
       (obj.hashMapping = message.hashMapping ? ActorMapping.toJSON(message.hashMapping) : undefined);
@@ -3945,7 +3970,8 @@ export const Dispatcher = {
   fromPartial<I extends Exact<DeepPartial<Dispatcher>, I>>(object: I): Dispatcher {
     const message = createBaseDispatcher();
     message.type = object.type ?? DispatcherType.UNSPECIFIED;
-    message.columnIndices = object.columnIndices?.map((e) => e) || [];
+    message.distKeyIndices = object.distKeyIndices?.map((e) => e) || [];
+    message.outputIndices = object.outputIndices?.map((e) => e) || [];
     message.hashMapping = (object.hashMapping !== undefined && object.hashMapping !== null)
       ? ActorMapping.fromPartial(object.hashMapping)
       : undefined;
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index bd60dcd19ffce..36811ec46f3d6 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -553,9 +553,12 @@ enum DispatcherType {
   NO_SHUFFLE = 4;
 }
 
+// The property of an edge in the fragment graph.
+// This is essentially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details.
 message DispatchStrategy {
   DispatcherType type = 1;
-  repeated uint32 column_indices = 2;
+  repeated uint32 dist_key_indices = 2;
+  repeated uint32 output_indices = 3;
 }
 
 // A dispatcher redistribute messages.
@@ -564,7 +567,11 @@ message Dispatcher {
   DispatcherType type = 1;
   // Indices of the columns to be used for hashing.
   // For dispatcher types other than HASH, this is ignored.
-  repeated uint32 column_indices = 2;
+  repeated uint32 dist_key_indices = 2;
+  // Indices of the columns to output.
+  // In most cases, this contains all columns in the input. But for some cases like MV on MV or
+  // schema change, we may only output a subset of the columns.
+  repeated uint32 output_indices = 6;
   // The hash mapping for consistent hash.
   // For dispatcher types other than HASH, this is ignored.
   ActorMapping hash_mapping = 3;
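The proto change above splits the old `column_indices` into two concerns: `dist_key_indices` decides *where* a record is routed, while `output_indices` decides *which columns* are shipped downstream. A minimal, self-contained sketch of that semantics (plain Rust over a hypothetical `Row` alias; the real dispatchers work on columnar `StreamChunk`s and a vnode-based actor mapping rather than a plain modulo):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a record flowing through an exchange.
type Row = Vec<i64>;

/// Route on the distribution key columns, then project to the output columns.
fn dispatch(
    row: &Row,
    dist_key_indices: &[usize],
    output_indices: &[usize],
    num_actors: u64,
) -> (u64, Row) {
    // 1. Hash only the distribution key columns of the *input* row.
    let mut hasher = DefaultHasher::new();
    for &i in dist_key_indices {
        row[i].hash(&mut hasher);
    }
    let actor = hasher.finish() % num_actors;

    // 2. Ship only the requested columns downstream (possibly a subset or a reordering).
    let projected = output_indices.iter().map(|&i| row[i]).collect();
    (actor, projected)
}

fn main() {
    let row = vec![42, 7, 100];
    // Hash-distribute on column 0, but only output columns 2 and 0.
    let (actor, out) = dispatch(&row, &[0], &[2, 0], 4);
    println!("row goes to actor {actor} as {out:?}");
}
```

Note that `dist_key_indices` is always interpreted against the dispatcher's input schema, which is why the projection happens only after routing.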
diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs
index afa2bd78607ed..4ccf758a9c806 100644
--- a/src/frontend/src/optimizer/plan_node/stream.rs
+++ b/src/frontend/src/optimizer/plan_node/stream.rs
@@ -425,10 +425,11 @@ pub fn to_stream_prost_body(
                     Distribution::Broadcast => DispatcherType::Broadcast,
                     _ => panic!("Do not allow Any or AnyShard in serialization process"),
                 } as i32,
-                column_indices: match &base.dist {
+                dist_key_indices: match &base.dist {
                     Distribution::HashShard(keys) => keys.iter().map(|&num| num as u32).collect(),
                     _ => vec![],
                 },
+                output_indices: (0..base.schema().len() as u32).collect(),
             }),
         }),
         Node::DynamicFilter(me) => {
diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
index e8f5d36cf53ed..a7619b75cc597 100644
--- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
@@ -83,10 +83,11 @@ impl StreamNode for StreamExchange {
                     Distribution::Broadcast => DispatcherType::Broadcast,
                     _ => panic!("Do not allow Any or AnyShard in serialization process"),
                 } as i32,
-                column_indices: match &self.base.dist {
+                dist_key_indices: match &self.base.dist {
                     Distribution::HashShard(keys) => keys.iter().map(|num| *num as u32).collect(),
                     _ => vec![],
                 },
+                output_indices: (0..self.schema().len() as u32).collect(),
             }),
         })
     }
diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs
index 66361b3b89174..3b6f24ff28796 100644
--- a/src/frontend/src/optimizer/plan_node/stream_share.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_share.rs
@@ -85,33 +85,39 @@ impl StreamNode for StreamShare {
 impl StreamShare {
     pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> ProstStreamPlan {
         let operator_id = self.base.id.0 as u32;
+        let output_indices = (0..self.schema().len() as u32).collect_vec();
+
         match state.get_share_stream_node(operator_id) {
             None => {
                 let dispatch_strategy = match &self.base.dist {
                     Distribution::HashShard(keys) | Distribution::UpstreamHashShard(keys, _) => {
                         DispatchStrategy {
                             r#type: DispatcherType::Hash as i32,
-                            column_indices: keys.iter().map(|x| *x as u32).collect_vec(),
+                            dist_key_indices: keys.iter().map(|x| *x as u32).collect_vec(),
+                            output_indices,
                         }
                     }
                     Distribution::Single => DispatchStrategy {
                         r#type: DispatcherType::Simple as i32,
-                        column_indices: vec![],
+                        dist_key_indices: vec![],
+                        output_indices,
                     },
                     Distribution::Broadcast => DispatchStrategy {
                         r#type: DispatcherType::Broadcast as i32,
-                        column_indices: vec![],
+                        dist_key_indices: vec![],
+                        output_indices,
                     },
                     Distribution::SomeShard => {
                         // FIXME: use another DispatcherType?
                        DispatchStrategy {
                            r#type: DispatcherType::Hash as i32,
-                            column_indices: self
+                            dist_key_indices: self
                                .base
                                .logical_pk
                                .iter()
                                .map(|x| *x as u32)
                                .collect_vec(),
+                            output_indices,
                        }
                    }
                };
diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs
index 35eae9dc34d77..628ba0d486f25 100644
--- a/src/frontend/src/stream_fragmenter/mod.rs
+++ b/src/frontend/src/stream_fragmenter/mod.rs
@@ -139,7 +139,8 @@ fn rewrite_stream_node(
 
                 let strategy = DispatchStrategy {
                     r#type: DispatcherType::NoShuffle.into(),
-                    column_indices: vec![], // TODO: use distribution key
+                    dist_key_indices: vec![], // TODO: use distribution key
+                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                 };
                 Ok(StreamNode {
                     stream_key: child_node.stream_key.clone(),
diff --git a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs
index fec3424c2ac00..1450b373baa24 100644
--- a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs
+++ b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs
@@ -36,7 +36,9 @@ fn build_no_shuffle_exchange_for_delta_join(
         fields: upstream.fields.clone(),
         stream_key: upstream.stream_key.clone(),
         node_body: Some(NodeBody::Exchange(ExchangeNode {
-            strategy: Some(dispatch_no_shuffle()),
+            strategy: Some(dispatch_no_shuffle(
+                (0..(upstream.fields.len() as u32)).collect(),
+            )),
         })),
         input: vec![],
         append_only: upstream.append_only,
@@ -46,7 +48,7 @@ fn build_no_shuffle_exchange_for_delta_join(
 fn build_consistent_hash_shuffle_exchange_for_delta_join(
     state: &mut BuildFragmentGraphState,
     upstream: &StreamNode,
-    column_indices: Vec<u32>,
+    dist_key_indices: Vec<u32>,
 ) -> StreamNode {
     StreamNode {
         operator_id: state.gen_operator_id() as u64,
@@ -54,25 +56,33 @@ fn build_consistent_hash_shuffle_exchange_for_delta_join(
         fields: upstream.fields.clone(),
         stream_key: upstream.stream_key.clone(),
         node_body: Some(NodeBody::Exchange(ExchangeNode {
-            strategy: Some(dispatch_consistent_hash_shuffle(column_indices)),
+            strategy: Some(dispatch_consistent_hash_shuffle(
+                dist_key_indices,
+                (0..(upstream.fields.len() as u32)).collect(),
+            )),
         })),
         input: vec![],
         append_only: upstream.append_only,
     }
 }
 
-fn dispatch_no_shuffle() -> DispatchStrategy {
+fn dispatch_no_shuffle(output_indices: Vec<u32>) -> DispatchStrategy {
     DispatchStrategy {
         r#type: DispatcherType::NoShuffle.into(),
-        column_indices: vec![],
+        dist_key_indices: vec![],
+        output_indices,
     }
 }
 
-fn dispatch_consistent_hash_shuffle(column_indices: Vec<u32>) -> DispatchStrategy {
+fn dispatch_consistent_hash_shuffle(
+    dist_key_indices: Vec<u32>,
+    output_indices: Vec<u32>,
+) -> DispatchStrategy {
     // Actually Hash shuffle is consistent hash shuffle now.
     DispatchStrategy {
         r#type: DispatcherType::Hash.into(),
-        column_indices,
+        dist_key_indices,
+        output_indices,
     }
 }
 
@@ -136,6 +146,9 @@ fn build_delta_join_inner(
     let i0_length = arrange_0.fields.len();
     let i1_length = arrange_1.fields.len();
+    let i0_output_indices = (0..i0_length as u32).collect_vec();
+    let i1_output_indices = (0..i1_length as u32).collect_vec();
+
     let lookup_0_column_reordering = {
         let tmp: Vec = (i1_length..i1_length + i0_length)
             .chain(0..i1_length)
             .collect_vec();
@@ -204,7 +217,7 @@ fn build_delta_join_inner(
             arrange_0_frag.fragment_id,
             lookup_0_frag.fragment_id,
             StreamFragmentEdge {
-                dispatch_strategy: dispatch_no_shuffle(),
+                dispatch_strategy: dispatch_no_shuffle(i0_output_indices.clone()),
                 link_id: exchange_a0l0.operator_id,
             },
         );
@@ -221,6 +234,7 @@ fn build_delta_join_inner(
                     .iter()
                     .map(|x| *x as u32)
                     .collect_vec(),
+                i0_output_indices,
             ),
                 link_id: exchange_a0l1.operator_id,
             },
@@ -238,6 +252,7 @@ fn build_delta_join_inner(
                     .iter()
                     .map(|x| *x as u32)
                     .collect_vec(),
+                i1_output_indices.clone(),
             ),
             link_id: exchange_a1l0.operator_id,
         },
@@ -249,7 +264,7 @@ fn build_delta_join_inner(
             arrange_1_frag.fragment_id,
             lookup_1_frag.fragment_id,
             StreamFragmentEdge {
-                dispatch_strategy: dispatch_no_shuffle(),
+                dispatch_strategy: dispatch_no_shuffle(i1_output_indices),
                 link_id: exchange_a1l1.operator_id,
             },
         );
@@ -275,7 +290,10 @@ fn build_delta_join_inner(
         lookup_0_frag.fragment_id,
         current_fragment.fragment_id,
         StreamFragmentEdge {
-            dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()),
+            dispatch_strategy: dispatch_consistent_hash_shuffle(
+                node.stream_key.clone(),
+                (0..node.fields.len() as u32).collect(),
+            ),
             link_id: exchange_l0m.operator_id,
         },
     );
@@ -284,7 +302,10 @@ fn build_delta_join_inner(
         lookup_1_frag.fragment_id,
         current_fragment.fragment_id,
         StreamFragmentEdge {
-            dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()),
+            dispatch_strategy: dispatch_consistent_hash_shuffle(
+                node.stream_key.clone(),
+                (0..node.fields.len() as u32).collect(),
+            ),
             link_id: exchange_l1m.operator_id,
         },
     );
diff --git a/src/frontend/src/utils/stream_graph_formatter.rs b/src/frontend/src/utils/stream_graph_formatter.rs
index 017c76593a740..54c8140ae4d8e 100644
--- a/src/frontend/src/utils/stream_graph_formatter.rs
+++ b/src/frontend/src/utils/stream_graph_formatter.rs
@@ -129,7 +129,7 @@ impl StreamGraphFormatter {
                 "StreamExchange {} from {}",
                 match dist.r#type() {
                     DispatcherType::Unspecified => unreachable!(),
-                    DispatcherType::Hash => format!("Hash({:?})", dist.column_indices),
+                    DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices),
                     DispatcherType::Broadcast => "Broadcast".to_string(),
                     DispatcherType::Simple => "Single".to_string(),
                     DispatcherType::NoShuffle => "NoShuffle".to_string(),
diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs
index ebcfcbaee8bc8..41315fb78060f 100644
--- a/src/meta/src/stream/stream_graph/actor.rs
+++ b/src/meta/src/stream/stream_graph/actor.rs
@@ -25,7 +25,9 @@ use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_pb::meta::table_fragments::Fragment;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
-use risingwave_pb::stream_plan::{Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode};
+use risingwave_pb::stream_plan::{
+    DispatchStrategy, Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode,
+};
 
 use super::id::GlobalFragmentIdsExt;
 use super::Locations;
@@ -339,14 +341,17 @@ impl ActorGraphBuildStateInner {
 
     /// Create a new hash dispatcher.
     fn new_hash_dispatcher(
-        column_indices: &[u32],
+        strategy: &DispatchStrategy,
         downstream_fragment_id: GlobalFragmentId,
         downstream_actors: &[GlobalActorId],
         downstream_actor_mapping: ActorMapping,
     ) -> Dispatcher {
+        assert_eq!(strategy.r#type(), DispatcherType::Hash);
+
         Dispatcher {
             r#type: DispatcherType::Hash as _,
-            column_indices: column_indices.to_vec(),
+            dist_key_indices: strategy.dist_key_indices.clone(),
+            output_indices: strategy.output_indices.clone(),
             hash_mapping: Some(downstream_actor_mapping.to_protobuf()),
             dispatcher_id: downstream_fragment_id.as_global_id() as u64,
             downstream_actor_id: downstream_actors.as_global_ids(),
@@ -355,14 +360,17 @@ impl ActorGraphBuildStateInner {
 
     /// Create a new dispatcher for non-hash types.
     fn new_normal_dispatcher(
-        dispatcher_type: DispatcherType,
+        strategy: &DispatchStrategy,
         downstream_fragment_id: GlobalFragmentId,
         downstream_actors: &[GlobalActorId],
     ) -> Dispatcher {
-        assert_ne!(dispatcher_type, DispatcherType::Hash);
+        assert_ne!(strategy.r#type(), DispatcherType::Hash);
+        assert!(strategy.dist_key_indices.is_empty());
+
         Dispatcher {
-            r#type: dispatcher_type as _,
-            column_indices: Vec::new(),
+            r#type: strategy.r#type,
+            dist_key_indices: vec![],
+            output_indices: strategy.output_indices.clone(),
             hash_mapping: None,
             dispatcher_id: downstream_fragment_id.as_global_id() as u64,
             downstream_actor_id: downstream_actors.as_global_ids(),
@@ -441,7 +449,11 @@ impl ActorGraphBuildStateInner {
             // Create a new dispatcher just between these two actors.
             self.add_dispatcher(
                 *upstream_id,
-                Self::new_normal_dispatcher(dt, downstream.fragment_id, &[*downstream_id]),
+                Self::new_normal_dispatcher(
+                    &edge.dispatch_strategy,
+                    downstream.fragment_id,
+                    &[*downstream_id],
+                ),
             );
 
             // Also record the upstream for the downstream actor.
@@ -474,13 +486,17 @@ impl ActorGraphBuildStateInner {
                     .to_actor(&downstream_locations);
 
                 Self::new_hash_dispatcher(
-                    &edge.dispatch_strategy.column_indices,
+                    &edge.dispatch_strategy,
                     downstream.fragment_id,
                     downstream.actor_ids,
                     actor_mapping,
                 )
             } else {
-                Self::new_normal_dispatcher(dt, downstream.fragment_id, downstream.actor_ids)
+                Self::new_normal_dispatcher(
+                    &edge.dispatch_strategy,
+                    downstream.fragment_id,
+                    downstream.actor_ids,
+                )
             };
             for upstream_id in upstream.actor_ids {
                 self.add_dispatcher(*upstream_id, dispatcher.clone());
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index 4820058f46c09..62d1001d5a359 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -417,6 +417,13 @@ impl CompleteStreamFragmentGraph {
             .context("upstream materialized view fragment not found")?;
         let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id);
 
+        // TODO: only output the fields that are used by the downstream `Chain`.
+        // https://github.com/risingwavelabs/risingwave/issues/4529
+        let mview_output_indices = {
+            let nodes = mview_fragment.actors[0].nodes.as_ref().unwrap();
+            (0..nodes.fields.len() as u32).collect()
+        };
+
         let edge = StreamFragmentEdge {
             id: EdgeId::UpstreamExternal {
                 upstream_table_id,
@@ -426,7 +433,8 @@ impl CompleteStreamFragmentGraph {
             // and the downstream `Chain` of the new materialized view.
             dispatch_strategy: DispatchStrategy {
                 r#type: DispatcherType::NoShuffle as _,
-                ..Default::default()
+                dist_key_indices: vec![], // not used
+                output_indices: mview_output_indices,
             },
         };
 
@@ -492,7 +500,8 @@ impl CompleteStreamFragmentGraph {
             // and the downstream `Chain` of the new materialized view.
             dispatch_strategy: DispatchStrategy {
                 r#type: DispatcherType::NoShuffle as _,
-                ..Default::default()
+                dist_key_indices: vec![], // not used
+                output_indices: vec![], // FIXME: should erase the changes of the schema
             },
         };
 
diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs
index 9dca3500cf8b9..a1c20fa8c8cbe 100644
--- a/src/meta/src/stream/test_fragmenter.rs
+++ b/src/meta/src/stream/test_fragmenter.rs
@@ -213,7 +213,8 @@ fn make_stream_fragments() -> Vec {
             node_body: Some(NodeBody::Exchange(ExchangeNode {
                 strategy: Some(DispatchStrategy {
                     r#type: DispatcherType::Hash as i32,
-                    column_indices: vec![0],
+                    dist_key_indices: vec![0],
+                    output_indices: vec![0, 1, 2],
                 }),
             })),
             fields: vec![
@@ -374,7 +375,8 @@ fn make_fragment_edges() -> Vec {
         StreamFragmentEdge {
             dispatch_strategy: Some(DispatchStrategy {
                 r#type: DispatcherType::Simple as i32,
-                column_indices: vec![],
+                dist_key_indices: vec![],
+                output_indices: vec![],
             }),
             link_id: 4,
             upstream_id: 1,
@@ -383,7 +385,8 @@ fn make_fragment_edges() -> Vec {
         StreamFragmentEdge {
             dispatch_strategy: Some(DispatchStrategy {
                 r#type: DispatcherType::Hash as i32,
-                column_indices: vec![0],
+                dist_key_indices: vec![0],
+                output_indices: vec![],
             }),
             link_id: 1,
             upstream_id: 2,
diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml
index 2c92d2566066f..baca2796e6198 100644
--- a/src/stream/Cargo.toml
+++ b/src/stream/Cargo.toml
@@ -21,6 +21,7 @@ async-stream = "0.3"
 async-trait = "0.1"
 async_stack_trace = { path = "../utils/async_stack_trace" }
 bytes = "1"
+derivative = "2"
 dyn-clone = "1"
 either = "1"
 enum-as-inner = "0.5"
diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs
index b89c9fa9f6409..1b5f94113d49e 100644
--- a/src/stream/src/executor/dispatch.rs
+++ b/src/stream/src/executor/dispatch.rs
@@ -305,12 +305,18 @@ impl DispatcherImpl {
             .map(|&down_id| new_output(context, actor_id, down_id))
             .collect::>>()?;
 
+        let output_indices = dispatcher
+            .output_indices
+            .iter()
+            .map(|&i| i as usize)
+            .collect_vec();
+
         use risingwave_pb::stream_plan::DispatcherType::*;
         let dispatcher_impl = match dispatcher.get_type()? {
             Hash => {
                 assert!(!outputs.is_empty());
-                let column_indices = dispatcher
-                    .column_indices
+                let dist_key_indices = dispatcher
+                    .dist_key_indices
                     .iter()
                     .map(|i| *i as usize)
                     .collect();
@@ -320,18 +326,24 @@ impl DispatcherImpl {
 
                 DispatcherImpl::Hash(HashDataDispatcher::new(
                     outputs,
-                    column_indices,
+                    dist_key_indices,
+                    output_indices,
                     hash_mapping,
                     dispatcher.dispatcher_id,
                 ))
             }
             Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                 outputs,
+                output_indices,
                 dispatcher.dispatcher_id,
             )),
             Simple | NoShuffle => {
                 let [output]: [_; 1] = outputs.try_into().unwrap();
-                DispatcherImpl::Simple(SimpleDispatcher::new(output, dispatcher.dispatcher_id))
+                DispatcherImpl::Simple(SimpleDispatcher::new(
+                    output,
+                    output_indices,
+                    dispatcher.dispatcher_id,
+                ))
             }
             Unspecified => unreachable!(),
         };
@@ -509,6 +521,7 @@ impl Dispatcher for RoundRobinDataDispatcher {
 pub struct HashDataDispatcher {
     outputs: Vec<BoxedOutput>,
     keys: Vec<usize>,
+    output_indices: Vec<usize>,
     /// Mapping from virtual node to actor id, used for hash data dispatcher to dispatch tasks to
     /// different downstream actors.
     hash_mapping: ExpandedActorMapping,
@@ -529,12 +542,14 @@ impl HashDataDispatcher {
     pub fn new(
         outputs: Vec<BoxedOutput>,
         keys: Vec<usize>,
+        output_indices: Vec<usize>,
         hash_mapping: ExpandedActorMapping,
         dispatcher_id: DispatcherId,
     ) -> Self {
         Self {
             outputs,
             keys,
+            output_indices,
             hash_mapping,
             dispatcher_id,
         }
     }
@@ -592,6 +607,8 @@ impl Dispatcher for HashDataDispatcher {
 
         let mut last_vnode_when_update_delete = None;
         let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());
 
+        // Apply output indices after calculating the vnode.
+        let chunk = chunk.reorder_columns(&self.output_indices);
         // TODO: refactor with `Vis`.
         let (ops, columns, visibility) = chunk.into_inner();
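The `reorder_columns` call above is deliberately placed after the vnode computation: `keys` are indices into the dispatcher's *input* schema, so projecting first would hash the wrong columns whenever `output_indices` drops or reorders columns. A toy illustration of the difference (hypothetical helper working on a single row, not the `StreamChunk` API):

```rust
/// Toy vnode computation over one row; `keys` index the input schema.
fn vnode_of(row: &[i64], keys: &[usize], vnode_count: u64) -> u64 {
    keys.iter().map(|&i| row[i] as u64).sum::<u64>() % vnode_count
}

fn main() {
    let row = [10, 20, 30];
    let keys = [0]; // distribute on the first input column
    let output_indices = [2, 0]; // but output columns 2 and 0, in that order

    // Correct order: compute the vnode on the input row, then project.
    let vnode = vnode_of(&row, &keys, 256);
    let projected: Vec<i64> = output_indices.iter().map(|&i| row[i]).collect();

    // Wrong order: after projection, index 0 points at a different column.
    let wrong_vnode = vnode_of(&projected, &keys, 256);

    assert_eq!(vnode, 10);
    assert_eq!(wrong_vnode, 30);
    assert_ne!(vnode, wrong_vnode);
}
```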
@@ -690,16 +707,19 @@ impl Dispatcher for HashDataDispatcher {
 #[derive(Debug)]
 pub struct BroadcastDispatcher {
     outputs: HashMap,
+    output_indices: Vec<usize>,
     dispatcher_id: DispatcherId,
 }
 
 impl BroadcastDispatcher {
     pub fn new(
         outputs: impl IntoIterator,
+        output_indices: Vec<usize>,
         dispatcher_id: DispatcherId,
     ) -> Self {
         Self {
             outputs: Self::into_pairs(outputs).collect(),
+            output_indices,
             dispatcher_id,
         }
     }
@@ -718,6 +738,7 @@ impl Dispatcher for BroadcastDispatcher {
 
     fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> {
         async move {
+            let chunk = chunk.reorder_columns(&self.output_indices);
             for output in self.outputs.values_mut() {
                 output.send(Message::Chunk(chunk.clone())).await?;
             }
@@ -779,13 +800,19 @@ pub struct SimpleDispatcher {
     /// Therefore, when dispatching data, we assert that there's exactly one output by
     /// `Self::output`.
     output: SmallVec<[BoxedOutput; 2]>,
+    output_indices: Vec<usize>,
     dispatcher_id: DispatcherId,
 }
 
 impl SimpleDispatcher {
-    pub fn new(output: BoxedOutput, dispatcher_id: DispatcherId) -> Self {
+    pub fn new(
+        output: BoxedOutput,
+        output_indices: Vec<usize>,
+        dispatcher_id: DispatcherId,
+    ) -> Self {
         Self {
             output: smallvec![output],
+            output_indices,
             dispatcher_id,
         }
     }
@@ -817,6 +844,7 @@ impl Dispatcher for SimpleDispatcher {
             .exactly_one()
             .expect("expect exactly one output");
 
+        let chunk = chunk.reorder_columns(&self.output_indices);
         output.send(Message::Chunk(chunk)).await
     }
 }
@@ -919,8 +947,13 @@ mod tests {
             .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs])
             .collect_vec();
         hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32);
-        let mut hash_dispatcher =
-            HashDataDispatcher::new(outputs, key_indices.to_vec(), hash_mapping, 0);
+        let mut hash_dispatcher = HashDataDispatcher::new(
+            outputs,
+            key_indices.to_vec(),
+            vec![0, 1, 2],
+            hash_mapping,
+            0,
+        );
 
         let chunk = StreamChunk::from_pretty(
             " I I I
@@ -1147,8 +1180,13 @@ mod tests {
             .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs])
             .collect_vec();
         hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32);
-        let mut hash_dispatcher =
-            HashDataDispatcher::new(outputs, key_indices.to_vec(), hash_mapping.clone(), 0);
+        let mut hash_dispatcher = HashDataDispatcher::new(
+            outputs,
+            key_indices.to_vec(),
+            (0..dimension).collect(),
+            hash_mapping.clone(),
+            0,
+        );
 
         let mut ops = Vec::new();
         for idx in 0..cardinality {
diff --git a/src/stream/src/executor/exchange/output.rs b/src/stream/src/executor/exchange/output.rs
index c2954d6cd2dd1..737defdae9ada 100644
--- a/src/stream/src/executor/exchange/output.rs
+++ b/src/stream/src/executor/exchange/output.rs
@@ -17,6 +17,7 @@ use std::fmt::Debug;
 use anyhow::anyhow;
 use async_stack_trace::{SpanValue, StackTrace};
 use async_trait::async_trait;
+use derivative::Derivative;
 use risingwave_common::util::addr::is_local_address;
 use tokio::sync::mpsc::error::SendError;
 
@@ -44,22 +45,18 @@ pub trait Output: Debug + Send + Sync + 'static {
 pub type BoxedOutput = Box<dyn Output>;
 
 /// `LocalOutput` sends data to a local channel.
+#[derive(Derivative)]
+#[derivative(Debug)]
 pub struct LocalOutput {
     actor_id: ActorId,
+    #[derivative(Debug = "ignore")]
     span: SpanValue,
+    #[derivative(Debug = "ignore")]
     ch: Sender,
 }
 
-impl Debug for LocalOutput {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("LocalOutput")
-            .field("actor_id", &self.actor_id)
-            .finish()
-    }
-}
-
 impl LocalOutput {
     pub fn new(actor_id: ActorId, ch: Sender) -> Self {
         Self {
@@ -97,22 +94,18 @@ impl Output for LocalOutput {
 ///
 /// [`ExchangeService`]: risingwave_pb::task_service::exchange_service_server::ExchangeService
 // FIXME: can we just use the same `Output` with local and compacts it in gRPC server?
+#[derive(Derivative)]
+#[derivative(Debug)]
 pub struct RemoteOutput {
     actor_id: ActorId,
+    #[derivative(Debug = "ignore")]
     span: SpanValue,
+    #[derivative(Debug = "ignore")]
     ch: Sender,
 }
 
-impl Debug for RemoteOutput {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("RemoteOutput")
-            .field("actor_id", &self.actor_id)
-            .finish()
-    }
-}
-
 impl RemoteOutput {
     pub fn new(actor_id: ActorId, ch: Sender) -> Self {
         Self {
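The `derivative` dependency introduced above replaces the hand-written `Debug` impls, which existed only to skip the channel and span fields. A standalone sketch of the same pattern (hypothetical `DemoOutput` type, not the actual `LocalOutput`/`RemoteOutput` definitions):

```rust
use derivative::Derivative; // derivative = "2", as added to src/stream/Cargo.toml above

#[derive(Derivative)]
#[derivative(Debug)]
struct DemoOutput {
    actor_id: u32,
    // Omitted from the derived `Debug` output, like `span` and `ch` in the diff.
    #[derivative(Debug = "ignore")]
    ch: std::sync::mpsc::Sender<u64>,
}

fn main() {
    let (tx, _rx) = std::sync::mpsc::channel();
    let out = DemoOutput { actor_id: 1, ch: tx };
    // Should print something like `DemoOutput { actor_id: 1 }`.
    println!("{out:?}");
}
```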