Skip to content

Commit

Permalink
pass schema without hidden columns to sink connector
Browse files Browse the repository at this point in the history
  • Loading branch information
xx01cyx committed Jun 15, 2023
1 parent 17eb73c commit a5cfbb2
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,17 @@ struct SinkMetrics {

async fn build_sink(
config: SinkConfig,
schema: Schema,
columns: &[ColumnCatalog],
pk_indices: PkIndices,
connector_params: ConnectorParams,
sink_type: SinkType,
sink_id: u64,
) -> StreamExecutorResult<SinkImpl> {
// The downstream sink can only see the visible columns.
let schema: Schema = columns
.iter()
.filter_map(|column| (!column.is_hidden).then(|| column.column_desc.clone().into()))
.collect();
Ok(SinkImpl::new(
config,
schema,
Expand Down Expand Up @@ -104,19 +109,19 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
log_store_factory: F,
) -> StreamExecutorResult<Self> {
let (log_reader, log_writer) = log_store_factory.build().await;
let schema: Schema = columns
.iter()
.map(|column| column.column_desc.clone().into())
.collect();
let sink = build_sink(
config.clone(),
schema.clone(),
&columns,
pk_indices.clone(),
connector_params,
sink_type,
sink_id,
)
.await?;
let schema: Schema = columns
.iter()
.map(|column| column.column_desc.clone().into())
.collect();
Ok(Self {
input,
metrics,
Expand Down

0 comments on commit a5cfbb2

Please sign in to comment.