From 59a09479567121baee2a122c32f7b6ddecafe045 Mon Sep 17 00:00:00 2001
From: Yuanxin Cao <60498509+xx01cyx@users.noreply.github.com>
Date: Tue, 21 Mar 2023 11:17:54 +0800
Subject: [PATCH] fix(sink): pass downstream pk when starting a sink (#8660)

---
 ci/scripts/e2e-iceberg-sink-test.sh | 2 +-
 e2e_test/sink/iceberg_sink.slt      | 6 ++++++
 src/stream/src/from_proto/sink.rs   | 6 +++---
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/ci/scripts/e2e-iceberg-sink-test.sh b/ci/scripts/e2e-iceberg-sink-test.sh
index c3d40ff7cd80..99b417c0c4f8 100755
--- a/ci/scripts/e2e-iceberg-sink-test.sh
+++ b/ci/scripts/e2e-iceberg-sink-test.sh
@@ -86,7 +86,7 @@ spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
 
 # check sink destination using shell
 if cat ./spark-output/*.csv | sort | awk -F "," '{
-if ($1 == 1 && $2 == 2 && $3 == "1-2") c1++;
+if ($1 == 1 && $2 == 50 && $3 == "1-50") c1++;
 if ($1 == 13 && $2 == 2 && $3 == "13-2") c2++;
 if ($1 == 21 && $2 == 2 && $3 == "21-2") c3++;
 if ($1 == 2 && $2 == 2 && $3 == "2-2") c4++;
diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt
index c0bcb3b62559..2a214c0710dd 100644
--- a/e2e_test/sink/iceberg_sink.slt
+++ b/e2e_test/sink/iceberg_sink.slt
@@ -23,6 +23,12 @@ INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2')
 statement ok
 FLUSH;
 
+statement ok
+INSERT INTO t6 VALUES (1, 50, '1-50');
+
+statement ok
+FLUSH;
+
 statement ok
 DROP SINK s6;
 
diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs
index e524963187f1..f200d7211dc1 100644
--- a/src/stream/src/from_proto/sink.rs
+++ b/src/stream/src/from_proto/sink.rs
@@ -37,10 +37,10 @@ impl ExecutorBuilder for SinkExecutorBuilder {
         let sink_type = SinkType::from_proto(sink_desc.get_sink_type().unwrap());
         let mut properties = sink_desc.get_properties().clone();
         let pk_indices = sink_desc
-            .plan_pk
+            .downstream_pk
             .iter()
-            .map(|pk| pk.column_index as usize)
-            .collect::<Vec<_>>();
+            .map(|i| *i as usize)
+            .collect_vec();
         let schema = sink_desc.columns.iter().map(Into::into).collect();
         // This field can be used to distinguish a specific actor in parallelism to prevent
         // transaction execution errors
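Note: the core of the fix is swapping the source of the sink's pk indices from plan_pk (a list of ColumnOrder entries, each carrying a column index plus sort order for the upstream plan fragment) to downstream_pk (a bare list of the column indices the external sink expects). The e2e test changes exercise this by updating a row in place, which only works if the sink sees the correct pk. A minimal sketch of that distinction follows, using hypothetical stand-in structs for the generated proto types (the real SinkDesc comes from RisingWave's protobuf definitions); only collect_vec from the itertools crate, which the patch itself uses, is assumed:

    // Hypothetical stand-ins for the protobuf-generated types; field names
    // mirror the patch, but these definitions are illustrative only.
    use itertools::Itertools; // provides `collect_vec`, as used in the patch

    struct ColumnOrder {
        column_index: u32, // sort-order fields omitted for brevity
    }

    struct SinkDesc {
        plan_pk: Vec<ColumnOrder>,  // pk of the upstream plan fragment
        downstream_pk: Vec<u32>,    // pk columns the external sink expects
    }

    // After the fix: read the downstream pk directly instead of deriving
    // indices from the plan's ordering columns, which may differ from the
    // pk declared for the external system.
    fn pk_indices(desc: &SinkDesc) -> Vec<usize> {
        desc.downstream_pk.iter().map(|i| *i as usize).collect_vec()
    }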