From 51fccad86d2d4a686dc57136294c7f3907f941db Mon Sep 17 00:00:00 2001
From: xx01cyx
Date: Sat, 10 Jun 2023 07:46:56 +0000
Subject: [PATCH 1/3] prune out hidden columns within sink executor

---
 proto/stream_plan.proto                |   4 +-
 src/connector/src/sink/catalog/desc.rs |   5 +-
 src/frontend/src/optimizer/mod.rs      |  20 +----
 src/stream/src/executor/sink.rs        | 103 ++++++++++++++++++-------
 src/stream/src/from_proto/sink.rs      |  10 ++-
 5 files changed, 89 insertions(+), 53 deletions(-)

diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index d3c4304d500ff..4ca2348ac4070 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -145,15 +145,17 @@ message SourceNode {
 }
 
 message SinkDesc {
+  reserved 4;
+  reserved "columns";
   uint32 id = 1;
   string name = 2;
   string definition = 3;
-  repeated plan_common.ColumnDesc columns = 4;
   repeated common.ColumnOrder plan_pk = 5;
   repeated uint32 downstream_pk = 6;
   repeated uint32 distribution_key = 7;
   map<string, string> properties = 8;
   catalog.SinkType sink_type = 9;
+  repeated plan_common.ColumnCatalog column_catalogs = 10;
 }
 
 message SinkNode {
diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs
index 1ce57f69f52b3..5ace2746dcd19 100644
--- a/src/connector/src/sink/catalog/desc.rs
+++ b/src/connector/src/sink/catalog/desc.rs
@@ -19,7 +19,6 @@ use risingwave_common::catalog::{
     ColumnCatalog, ConnectionId, DatabaseId, SchemaId, TableId, UserId,
 };
 use risingwave_common::util::sort_util::ColumnOrder;
-use risingwave_pb::plan_common::PbColumnDesc;
 use risingwave_pb::stream_plan::PbSinkDesc;
 
 use super::{SinkCatalog, SinkId, SinkType};
@@ -89,10 +88,10 @@ impl SinkDesc {
             id: self.id.sink_id,
             name: self.name.clone(),
             definition: self.definition.clone(),
-            columns: self
+            column_catalogs: self
                 .columns
                 .iter()
-                .map(|column| Into::<PbColumnDesc>::into(&column.column_desc))
+                .map(|column| column.to_protobuf())
                 .collect_vec(),
             plan_pk: self.plan_pk.iter().map(|k| k.to_protobuf()).collect_vec(),
             downstream_pk: self.downstream_pk.iter().map(|idx| *idx as _).collect_vec(),
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index f6480f3e5efa5..632f03fea61ef 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -54,7 +54,6 @@ use self::plan_visitor::InputRefValidator;
 use self::property::RequiredDist;
 use self::rule::*;
 use crate::catalog::table_catalog::{TableType, TableVersion};
-use crate::expr::InputRef;
 use crate::optimizer::plan_node::stream::StreamPlanRef;
 use crate::optimizer::plan_node::{
     BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive,
@@ -519,24 +518,7 @@ impl PlanRoot {
         definition: String,
         properties: WithOptions,
     ) -> Result<StreamSink> {
-        let mut stream_plan = self.gen_optimized_stream_plan(false)?;
-
-        // Add a project node if there is hidden column(s).
-        let input_fields = stream_plan.schema().fields();
-        if input_fields.len() != self.out_fields.count_ones(..) {
-            let exprs = input_fields
-                .iter()
-                .enumerate()
-                .filter_map(|(idx, field)| {
-                    if self.out_fields.contains(idx) {
-                        Some(InputRef::new(idx, field.data_type.clone()).into())
-                    } else {
-                        None
-                    }
-                })
-                .collect_vec();
-            stream_plan = StreamProject::new(generic::Project::new(exprs, stream_plan)).into();
-        }
+        let stream_plan = self.gen_optimized_stream_plan(false)?;
 
         StreamSink::create(
             stream_plan,
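The StreamProject removed above selected only the visible positions of out_fields; the executor diff below takes over that pruning at runtime. A minimal standalone sketch of the idea, with (is_hidden, values) pairs and a prune_hidden helper as illustrative stand-ins for ColumnCatalog and StreamChunk::reorder_columns (not the real APIs):

    // Sketch only: each column is an (is_hidden, values) pair; a chunk is just its columns.
    fn prune_hidden(columns: &[(bool, Vec<i64>)]) -> Vec<Vec<i64>> {
        // Same selection the removed Project performed: keep visible indices, in order.
        let visible: Vec<usize> = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, col)| (!col.0).then_some(idx))
            .collect();
        // Analogous to calling reorder_columns(&visible) on the chunk in the executor below.
        visible.iter().map(|&i| columns[i].1.clone()).collect()
    }

    fn main() {
        // Two visible columns plus a hidden _row_id-like column.
        let chunk = vec![(false, vec![3]), (false, vec![2]), (true, vec![1])];
        assert_eq!(prune_hidden(&chunk), vec![vec![3], vec![2]]);
    }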
diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs
index b92a204b8238a..fcc47f8a95dbc 100644
--- a/src/stream/src/executor/sink.rs
+++ b/src/stream/src/executor/sink.rs
@@ -18,9 +18,10 @@ use std::time::Instant;
 use futures::stream::select;
 use futures::{FutureExt, StreamExt};
 use futures_async_stream::try_stream;
+use itertools::Itertools;
 use prometheus::Histogram;
 use risingwave_common::array::{Op, StreamChunk};
-use risingwave_common::catalog::Schema;
+use risingwave_common::catalog::{ColumnCatalog, Schema};
 use risingwave_common::row::Row;
 use risingwave_common::types::DataType;
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
@@ -40,6 +41,7 @@ pub struct SinkExecutor<F: LogStoreFactory> {
     sink: SinkImpl,
     config: SinkConfig,
     identity: String,
+    columns: Vec<ColumnCatalog>,
     schema: Schema,
     pk_indices: Vec<usize>,
     sink_type: SinkType,
@@ -89,12 +91,12 @@ fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> Option<StreamChunk> {
 
 impl<F: LogStoreFactory> SinkExecutor<F> {
     #[allow(clippy::too_many_arguments)]
     pub async fn new(
-        materialize_executor: BoxedExecutor,
+        input: BoxedExecutor,
         metrics: Arc<StreamingMetrics>,
         config: SinkConfig,
         executor_id: u64,
         connector_params: ConnectorParams,
-        schema: Schema,
+        columns: Vec<ColumnCatalog>,
         pk_indices: Vec<usize>,
         sink_type: SinkType,
         sink_id: u64,
@@ -102,6 +104,10 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         log_store_factory: F,
     ) -> StreamExecutorResult<Self> {
         let (log_reader, log_writer) = log_store_factory.build().await;
+        let schema: Schema = columns
+            .iter()
+            .map(|column| column.column_desc.clone().into())
+            .collect();
         let sink = build_sink(
             config.clone(),
             schema.clone(),
             pk_indices.clone(),
             connector_params,
             sink_type,
             sink_id,
         )
         .await?;
         Ok(Self {
-            input: materialize_executor,
+            input,
             metrics,
             sink,
             config,
             identity: format!("SinkExecutor {:X?}", executor_id),
+            columns,
             schema,
             sink_type,
             pk_indices,
@@ -140,6 +147,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
             self.input,
             self.log_writer,
             self.schema,
+            self.columns,
             self.sink_type,
             self.actor_context,
         );
@@ -155,6 +163,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         input: BoxedExecutor,
         mut log_writer: impl LogWriter,
         schema: Schema,
+        columns: Vec<ColumnCatalog>,
         sink_type: SinkType,
         actor_context: ActorContextRef,
     ) {
@@ -170,6 +179,12 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         // Propagate the first barrier
         yield Message::Barrier(barrier);
 
+        let visible_columns = columns
+            .iter()
+            .enumerate()
+            .filter_map(|(idx, column)| (!column.is_hidden).then(|| idx))
+            .collect_vec();
+
         #[for_await]
         for msg in input {
             match msg? {
@@ -184,7 +199,12 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
                         Some(chunk.clone().compact())
                     };
 
-                    if let Some(chunk) = visible_chunk {
+                    if let Some(mut chunk) = visible_chunk {
+                        if visible_columns.len() != columns.len() {
+                            // Do projection here because we may have columns that aren't visible to
+                            // the downstream.
+                            chunk = chunk.reorder_columns(&visible_columns);
+                        }
                         // NOTE: We start the txn here because a force-append-only sink might
                         // receive a data chunk full of DELETE messages and then drop all of them.
                         // At this point (instead of the point above when we receive the upstream
                         // data chunk), we make sure that we do have data to send out, and we can
@@ -312,7 +332,7 @@ impl<F: LogStoreFactory> Executor for SinkExecutor<F> {
     }
 
     fn schema(&self) -> &Schema {
-        self.input.schema()
+        &self.schema
     }
 
     fn pk_indices(&self) -> super::PkIndicesRef<'_> {
@@ -326,6 +346,8 @@
 
 #[cfg(test)]
 mod test {
+    use risingwave_common::catalog::{ColumnDesc, ColumnId};
+
     use super::*;
     use crate::common::log_store::BoundedInMemLogStoreFactory;
     use crate::executor::test_utils::*;
@@ -335,7 +357,6 @@ mod test {
     async fn test_force_append_only_sink() {
         use risingwave_common::array::stream_chunk::StreamChunk;
         use risingwave_common::array::StreamChunkTestExt;
-        use risingwave_common::catalog::Field;
         use risingwave_common::types::DataType;
 
         use crate::executor::Barrier;
@@ -345,31 +366,48 @@ mod test {
             "type".into() => "append-only".into(),
             "force_append_only".into() => "true".into()
         };
-        let schema = Schema::new(vec![
-            Field::with_name(DataType::Int64, "v1"),
-            Field::with_name(DataType::Int64, "v2"),
-        ]);
+
+        // We have two visible columns and one hidden column. The hidden column will be pruned out
+        // within the sink executor.
+        let columns = vec![
+            ColumnCatalog {
+                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
+                is_hidden: false,
+            },
+            ColumnCatalog {
+                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
+                is_hidden: false,
+            },
+            ColumnCatalog {
+                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
+                is_hidden: true,
+            },
+        ];
+        let schema: Schema = columns
+            .iter()
+            .map(|column| column.column_desc.clone().into())
+            .collect();
         let pk = vec![0];
 
         let mock = MockSource::with_messages(
-            schema.clone(),
+            schema,
             pk.clone(),
             vec![
                 Message::Barrier(Barrier::new_test_barrier(1)),
                 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
-                    " I I
-                    + 3 2",
+                    " I I I
+                    + 3 2 1",
                 ))),
                 Message::Barrier(Barrier::new_test_barrier(2)),
                 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
-                    " I I
-                    U- 3 2
-                    U+ 3 4
-                    + 5 6",
+                    " I I I
+                    U- 3 2 1
+                    U+ 3 4 1
+                    + 5 6 7",
                 ))),
                 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
-                    " I I
-                    - 5 6",
+                    " I I I
+                    - 5 6 7",
                 ))),
             ],
         );
@@ -381,7 +419,7 @@ mod test {
             config,
             0,
             Default::default(),
-            schema.clone(),
+            columns.clone(),
             pk.clone(),
             SinkType::ForceAppendOnly,
             0,
@@ -427,7 +465,6 @@ mod test {
 
     #[tokio::test]
     async fn test_empty_barrier_sink() {
-        use risingwave_common::catalog::Field;
        use risingwave_common::types::DataType;
 
         use crate::executor::Barrier;
@@ -437,14 +474,24 @@ mod test {
             "type".into() => "append-only".into(),
             "force_append_only".into() => "true".into()
         };
-        let schema = Schema::new(vec![
-            Field::with_name(DataType::Int64, "v1"),
-            Field::with_name(DataType::Int64, "v2"),
-        ]);
+        let columns = vec![
+            ColumnCatalog {
+                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
+                is_hidden: false,
+            },
+            ColumnCatalog {
+                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
+                is_hidden: false,
+            },
+        ];
+        let schema: Schema = columns
+            .iter()
+            .map(|column| column.column_desc.clone().into())
+            .collect();
         let pk = vec![0];
 
         let mock = MockSource::with_messages(
-            schema.clone(),
+            schema,
             pk.clone(),
             vec![
                 Message::Barrier(Barrier::new_test_barrier(1)),
@@ -460,7 +507,7 @@ mod test {
             config,
             0,
             Default::default(),
-            schema.clone(),
+            columns,
             pk.clone(),
             SinkType::ForceAppendOnly,
             0,
diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs
index 7da9ee68b4d31..3ba0c3a6fceaa 100644
--- a/src/stream/src/from_proto/sink.rs
+++ b/src/stream/src/from_proto/sink.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use risingwave_common::catalog::ColumnCatalog;
 use risingwave_connector::sink::catalog::SinkType;
 use risingwave_connector::sink::kafka::KAFKA_SINK;
 use risingwave_connector::sink::{SinkConfig, DOWNSTREAM_SINK_KEY};
@@ -44,7 +45,12 @@ impl ExecutorBuilder for SinkExecutorBuilder {
             .iter()
             .map(|i| *i as usize)
             .collect_vec();
-        let schema = sink_desc.columns.iter().map(Into::into).collect();
+        let columns = sink_desc
+            .column_catalogs
+            .clone()
+            .into_iter()
+            .map(ColumnCatalog::from)
+            .collect_vec();
         // This field can be used to distinguish a specific actor in parallelism to prevent
         // transaction execution errors
         if let Some(connector) = properties.get(DOWNSTREAM_SINK_KEY) && connector == KAFKA_SINK {
@@ -62,7 +68,7 @@ impl ExecutorBuilder for SinkExecutorBuilder {
             config,
             params.executor_id,
             params.env.connector_params(),
-            schema,
+            columns,
             pk_indices,
             sink_type,
             sink_id,
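Patch 2 below regenerates the planner tests for this behavior: the sink's displayed column list now includes hidden columns, rendered with a "(hidden)" suffix via name_with_hidden(), while only the visible columns reach the connector. A small sketch of the two views, assuming a simplified (name, is_hidden) pair in place of ColumnCatalog:

    // Sketch only: (name, is_hidden) stands in for ColumnCatalog.
    type Column = (&'static str, bool);

    // How the plan output renders sink columns (cf. name_with_hidden in patch 2).
    fn display_names(columns: &[Column]) -> Vec<String> {
        columns
            .iter()
            .map(|&(name, hidden)| if hidden { format!("{name}(hidden)") } else { name.into() })
            .collect()
    }

    // What the downstream connector actually sees: hidden columns are dropped.
    fn visible_names(columns: &[Column]) -> Vec<&'static str> {
        columns
            .iter()
            .filter_map(|&(name, hidden)| (!hidden).then_some(name))
            .collect()
    }

    fn main() {
        let cols = [("auction", false), ("price", false), ("bid._row_id", true)];
        assert_eq!(display_names(&cols), ["auction", "price", "bid._row_id(hidden)"]);
        assert_eq!(visible_names(&cols), ["auction", "price"]);
    }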
From 17eb73cee4d4bfc6aba5a003279dd553677153fa Mon Sep 17 00:00:00 2001
From: xx01cyx
Date: Thu, 15 Jun 2023 06:23:13 +0000
Subject: [PATCH 2/3] fix test

---
 .../tests/testdata/output/nexmark.yaml              | 54 +++++++++----------
 src/frontend/src/optimizer/plan_node/stream_sink.rs |  2 +-
 src/stream/src/executor/sink.rs                     | 31 ++++++-----
 3 files changed, 40 insertions(+), 47 deletions(-)

diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml
index 751a198a2c708..a4fd1983f2aba 100644
--- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml
@@ -43,10 +43,9 @@
       BatchExchange { order: [], dist: Single }
       └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard }
   sink_plan: |
-    StreamSink { type: append-only, columns: [auction, bidder, price, date_time] }
-    └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time] }
-      └─StreamExchange { dist: Single }
-        └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
+    StreamSink { type: append-only, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] }
+    └─StreamExchange { dist: Single }
+      └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
   stream_plan: |
     StreamMaterialize { columns: [auction, bidder, price, date_time, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
     └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
@@ -89,11 +88,10 @@
       └─BatchProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time] }
         └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard }
   sink_plan: |
-    StreamSink { type: append-only, columns: [auction, bidder, price, date_time] }
-    └─StreamProject { exprs: [bid.auction, bid.bidder, $expr1, bid.date_time] }
-      └─StreamExchange { dist: Single }
-        └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time, bid._row_id] }
-          └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
+    StreamSink { type: append-only, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] }
+    └─StreamExchange { dist: Single }
+      └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time, bid._row_id] }
+        └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
   stream_plan: |
     StreamMaterialize { columns: [auction, bidder, price, date_time, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
     └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time, bid._row_id] }
@@ -132,11 +130,10 @@
       └─BatchFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
         └─BatchScan { table: bid, columns: [bid.auction, bid.price], distribution: SomeShard }
   sink_plan: |
-    StreamSink { type: append-only, columns: [auction, price] }
-    └─StreamProject { exprs: [bid.auction, bid.price] }
-      └─StreamExchange { dist: Single }
-        └─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
-          └─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
+    StreamSink { type: append-only, columns: [auction, price, bid._row_id(hidden)] }
+    └─StreamExchange { dist: Single }
+      └─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
+        └─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
   stream_plan: |
     StreamMaterialize { columns: [auction, price, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
     └─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
@@ -820,11 +817,10 @@
       └─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2] }
         └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard }
   sink_plan: |
-    StreamSink { type: append-only, columns: [auction, bidder, price, date_time, date, time] }
-    └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, $expr1, $expr2] }
-      └─StreamExchange { dist: Single }
-        └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] }
-          └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
+    StreamSink { type: append-only, columns: [auction, bidder, price, date_time, date, time, bid._row_id(hidden)] }
+    └─StreamExchange { dist: Single }
+      └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] }
+        └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
   stream_plan: |
     StreamMaterialize { columns: [auction, bidder, price, date_time, date, time, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
     └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] }
@@ -927,12 +923,11 @@
       └─BatchFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) }
         └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra], distribution: SomeShard }
   sink_plan: |
-    StreamSink { type: append-only, columns: [auction, bidder, price, bidtimetype, date_time, extra] }
-    └─StreamProject { exprs: [bid.auction, bid.bidder, $expr1, $expr2, bid.date_time, bid.extra] }
-      └─StreamExchange { dist: Single }
-        └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] }
-          └─StreamFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) }
-            └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
+    StreamSink { type: append-only, columns: [auction, bidder, price, bidtimetype, date_time, extra, bid._row_id(hidden)] }
+    └─StreamExchange { dist: Single }
+      └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] }
+      └─StreamFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) }
+        └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
   stream_plan: |
     StreamMaterialize { columns: [auction, bidder, price, bidtimetype, date_time, extra, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
     └─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] }
@@ -1385,11 +1380,10 @@
       └─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3] }
         └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url], distribution: SomeShard }
   sink_plan: |
-    StreamSink { type: append-only, columns: [auction, bidder, price, channel, dir1, dir2, dir3] }
-    └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, $expr1, $expr2, $expr3] }
-      └─StreamExchange { dist: Single }
-        └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] }
-          └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
+    StreamSink { type: append-only, columns: [auction, bidder, price, channel, dir1, dir2, dir3, bid._row_id(hidden)] }
+    └─StreamExchange { dist: Single }
+      └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] }
+        └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
   stream_plan: |
     StreamMaterialize { columns: [auction, bidder, price, channel, dir1, dir2, dir3, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
     └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] }
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index a4ca14e660aab..d159abf709887 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -269,7 +269,7 @@ impl fmt::Display for StreamSink {
             .sink_desc
             .columns
             .iter()
-            .map(|col| col.column_desc.name.clone())
+            .map(|col| col.name_with_hidden())
             .collect_vec()
             .join(", ");
         builder
diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs
index fcc47f8a95dbc..7829f49609cf0 100644
--- a/src/stream/src/executor/sink.rs
+++ b/src/stream/src/executor/sink.rs
@@ -182,7 +182,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         let visible_columns = columns
             .iter()
             .enumerate()
-            .filter_map(|(idx, column)| (!column.is_hidden).then(|| idx))
+            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
             .collect_vec();
 
         #[for_await]
@@ -199,19 +199,18 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
                         Some(chunk.clone().compact())
                     };
 
-                    if let Some(mut chunk) = visible_chunk {
-                        if visible_columns.len() != columns.len() {
+                    if let Some(chunk) = visible_chunk {
+                        let chunk_to_connector = if visible_columns.len() != columns.len() {
                             // Do projection here because we may have columns that aren't visible to
                             // the downstream.
-                            chunk = chunk.reorder_columns(&visible_columns);
-                        }
-                        // NOTE: We start the txn here because a force-append-only sink might
-                        // receive a data chunk full of DELETE messages and then drop all of them.
-                        // At this point (instead of the point above when we receive the upstream
-                        // data chunk), we make sure that we do have data to send out, and we can
-                        // thus mark the txn as started.
-                        log_writer.write_chunk(chunk.clone()).await?;
+                            chunk.clone().reorder_columns(&visible_columns)
+                        } else {
+                            chunk.clone()
+                        };
+
+                        log_writer.write_chunk(chunk_to_connector).await?;
+
+                        // Use original chunk instead of the reordered one as the executor output.
                         yield Message::Chunk(chunk);
                     }
                 }
@@ -438,8 +437,8 @@ mod test {
         assert_eq!(
             chunk_msg.into_chunk().unwrap(),
             StreamChunk::from_pretty(
-                " I I
-                + 3 2",
+                " I I I
+                + 3 2 1",
             )
         );
 
@@ -450,9 +449,9 @@ mod test {
         assert_eq!(
             chunk_msg.into_chunk().unwrap(),
             StreamChunk::from_pretty(
-                " I I
-                + 3 4
-                + 5 6",
+                " I I I
+                + 3 4 1
+                + 5 6 7",
             )
        );
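Patch 2 settled on writing a reordered copy of each chunk to the log writer while yielding the original, full-schema chunk downstream; patch 3 below applies the same visible-only rule to the schema handed to the connector in build_sink. A sketch of that chunk-side split, again with plain vectors standing in for StreamChunk and a hypothetical reorder in place of reorder_columns:

    // Sketch only: a chunk is just its columns here.
    type Chunk = Vec<Vec<i64>>;

    fn reorder(chunk: &Chunk, indices: &[usize]) -> Chunk {
        indices.iter().map(|&i| chunk[i].clone()).collect()
    }

    // Returns (chunk for the connector, chunk yielded to the downstream executor).
    fn split_outputs(chunk: Chunk, visible: &[usize]) -> (Chunk, Chunk) {
        let to_connector = if visible.len() != chunk.len() {
            reorder(&chunk, visible) // prune hidden columns for the sink connector
        } else {
            chunk.clone() // nothing hidden: send the chunk as-is
        };
        (to_connector, chunk) // the executor output keeps the full schema
    }

    fn main() {
        let (to_connector, output) = split_outputs(vec![vec![3], vec![2], vec![1]], &[0, 1]);
        assert_eq!(to_connector, vec![vec![3], vec![2]]);
        assert_eq!(output.len(), 3);
    }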
From a5cfbb2903a57674e91fdb584fe9785fea4db999 Mon Sep 17 00:00:00 2001
From: xx01cyx
Date: Thu, 15 Jun 2023 07:00:31 +0000
Subject: [PATCH 3/3] pass schema without hidden columns to sink connector

---
 src/stream/src/executor/sink.rs | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs
index 7829f49609cf0..a5f5756cd6442 100644
--- a/src/stream/src/executor/sink.rs
+++ b/src/stream/src/executor/sink.rs
@@ -56,12 +56,17 @@ struct SinkMetrics {
 
 async fn build_sink(
     config: SinkConfig,
-    schema: Schema,
+    columns: &[ColumnCatalog],
     pk_indices: PkIndices,
     connector_params: ConnectorParams,
     sink_type: SinkType,
     sink_id: u64,
 ) -> StreamExecutorResult<SinkImpl> {
+    // The downstream sink can only see the visible columns.
+    let schema: Schema = columns
+        .iter()
+        .filter_map(|column| (!column.is_hidden).then(|| column.column_desc.clone().into()))
+        .collect();
     Ok(SinkImpl::new(
         config,
         schema,
@@ -104,19 +109,19 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         log_store_factory: F,
     ) -> StreamExecutorResult<Self> {
         let (log_reader, log_writer) = log_store_factory.build().await;
-        let schema: Schema = columns
-            .iter()
-            .map(|column| column.column_desc.clone().into())
-            .collect();
         let sink = build_sink(
             config.clone(),
-            schema.clone(),
+            &columns,
             pk_indices.clone(),
             connector_params,
             sink_type,
             sink_id,
         )
         .await?;
+        let schema: Schema = columns
+            .iter()
+            .map(|column| column.column_desc.clone().into())
+            .collect();
         Ok(Self {
             input,
             metrics,