diff --git a/Cargo.lock b/Cargo.lock index 076d032cb1552..6893607881919 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6443,6 +6443,7 @@ dependencies = [ "async-trait", "async_stack_trace", "bytes", + "derivative", "dyn-clone", "either", "enum-as-inner", diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index ad5854a2fd8bd..9eb8d72107b1f 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -87,6 +87,7 @@ echo "--- starting risingwave cluster with connector node" cargo make ci-start ci-1cn-1fe echo "--- testing sinks" +sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt' sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt' sleep 1 diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index c950828c55cef..697a3c5a870fb 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -827,9 +827,14 @@ export interface StreamNode { fields: Field[]; } +/** + * The property of an edge in the fragment graph. + * This is essientially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details. + */ export interface DispatchStrategy { type: DispatcherType; - columnIndices: number[]; + distKeyIndices: number[]; + outputIndices: number[]; } /** @@ -842,7 +847,13 @@ export interface Dispatcher { * Indices of the columns to be used for hashing. * For dispatcher types other than HASH, this is ignored. */ - columnIndices: number[]; + distKeyIndices: number[]; + /** + * Indices of the columns to output. + * In most cases, this contains all columns in the input. But for some cases like MV on MV or + * schema change, we may only output a subset of the columns. + */ + outputIndices: number[]; /** * The hash mapping for consistent hash. * For dispatcher types other than HASH, this is ignored. @@ -3870,24 +3881,30 @@ export const StreamNode = { }; function createBaseDispatchStrategy(): DispatchStrategy { - return { type: DispatcherType.UNSPECIFIED, columnIndices: [] }; + return { type: DispatcherType.UNSPECIFIED, distKeyIndices: [], outputIndices: [] }; } export const DispatchStrategy = { fromJSON(object: any): DispatchStrategy { return { type: isSet(object.type) ? dispatcherTypeFromJSON(object.type) : DispatcherType.UNSPECIFIED, - columnIndices: Array.isArray(object?.columnIndices) ? object.columnIndices.map((e: any) => Number(e)) : [], + distKeyIndices: Array.isArray(object?.distKeyIndices) ? object.distKeyIndices.map((e: any) => Number(e)) : [], + outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [], }; }, toJSON(message: DispatchStrategy): unknown { const obj: any = {}; message.type !== undefined && (obj.type = dispatcherTypeToJSON(message.type)); - if (message.columnIndices) { - obj.columnIndices = message.columnIndices.map((e) => Math.round(e)); + if (message.distKeyIndices) { + obj.distKeyIndices = message.distKeyIndices.map((e) => Math.round(e)); } else { - obj.columnIndices = []; + obj.distKeyIndices = []; + } + if (message.outputIndices) { + obj.outputIndices = message.outputIndices.map((e) => Math.round(e)); + } else { + obj.outputIndices = []; } return obj; }, @@ -3895,7 +3912,8 @@ export const DispatchStrategy = { fromPartial, I>>(object: I): DispatchStrategy { const message = createBaseDispatchStrategy(); message.type = object.type ?? DispatcherType.UNSPECIFIED; - message.columnIndices = object.columnIndices?.map((e) => e) || []; + message.distKeyIndices = object.distKeyIndices?.map((e) => e) || []; + message.outputIndices = object.outputIndices?.map((e) => e) || []; return message; }, }; @@ -3903,7 +3921,8 @@ export const DispatchStrategy = { function createBaseDispatcher(): Dispatcher { return { type: DispatcherType.UNSPECIFIED, - columnIndices: [], + distKeyIndices: [], + outputIndices: [], hashMapping: undefined, dispatcherId: 0, downstreamActorId: [], @@ -3914,7 +3933,8 @@ export const Dispatcher = { fromJSON(object: any): Dispatcher { return { type: isSet(object.type) ? dispatcherTypeFromJSON(object.type) : DispatcherType.UNSPECIFIED, - columnIndices: Array.isArray(object?.columnIndices) ? object.columnIndices.map((e: any) => Number(e)) : [], + distKeyIndices: Array.isArray(object?.distKeyIndices) ? object.distKeyIndices.map((e: any) => Number(e)) : [], + outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [], hashMapping: isSet(object.hashMapping) ? ActorMapping.fromJSON(object.hashMapping) : undefined, dispatcherId: isSet(object.dispatcherId) ? Number(object.dispatcherId) : 0, downstreamActorId: Array.isArray(object?.downstreamActorId) @@ -3926,10 +3946,15 @@ export const Dispatcher = { toJSON(message: Dispatcher): unknown { const obj: any = {}; message.type !== undefined && (obj.type = dispatcherTypeToJSON(message.type)); - if (message.columnIndices) { - obj.columnIndices = message.columnIndices.map((e) => Math.round(e)); + if (message.distKeyIndices) { + obj.distKeyIndices = message.distKeyIndices.map((e) => Math.round(e)); } else { - obj.columnIndices = []; + obj.distKeyIndices = []; + } + if (message.outputIndices) { + obj.outputIndices = message.outputIndices.map((e) => Math.round(e)); + } else { + obj.outputIndices = []; } message.hashMapping !== undefined && (obj.hashMapping = message.hashMapping ? ActorMapping.toJSON(message.hashMapping) : undefined); @@ -3945,7 +3970,8 @@ export const Dispatcher = { fromPartial, I>>(object: I): Dispatcher { const message = createBaseDispatcher(); message.type = object.type ?? DispatcherType.UNSPECIFIED; - message.columnIndices = object.columnIndices?.map((e) => e) || []; + message.distKeyIndices = object.distKeyIndices?.map((e) => e) || []; + message.outputIndices = object.outputIndices?.map((e) => e) || []; message.hashMapping = (object.hashMapping !== undefined && object.hashMapping !== null) ? ActorMapping.fromPartial(object.hashMapping) : undefined; diff --git a/e2e_test/batch/explain.slt b/e2e_test/batch/explain.slt index 9a42b8e3af986..388f92440ebae 100644 --- a/e2e_test/batch/explain.slt +++ b/e2e_test/batch/explain.slt @@ -1,11 +1,11 @@ statement ok -create table t(v int); +create table t (v int) with ( appendonly = 'true' ); statement ok explain create index i on t(v); statement ok -explain create sink sink_t from t with ( connector = 'kafka' ) +explain create sink sink_t from t with ( connector = 'kafka', format = 'append_only' ) statement ok drop table t; diff --git a/e2e_test/ddl/invalid_operation.slt b/e2e_test/ddl/invalid_operation.slt index 97696042accd7..9d5ef49c7272b 100644 --- a/e2e_test/ddl/invalid_operation.slt +++ b/e2e_test/ddl/invalid_operation.slt @@ -62,7 +62,7 @@ drop view not_exists.not_exists.not_exists; # 4.1. table statement ok -create table t (v int); +create table t (v int primary key); statement error Use `DROP TABLE` drop materialized view t; diff --git a/e2e_test/ddl/table.slt b/e2e_test/ddl/table.slt index 6cc3d7325c424..e941c534e8ba8 100644 --- a/e2e_test/ddl/table.slt +++ b/e2e_test/ddl/table.slt @@ -35,10 +35,10 @@ statement ok explain select v2 from ddl_t; statement ok -explain create sink sink_t from ddl_t with ( connector = 'kafka' ); +explain create sink sink_t from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' ); statement ok -explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka' ); +explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' ); # Create a mview with duplicated name. statement error diff --git a/e2e_test/sink/append_only_sink.slt b/e2e_test/sink/append_only_sink.slt new file mode 100644 index 0000000000000..cf4e185d1b28f --- /dev/null +++ b/e2e_test/sink/append_only_sink.slt @@ -0,0 +1,53 @@ +statement ok +create table t1 (v1 int, v2 int); + +statement error No primary key for the upsert sink +create sink s1 from t1 with (connector = 'console'); + +statement ok +create sink s1 as select v1, v2, _row_id from t1 with (connector = 'console'); + +statement ok +create table t2 (v1 int, v2 int primary key); + +statement ok +create sink s2 from t2 with (connector = 'console'); + +statement error No primary key for the upsert sink +create sink s3 as select avg(v1) from t2 with (connector = 'console'); + +statement ok +create sink s3 as select avg(v1) from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true'); + +statement ok +create sink s4 as select avg(v1), v2 from t2 group by v2 with (connector = 'console'); + +statement error The sink cannot be append-only +create sink s5 from t2 with (connector = 'console', format = 'append_only'); + +statement ok +create sink s5 from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true'); + +statement error Cannot force the sink to be append-only +create sink s6 from t2 with (connector = 'console', format = 'upsert', force_append_only = 'true'); + +statement ok +drop sink s1 + +statement ok +drop sink s2 + +statement ok +drop sink s3 + +statement ok +drop sink s4 + +statement ok +drop sink s5 + +statement ok +drop table t1 + +statement ok +drop table t2 diff --git a/e2e_test/sink/blackhole_sink.slt b/e2e_test/sink/blackhole_sink.slt index e3e9810f81f73..01b744a37b8c7 100644 --- a/e2e_test/sink/blackhole_sink.slt +++ b/e2e_test/sink/blackhole_sink.slt @@ -1,5 +1,5 @@ statement ok -CREATE TABLE t5 (v1 int, v2 int); +CREATE TABLE t5 (v1 int primary key, v2 int); statement ok CREATE MATERIALIZED VIEW mv5 AS SELECT * FROM t5; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index bd60dcd19ffce..36811ec46f3d6 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -553,9 +553,12 @@ enum DispatcherType { NO_SHUFFLE = 4; } +// The property of an edge in the fragment graph. +// This is essientially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details. message DispatchStrategy { DispatcherType type = 1; - repeated uint32 column_indices = 2; + repeated uint32 dist_key_indices = 2; + repeated uint32 output_indices = 3; } // A dispatcher redistribute messages. @@ -564,7 +567,11 @@ message Dispatcher { DispatcherType type = 1; // Indices of the columns to be used for hashing. // For dispatcher types other than HASH, this is ignored. - repeated uint32 column_indices = 2; + repeated uint32 dist_key_indices = 2; + // Indices of the columns to output. + // In most cases, this contains all columns in the input. But for some cases like MV on MV or + // schema change, we may only output a subset of the columns. + repeated uint32 output_indices = 6; // The hash mapping for consistent hash. // For dispatcher types other than HASH, this is ignored. ActorMapping hash_mapping = 3; diff --git a/risedev.yml b/risedev.yml index 0e5509d61f419..adf5537d894df 100644 --- a/risedev.yml +++ b/risedev.yml @@ -176,7 +176,6 @@ profile: - use: kafka persist-data: true - 3etcd-3meta: steps: - use: etcd diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 2afac6852fb8a..51b7faefbf6f6 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -176,7 +176,7 @@ pub mod tests { let sql = r#"CREATE SINK snk1 FROM mv1 WITH (connector = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table = '', mysql.database = '', mysql.user = '', - mysql.password = '');"#.to_string(); + mysql.password = '', format = 'append_only', force_append_only = 'true');"#.to_string(); frontend.run_sql(sql).await.unwrap(); let session = frontend.session_ref(); diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs index 81af96449991a..6b3a472f5cb49 100644 --- a/src/frontend/src/handler/drop_sink.rs +++ b/src/frontend/src/handler/drop_sink.rs @@ -70,7 +70,7 @@ mod tests { #[tokio::test] async fn test_drop_sink_handler() { - let sql_create_table = "create table t (v1 smallint);"; + let sql_create_table = "create table t (v1 smallint primary key);"; let sql_create_mv = "create materialized view mv as select v1 from t;"; let sql_create_sink = "create sink snk from mv with( connector = 'mysql')"; let sql_drop_sink = "drop sink snk;"; diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index f9f5e956a485d..528b8f9cd04d7 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -37,8 +37,8 @@ use risingwave_common::util::iter_util::ZipEqDebug; use self::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer}; use self::plan_node::{ - BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamRowIdGen, - StreamSink, + BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamProject, + StreamRowIdGen, StreamSink, }; #[cfg(debug_assertions)] use self::plan_visitor::InputRefValidator; @@ -49,6 +49,7 @@ use self::plan_visitor::{ use self::property::RequiredDist; use self::rule::*; use crate::catalog::table_catalog::{TableType, TableVersion}; +use crate::expr::InputRef; use crate::optimizer::plan_node::{ BatchExchange, ColumnPruningContext, PlanNodeType, PlanTreeNode, PredicatePushdownContext, RewriteExprsRecursive, @@ -746,7 +747,24 @@ impl PlanRoot { definition: String, properties: WithOptions, ) -> Result { - let stream_plan = self.gen_stream_plan()?; + let mut stream_plan = self.gen_stream_plan()?; + + // Add a project node if there is hidden column(s). + let input_fields = stream_plan.schema().fields(); + if input_fields.len() != self.out_fields.count_ones(..) { + let exprs = input_fields + .iter() + .enumerate() + .filter_map(|(idx, field)| { + if self.out_fields.contains(idx) { + Some(InputRef::new(idx, field.data_type.clone()).into()) + } else { + None + } + }) + .collect_vec(); + stream_plan = StreamProject::new(LogicalProject::new(stream_plan, exprs)).into(); + } StreamSink::create( stream_plan, diff --git a/src/frontend/src/optimizer/plan_node/derive.rs b/src/frontend/src/optimizer/plan_node/derive.rs index 1498d2183dcb3..3a0e198f56eaa 100644 --- a/src/frontend/src/optimizer/plan_node/derive.rs +++ b/src/frontend/src/optimizer/plan_node/derive.rs @@ -23,7 +23,7 @@ use super::PlanRef; use crate::optimizer::property::{Direction, FieldOrder, Order}; pub(crate) fn derive_columns( - schema: &Schema, + input_schema: &Schema, out_names: Vec, user_cols: &FixedBitSet, ) -> Result> { @@ -39,7 +39,7 @@ pub(crate) fn derive_columns( } let mut out_name_iter = out_names.into_iter(); - let columns = schema + let columns = input_schema .fields() .iter() .enumerate() diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index afa2bd78607ed..4ccf758a9c806 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -425,10 +425,11 @@ pub fn to_stream_prost_body( Distribution::Broadcast => DispatcherType::Broadcast, _ => panic!("Do not allow Any or AnyShard in serialization process"), } as i32, - column_indices: match &base.dist { + dist_key_indices: match &base.dist { Distribution::HashShard(keys) => keys.iter().map(|&num| num as u32).collect(), _ => vec![], }, + output_indices: (0..base.schema().len() as u32).collect(), }), }), Node::DynamicFilter(me) => { diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index e8f5d36cf53ed..a7619b75cc597 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -83,10 +83,11 @@ impl StreamNode for StreamExchange { Distribution::Broadcast => DispatcherType::Broadcast, _ => panic!("Do not allow Any or AnyShard in serialization process"), } as i32, - column_indices: match &self.base.dist { + dist_key_indices: match &self.base.dist { Distribution::HashShard(keys) => keys.iter().map(|num| *num as u32).collect(), _ => vec![], }, + output_indices: (0..self.schema().len() as u32).collect(), }), }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 66361b3b89174..3b6f24ff28796 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -85,33 +85,39 @@ impl StreamNode for StreamShare { impl StreamShare { pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> ProstStreamPlan { let operator_id = self.base.id.0 as u32; + let output_indices = (0..self.schema().len() as u32).collect_vec(); + match state.get_share_stream_node(operator_id) { None => { let dispatch_strategy = match &self.base.dist { Distribution::HashShard(keys) | Distribution::UpstreamHashShard(keys, _) => { DispatchStrategy { r#type: DispatcherType::Hash as i32, - column_indices: keys.iter().map(|x| *x as u32).collect_vec(), + dist_key_indices: keys.iter().map(|x| *x as u32).collect_vec(), + output_indices, } } Distribution::Single => DispatchStrategy { r#type: DispatcherType::Simple as i32, - column_indices: vec![], + dist_key_indices: vec![], + output_indices, }, Distribution::Broadcast => DispatchStrategy { r#type: DispatcherType::Broadcast as i32, - column_indices: vec![], + dist_key_indices: vec![], + output_indices, }, Distribution::SomeShard => { // FIXME: use another DispatcherType? DispatchStrategy { r#type: DispatcherType::Hash as i32, - column_indices: self + dist_key_indices: self .base .logical_pk .iter() .map(|x| *x as u32) .collect_vec(), + output_indices, } } }; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 0a5426aaa43fe..05881ffb73b8c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -28,6 +28,7 @@ use risingwave_connector::sink::{ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use super::derive::{derive_columns, derive_pk}; +use super::utils::IndicesDisplay; use super::{ExprRewritable, PlanBase, PlanRef, StreamNode}; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -102,6 +103,14 @@ impl StreamSink { let sink_type = Self::derive_sink_type(input.append_only(), &properties)?; let (pk, stream_key) = derive_pk(input, user_order_by, &columns); + if sink_type == SinkType::Upsert && pk.is_empty() { + return Err(ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + "No primary key for the upsert sink. Please include the primary key explicitly in sink definition or make the sink append-only.", + ))) + .into()); + } + Ok(SinkDesc { id: SinkId::placeholder(), name, @@ -140,7 +149,7 @@ impl StreamSink { (_, false, true) => { Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, - "Cannot force the sink to be append-only without \"format='append_only'\"in WITH options", + "Cannot force the sink to be append-only without \"format='append_only'\"in WITH options.", ))) .into()) } @@ -164,6 +173,33 @@ impl_plan_tree_node_for_unary! { StreamSink } impl fmt::Display for StreamSink { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut builder = f.debug_struct("StreamSink"); + + let sink_type = if self.sink_desc.sink_type.is_append_only() { + "append-only" + } else { + "upsert" + }; + let column_names = self + .sink_desc + .columns + .iter() + .map(|col| col.column_desc.name.clone()) + .collect_vec() + .join(", "); + builder + .field("type", &format_args!("{}", sink_type)) + .field("columns", &format_args!("[{}]", column_names)); + + if self.sink_desc.sink_type.is_upsert() { + builder.field( + "pk", + &IndicesDisplay { + indices: &self.sink_desc.pk.iter().map(|k| k.column_idx).collect_vec(), + input_schema: &self.base.schema, + }, + ); + } + builder.finish() } } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 35eae9dc34d77..628ba0d486f25 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -139,7 +139,8 @@ fn rewrite_stream_node( let strategy = DispatchStrategy { r#type: DispatcherType::NoShuffle.into(), - column_indices: vec![], // TODO: use distribution key + dist_key_indices: vec![], // TODO: use distribution key + output_indices: (0..(child_node.fields.len() as u32)).collect(), }; Ok(StreamNode { stream_key: child_node.stream_key.clone(), diff --git a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs index fec3424c2ac00..1450b373baa24 100644 --- a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs +++ b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs @@ -36,7 +36,9 @@ fn build_no_shuffle_exchange_for_delta_join( fields: upstream.fields.clone(), stream_key: upstream.stream_key.clone(), node_body: Some(NodeBody::Exchange(ExchangeNode { - strategy: Some(dispatch_no_shuffle()), + strategy: Some(dispatch_no_shuffle( + (0..(upstream.fields.len() as u32)).collect(), + )), })), input: vec![], append_only: upstream.append_only, @@ -46,7 +48,7 @@ fn build_no_shuffle_exchange_for_delta_join( fn build_consistent_hash_shuffle_exchange_for_delta_join( state: &mut BuildFragmentGraphState, upstream: &StreamNode, - column_indices: Vec, + dist_key_indices: Vec, ) -> StreamNode { StreamNode { operator_id: state.gen_operator_id() as u64, @@ -54,25 +56,33 @@ fn build_consistent_hash_shuffle_exchange_for_delta_join( fields: upstream.fields.clone(), stream_key: upstream.stream_key.clone(), node_body: Some(NodeBody::Exchange(ExchangeNode { - strategy: Some(dispatch_consistent_hash_shuffle(column_indices)), + strategy: Some(dispatch_consistent_hash_shuffle( + dist_key_indices, + (0..(upstream.fields.len() as u32)).collect(), + )), })), input: vec![], append_only: upstream.append_only, } } -fn dispatch_no_shuffle() -> DispatchStrategy { +fn dispatch_no_shuffle(output_indices: Vec) -> DispatchStrategy { DispatchStrategy { r#type: DispatcherType::NoShuffle.into(), - column_indices: vec![], + dist_key_indices: vec![], + output_indices, } } -fn dispatch_consistent_hash_shuffle(column_indices: Vec) -> DispatchStrategy { +fn dispatch_consistent_hash_shuffle( + dist_key_indices: Vec, + output_indices: Vec, +) -> DispatchStrategy { // Actually Hash shuffle is consistent hash shuffle now. DispatchStrategy { r#type: DispatcherType::Hash.into(), - column_indices, + dist_key_indices, + output_indices, } } @@ -136,6 +146,9 @@ fn build_delta_join_inner( let i0_length = arrange_0.fields.len(); let i1_length = arrange_1.fields.len(); + let i0_output_indices = (0..i0_length as u32).collect_vec(); + let i1_output_indices = (0..i1_length as u32).collect_vec(); + let lookup_0_column_reordering = { let tmp: Vec = (i1_length..i1_length + i0_length) .chain(0..i1_length) @@ -204,7 +217,7 @@ fn build_delta_join_inner( arrange_0_frag.fragment_id, lookup_0_frag.fragment_id, StreamFragmentEdge { - dispatch_strategy: dispatch_no_shuffle(), + dispatch_strategy: dispatch_no_shuffle(i0_output_indices.clone()), link_id: exchange_a0l0.operator_id, }, ); @@ -221,6 +234,7 @@ fn build_delta_join_inner( .iter() .map(|x| *x as u32) .collect_vec(), + i0_output_indices, ), link_id: exchange_a0l1.operator_id, }, @@ -238,6 +252,7 @@ fn build_delta_join_inner( .iter() .map(|x| *x as u32) .collect_vec(), + i1_output_indices.clone(), ), link_id: exchange_a1l0.operator_id, }, @@ -249,7 +264,7 @@ fn build_delta_join_inner( arrange_1_frag.fragment_id, lookup_1_frag.fragment_id, StreamFragmentEdge { - dispatch_strategy: dispatch_no_shuffle(), + dispatch_strategy: dispatch_no_shuffle(i1_output_indices), link_id: exchange_a1l1.operator_id, }, ); @@ -275,7 +290,10 @@ fn build_delta_join_inner( lookup_0_frag.fragment_id, current_fragment.fragment_id, StreamFragmentEdge { - dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()), + dispatch_strategy: dispatch_consistent_hash_shuffle( + node.stream_key.clone(), + (0..node.fields.len() as u32).collect(), + ), link_id: exchange_l0m.operator_id, }, ); @@ -284,7 +302,10 @@ fn build_delta_join_inner( lookup_1_frag.fragment_id, current_fragment.fragment_id, StreamFragmentEdge { - dispatch_strategy: dispatch_consistent_hash_shuffle(node.stream_key.clone()), + dispatch_strategy: dispatch_consistent_hash_shuffle( + node.stream_key.clone(), + (0..node.fields.len() as u32).collect(), + ), link_id: exchange_l1m.operator_id, }, ); diff --git a/src/frontend/src/utils/stream_graph_formatter.rs b/src/frontend/src/utils/stream_graph_formatter.rs index 017c76593a740..54c8140ae4d8e 100644 --- a/src/frontend/src/utils/stream_graph_formatter.rs +++ b/src/frontend/src/utils/stream_graph_formatter.rs @@ -129,7 +129,7 @@ impl StreamGraphFormatter { "StreamExchange {} from {}", match dist.r#type() { DispatcherType::Unspecified => unreachable!(), - DispatcherType::Hash => format!("Hash({:?})", dist.column_indices), + DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices), DispatcherType::Broadcast => "Broadcast".to_string(), DispatcherType::Simple => "Single".to_string(), DispatcherType::NoShuffle => "NoShuffle".to_string(), diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index d200581d304b2..44d93dacaddc5 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -167,7 +167,7 @@ use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; -use risingwave_common::config::{load_config, MetaBackend}; +use risingwave_common::config::{load_config, MetaBackend, RwConfig}; use tracing::info; /// Start meta node @@ -202,6 +202,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { MetaBackend::Mem => MetaStoreBackend::Mem, }; + validate_config(&config); + let max_heartbeat_interval = Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64); let barrier_interval = Duration::from_millis(config.streaming.barrier_interval_ms as u64); @@ -271,3 +273,11 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { } }) } + +fn validate_config(config: &RwConfig) { + if config.meta.meta_leader_lease_secs <= 1 { + let error_msg = "meta leader lease secs should be larger than 1"; + tracing::error!(error_msg); + panic!("{}", error_msg); + } +} diff --git a/src/meta/src/rpc/election_client.rs b/src/meta/src/rpc/election_client.rs index ef62d4239a4c4..b54b53abd4e7b 100644 --- a/src/meta/src/rpc/election_client.rs +++ b/src/meta/src/rpc/election_client.rs @@ -146,15 +146,25 @@ impl ElectionClient for EtcdElectionClient { let mut ticker = time::interval(Duration::from_secs(1)); + // timeout controller, when keep alive fails for more than a certain period of time + // before it is considered a complete failure + let mut timeout = time::interval(Duration::from_secs((ttl / 2) as u64)); + timeout.reset(); + loop { tokio::select! { biased; + _ = timeout.tick() => { + tracing::warn!("lease {} keep alive timeout", lease_id); + keep_alive_fail_tx.send(()).unwrap(); + break; + } + _ = ticker.tick() => { if let Err(err) = keeper.keep_alive().await { - tracing::error!("keep alive for lease {} failed {}", lease_id, err); - keep_alive_fail_tx.send(()).unwrap(); - break; + tracing::warn!("keep alive for lease {} failed {}", lease_id, err); + continue } match resp_stream.message().await { @@ -164,16 +174,23 @@ impl ElectionClient for EtcdElectionClient { keep_alive_fail_tx.send(()).unwrap(); break; } + + timeout.reset(); }, Ok(None) => { tracing::warn!("lease keeper for lease {} response stream closed unexpected", lease_id); - keep_alive_fail_tx.send(()).unwrap(); - break; + + // try to re-create lease keeper, with timeout as ttl / 2 + if let Ok(Ok((keeper_, resp_stream_))) = time::timeout(Duration::from_secs((ttl / 2) as u64), lease_client.keep_alive(lease_id)).await { + keeper = keeper_; + resp_stream = resp_stream_; + }; + + continue; } Err(e) => { tracing::error!("lease keeper failed {}", e.to_string()); - keep_alive_fail_tx.send(()).unwrap(); - break; + continue; } }; } @@ -224,7 +241,15 @@ impl ElectionClient for EtcdElectionClient { }, resp = observe_stream.next() => { match resp { - None => unreachable!(), + None => { + tracing::warn!("observe stream closed unexpected, recreating"); + + // try to re-create observe stream, with timeout as ttl / 2 + if let Ok(Ok(stream)) = time::timeout(Duration::from_secs((ttl / 2) as u64), election_client.observe(META_ELECTION_KEY)).await { + observe_stream = stream; + tracing::info!("recreating observe stream"); + } + } Some(Ok(leader)) => { if let Some(kv) = leader.kv() && kv.value() != self.id.as_bytes() { tracing::warn!("leader has been changed to {}", String::from_utf8_lossy(kv.value()).to_string()); @@ -232,8 +257,8 @@ impl ElectionClient for EtcdElectionClient { } } Some(Err(e)) => { - tracing::error!("error {} received from leader observe stream", e.to_string()); - break; + tracing::warn!("error {} received from leader observe stream", e.to_string()); + continue } } } diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index ebcfcbaee8bc8..41315fb78060f 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -25,7 +25,9 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::meta::table_fragments::Fragment; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; -use risingwave_pb::stream_plan::{Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode}; +use risingwave_pb::stream_plan::{ + DispatchStrategy, Dispatcher, DispatcherType, MergeNode, StreamActor, StreamNode, +}; use super::id::GlobalFragmentIdsExt; use super::Locations; @@ -339,14 +341,17 @@ impl ActorGraphBuildStateInner { /// Create a new hash dispatcher. fn new_hash_dispatcher( - column_indices: &[u32], + strategy: &DispatchStrategy, downstream_fragment_id: GlobalFragmentId, downstream_actors: &[GlobalActorId], downstream_actor_mapping: ActorMapping, ) -> Dispatcher { + assert_eq!(strategy.r#type(), DispatcherType::Hash); + Dispatcher { r#type: DispatcherType::Hash as _, - column_indices: column_indices.to_vec(), + dist_key_indices: strategy.dist_key_indices.clone(), + output_indices: strategy.output_indices.clone(), hash_mapping: Some(downstream_actor_mapping.to_protobuf()), dispatcher_id: downstream_fragment_id.as_global_id() as u64, downstream_actor_id: downstream_actors.as_global_ids(), @@ -355,14 +360,17 @@ impl ActorGraphBuildStateInner { /// Create a new dispatcher for non-hash types. fn new_normal_dispatcher( - dispatcher_type: DispatcherType, + strategy: &DispatchStrategy, downstream_fragment_id: GlobalFragmentId, downstream_actors: &[GlobalActorId], ) -> Dispatcher { - assert_ne!(dispatcher_type, DispatcherType::Hash); + assert_ne!(strategy.r#type(), DispatcherType::Hash); + assert!(strategy.dist_key_indices.is_empty()); + Dispatcher { - r#type: dispatcher_type as _, - column_indices: Vec::new(), + r#type: strategy.r#type, + dist_key_indices: vec![], + output_indices: strategy.output_indices.clone(), hash_mapping: None, dispatcher_id: downstream_fragment_id.as_global_id() as u64, downstream_actor_id: downstream_actors.as_global_ids(), @@ -441,7 +449,11 @@ impl ActorGraphBuildStateInner { // Create a new dispatcher just between these two actors. self.add_dispatcher( *upstream_id, - Self::new_normal_dispatcher(dt, downstream.fragment_id, &[*downstream_id]), + Self::new_normal_dispatcher( + &edge.dispatch_strategy, + downstream.fragment_id, + &[*downstream_id], + ), ); // Also record the upstream for the downstream actor. @@ -474,13 +486,17 @@ impl ActorGraphBuildStateInner { .to_actor(&downstream_locations); Self::new_hash_dispatcher( - &edge.dispatch_strategy.column_indices, + &edge.dispatch_strategy, downstream.fragment_id, downstream.actor_ids, actor_mapping, ) } else { - Self::new_normal_dispatcher(dt, downstream.fragment_id, downstream.actor_ids) + Self::new_normal_dispatcher( + &edge.dispatch_strategy, + downstream.fragment_id, + downstream.actor_ids, + ) }; for upstream_id in upstream.actor_ids { self.add_dispatcher(*upstream_id, dispatcher.clone()); diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 4820058f46c09..62d1001d5a359 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -417,6 +417,13 @@ impl CompleteStreamFragmentGraph { .context("upstream materialized view fragment not found")?; let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id); + // TODO: only output the fields that are used by the downstream `Chain`. + // https://github.com/risingwavelabs/risingwave/issues/4529 + let mview_output_indices = { + let nodes = mview_fragment.actors[0].nodes.as_ref().unwrap(); + (0..nodes.fields.len() as u32).collect() + }; + let edge = StreamFragmentEdge { id: EdgeId::UpstreamExternal { upstream_table_id, @@ -426,7 +433,8 @@ impl CompleteStreamFragmentGraph { // and the downstream `Chain` of the new materialized view. dispatch_strategy: DispatchStrategy { r#type: DispatcherType::NoShuffle as _, - ..Default::default() + dist_key_indices: vec![], // not used + output_indices: mview_output_indices, }, }; @@ -492,7 +500,8 @@ impl CompleteStreamFragmentGraph { // and the downstream `Chain` of the new materialized view. dispatch_strategy: DispatchStrategy { r#type: DispatcherType::NoShuffle as _, - ..Default::default() + dist_key_indices: vec![], // not used + output_indices: vec![], // FIXME: should erase the changes of the schema }, }; diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 9dca3500cf8b9..a1c20fa8c8cbe 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -213,7 +213,8 @@ fn make_stream_fragments() -> Vec { node_body: Some(NodeBody::Exchange(ExchangeNode { strategy: Some(DispatchStrategy { r#type: DispatcherType::Hash as i32, - column_indices: vec![0], + dist_key_indices: vec![0], + output_indices: vec![0, 1, 2], }), })), fields: vec![ @@ -374,7 +375,8 @@ fn make_fragment_edges() -> Vec { StreamFragmentEdge { dispatch_strategy: Some(DispatchStrategy { r#type: DispatcherType::Simple as i32, - column_indices: vec![], + dist_key_indices: vec![], + output_indices: vec![], }), link_id: 4, upstream_id: 1, @@ -383,7 +385,8 @@ fn make_fragment_edges() -> Vec { StreamFragmentEdge { dispatch_strategy: Some(DispatchStrategy { r#type: DispatcherType::Hash as i32, - column_indices: vec![0], + dist_key_indices: vec![0], + output_indices: vec![], }), link_id: 1, upstream_id: 2, diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 2c92d2566066f..baca2796e6198 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -21,6 +21,7 @@ async-stream = "0.3" async-trait = "0.1" async_stack_trace = { path = "../utils/async_stack_trace" } bytes = "1" +derivative = "2" dyn-clone = "1" either = "1" enum-as-inner = "0.5" diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index b89c9fa9f6409..1b5f94113d49e 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -305,12 +305,18 @@ impl DispatcherImpl { .map(|&down_id| new_output(context, actor_id, down_id)) .collect::>>()?; + let output_indices = dispatcher + .output_indices + .iter() + .map(|&i| i as usize) + .collect_vec(); + use risingwave_pb::stream_plan::DispatcherType::*; let dispatcher_impl = match dispatcher.get_type()? { Hash => { assert!(!outputs.is_empty()); - let column_indices = dispatcher - .column_indices + let dist_key_indices = dispatcher + .dist_key_indices .iter() .map(|i| *i as usize) .collect(); @@ -320,18 +326,24 @@ impl DispatcherImpl { DispatcherImpl::Hash(HashDataDispatcher::new( outputs, - column_indices, + dist_key_indices, + output_indices, hash_mapping, dispatcher.dispatcher_id, )) } Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new( outputs, + output_indices, dispatcher.dispatcher_id, )), Simple | NoShuffle => { let [output]: [_; 1] = outputs.try_into().unwrap(); - DispatcherImpl::Simple(SimpleDispatcher::new(output, dispatcher.dispatcher_id)) + DispatcherImpl::Simple(SimpleDispatcher::new( + output, + output_indices, + dispatcher.dispatcher_id, + )) } Unspecified => unreachable!(), }; @@ -509,6 +521,7 @@ impl Dispatcher for RoundRobinDataDispatcher { pub struct HashDataDispatcher { outputs: Vec, keys: Vec, + output_indices: Vec, /// Mapping from virtual node to actor id, used for hash data dispatcher to dispatch tasks to /// different downstream actors. hash_mapping: ExpandedActorMapping, @@ -529,12 +542,14 @@ impl HashDataDispatcher { pub fn new( outputs: Vec, keys: Vec, + output_indices: Vec, hash_mapping: ExpandedActorMapping, dispatcher_id: DispatcherId, ) -> Self { Self { outputs, keys, + output_indices, hash_mapping, dispatcher_id, } @@ -592,6 +607,8 @@ impl Dispatcher for HashDataDispatcher { let mut last_vnode_when_update_delete = None; let mut new_ops: Vec = Vec::with_capacity(chunk.capacity()); + // Apply output indices after calculating the vnode. + let chunk = chunk.reorder_columns(&self.output_indices); // TODO: refactor with `Vis`. let (ops, columns, visibility) = chunk.into_inner(); @@ -690,16 +707,19 @@ impl Dispatcher for HashDataDispatcher { #[derive(Debug)] pub struct BroadcastDispatcher { outputs: HashMap, + output_indices: Vec, dispatcher_id: DispatcherId, } impl BroadcastDispatcher { pub fn new( outputs: impl IntoIterator, + output_indices: Vec, dispatcher_id: DispatcherId, ) -> Self { Self { outputs: Self::into_pairs(outputs).collect(), + output_indices, dispatcher_id, } } @@ -718,6 +738,7 @@ impl Dispatcher for BroadcastDispatcher { fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> { async move { + let chunk = chunk.reorder_columns(&self.output_indices); for output in self.outputs.values_mut() { output.send(Message::Chunk(chunk.clone())).await?; } @@ -779,13 +800,19 @@ pub struct SimpleDispatcher { /// Therefore, when dispatching data, we assert that there's exactly one output by /// `Self::output`. output: SmallVec<[BoxedOutput; 2]>, + output_indices: Vec, dispatcher_id: DispatcherId, } impl SimpleDispatcher { - pub fn new(output: BoxedOutput, dispatcher_id: DispatcherId) -> Self { + pub fn new( + output: BoxedOutput, + output_indices: Vec, + dispatcher_id: DispatcherId, + ) -> Self { Self { output: smallvec![output], + output_indices, dispatcher_id, } } @@ -817,6 +844,7 @@ impl Dispatcher for SimpleDispatcher { .exactly_one() .expect("expect exactly one output"); + let chunk = chunk.reorder_columns(&self.output_indices); output.send(Message::Chunk(chunk)).await } } @@ -919,8 +947,13 @@ mod tests { .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) .collect_vec(); hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); - let mut hash_dispatcher = - HashDataDispatcher::new(outputs, key_indices.to_vec(), hash_mapping, 0); + let mut hash_dispatcher = HashDataDispatcher::new( + outputs, + key_indices.to_vec(), + vec![0, 1, 2], + hash_mapping, + 0, + ); let chunk = StreamChunk::from_pretty( " I I I @@ -1147,8 +1180,13 @@ mod tests { .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT / num_outputs]) .collect_vec(); hash_mapping.resize(VirtualNode::COUNT, num_outputs as u32); - let mut hash_dispatcher = - HashDataDispatcher::new(outputs, key_indices.to_vec(), hash_mapping.clone(), 0); + let mut hash_dispatcher = HashDataDispatcher::new( + outputs, + key_indices.to_vec(), + (0..dimension).collect(), + hash_mapping.clone(), + 0, + ); let mut ops = Vec::new(); for idx in 0..cardinality { diff --git a/src/stream/src/executor/exchange/output.rs b/src/stream/src/executor/exchange/output.rs index c2954d6cd2dd1..737defdae9ada 100644 --- a/src/stream/src/executor/exchange/output.rs +++ b/src/stream/src/executor/exchange/output.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use anyhow::anyhow; use async_stack_trace::{SpanValue, StackTrace}; use async_trait::async_trait; +use derivative::Derivative; use risingwave_common::util::addr::is_local_address; use tokio::sync::mpsc::error::SendError; @@ -44,22 +45,18 @@ pub trait Output: Debug + Send + Sync + 'static { pub type BoxedOutput = Box; /// `LocalOutput` sends data to a local channel. +#[derive(Derivative)] +#[derivative(Debug)] pub struct LocalOutput { actor_id: ActorId, + #[derivative(Debug = "ignore")] span: SpanValue, + #[derivative(Debug = "ignore")] ch: Sender, } -impl Debug for LocalOutput { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LocalOutput") - .field("actor_id", &self.actor_id) - .finish() - } -} - impl LocalOutput { pub fn new(actor_id: ActorId, ch: Sender) -> Self { Self { @@ -97,22 +94,18 @@ impl Output for LocalOutput { /// /// [`ExchangeService`]: risingwave_pb::task_service::exchange_service_server::ExchangeService // FIXME: can we just use the same `Output` with local and compacts it in gRPC server? +#[derive(Derivative)] +#[derivative(Debug)] pub struct RemoteOutput { actor_id: ActorId, + #[derivative(Debug = "ignore")] span: SpanValue, + #[derivative(Debug = "ignore")] ch: Sender, } -impl Debug for RemoteOutput { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RemoteOutput") - .field("actor_id", &self.actor_id) - .finish() - } -} - impl RemoteOutput { pub fn new(actor_id: ActorId, ch: Sender) -> Self { Self {