From a5cfbb2903a57674e91fdb584fe9785fea4db999 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Thu, 15 Jun 2023 07:00:31 +0000 Subject: [PATCH] pass schema without hidden columns to sink connector --- src/stream/src/executor/sink.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 7829f49609cf..a5f5756cd644 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -56,12 +56,17 @@ struct SinkMetrics { async fn build_sink( config: SinkConfig, - schema: Schema, + columns: &[ColumnCatalog], pk_indices: PkIndices, connector_params: ConnectorParams, sink_type: SinkType, sink_id: u64, ) -> StreamExecutorResult { + // The downstream sink can only see the visible columns. + let schema: Schema = columns + .iter() + .filter_map(|column| (!column.is_hidden).then(|| column.column_desc.clone().into())) + .collect(); Ok(SinkImpl::new( config, schema, @@ -104,19 +109,19 @@ impl SinkExecutor { log_store_factory: F, ) -> StreamExecutorResult { let (log_reader, log_writer) = log_store_factory.build().await; - let schema: Schema = columns - .iter() - .map(|column| column.column_desc.clone().into()) - .collect(); let sink = build_sink( config.clone(), - schema.clone(), + &columns, pk_indices.clone(), connector_params, sink_type, sink_id, ) .await?; + let schema: Schema = columns + .iter() + .map(|column| column.column_desc.clone().into()) + .collect(); Ok(Self { input, metrics,