Skip to content

Commit

Permalink
fix(sink): pass downstream pk when starting a sink (#8660)
Browse files Browse the repository at this point in the history
  • Loading branch information
xx01cyx authored Mar 21, 2023
1 parent 8c5489e commit 59a0947
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/e2e-iceberg-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \

# check sink destination using shell
if cat ./spark-output/*.csv | sort | awk -F "," '{
if ($1 == 1 && $2 == 2 && $3 == "1-2") c1++;
if ($1 == 1 && $2 == 50 && $3 == "1-50") c1++;
if ($1 == 13 && $2 == 2 && $3 == "13-2") c2++;
if ($1 == 21 && $2 == 2 && $3 == "21-2") c3++;
if ($1 == 2 && $2 == 2 && $3 == "2-2") c4++;
Expand Down
6 changes: 6 additions & 0 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2')
statement ok
FLUSH;

statement ok
INSERT INTO t6 VALUES (1, 50, '1-50');

statement ok
FLUSH;

statement ok
DROP SINK s6;

Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ impl ExecutorBuilder for SinkExecutorBuilder {
let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
let mut properties = sink_desc.get_properties().clone();
let pk_indices = sink_desc
.plan_pk
.downstream_pk
.iter()
.map(|pk| pk.column_index as usize)
.collect::<Vec<_>>();
.map(|i| *i as usize)
.collect_vec();
let schema = sink_desc.columns.iter().map(Into::into).collect();
// This field can be used to distinguish a specific actor in parallelism to prevent
// transaction execution errors
Expand Down

0 comments on commit 59a0947

Please sign in to comment.