From 3e02b79e1458a2daa7defe3eccac2082b2e1b8eb Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 23 Sep 2024 16:33:48 +0800 Subject: [PATCH 01/13] feat: allow append only table with pk --- src/frontend/src/handler/create_table.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index e7b2b44226657..6112e1bdc4943 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -699,12 +699,20 @@ fn gen_table_plan_inner( vec![], ); - if append_only && row_id_index.is_none() { - return Err(ErrorCode::InvalidInputSyntax( - "PRIMARY KEY constraint can not be applied to an append-only table.".to_owned(), - ) - .into()); - } + let pk_on_append_only = append_only && row_id_index.is_none(); + + let on_conflict = if pk_on_append_only { + let on_conflict = on_conflict.unwrap_or(OnConflict::Ignore); + if on_conflict != OnConfict::Ignore { + return Err(ErrorCode::InvalidInputSyntax( + "When PRIMARY KEY constraint applied to an APPEND ONLY table, the ON CONFLICT behavior must be IGNORE.".to_owned(), + ) + .into()); + } + Some(on_conflict) + } else { + on_conflict + }; if !append_only && !watermark_descs.is_empty() { return Err(ErrorCode::NotSupported( From 84888dded90572d052dd6321a1d1d1cc759fbbb0 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 23 Sep 2024 16:36:03 +0800 Subject: [PATCH 02/13] add test & fix typo --- e2e_test/streaming/on_conflict.slt | 5 ++++- src/frontend/src/handler/create_table.rs | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/e2e_test/streaming/on_conflict.slt b/e2e_test/streaming/on_conflict.slt index 66327571a8761..8aaa4907b5f8f 100644 --- a/e2e_test/streaming/on_conflict.slt +++ b/e2e_test/streaming/on_conflict.slt @@ -2,7 +2,7 @@ statement ok SET RW_IMPLICIT_FLUSH TO true; statement ok -create table t1 (v1 int, v2 int, v3 int, primary key(v1)) on conflict ignore; +create table t1 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLYon conflict ignore; statement ok insert into t1 values (1,4,2), (2,3,3); @@ -26,6 +26,9 @@ select v1, v2, v3 from mv1; statement ok SET RW_IMPLICIT_FLUSH TO true; +statement error +create table t2 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY on conflict overwrite; + statement ok create table t2 (v1 int, v2 int, v3 int, primary key(v1)) on conflict overwrite; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 6112e1bdc4943..620206898cf97 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -703,7 +703,7 @@ fn gen_table_plan_inner( let on_conflict = if pk_on_append_only { let on_conflict = on_conflict.unwrap_or(OnConflict::Ignore); - if on_conflict != OnConfict::Ignore { + if on_conflict != OnConflict::Ignore { return Err(ErrorCode::InvalidInputSyntax( "When PRIMARY KEY constraint applied to an APPEND ONLY table, the ON CONFLICT behavior must be IGNORE.".to_owned(), ) From 41f925a3f9b5e5919b59ab4dea8502b2c00d5366 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 23 Sep 2024 16:40:44 +0800 Subject: [PATCH 03/13] fix --- src/frontend/src/optimizer/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index de5c3deaf0d6b..a76c79880e9bb 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -703,8 +703,7 @@ impl PlanRoot { Ok(dml_node) } - let kind = if append_only { - assert!(row_id_index.is_some()); + let kind = if row_id_index.is_none() { PrimaryKeyKind::AppendOnly } else if let Some(row_id_index) = row_id_index { assert_eq!( From f5be1713fa4face6c0e19b5fe4d9cd996822311f Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 23 Sep 2024 16:41:16 +0800 Subject: [PATCH 04/13] fix --- src/frontend/src/optimizer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index a76c79880e9bb..dc2d4859c9509 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -703,7 +703,7 @@ impl PlanRoot { Ok(dml_node) } - let kind = if row_id_index.is_none() { + let kind = if row_id_index.is_some() { PrimaryKeyKind::AppendOnly } else if let Some(row_id_index) = row_id_index { assert_eq!( From 5581200ea86855034c1e2b2e81494f939b4432ef Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 24 Sep 2024 15:38:15 +0800 Subject: [PATCH 05/13] fix test --- e2e_test/streaming/on_conflict.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/streaming/on_conflict.slt b/e2e_test/streaming/on_conflict.slt index 8aaa4907b5f8f..60da74d75956c 100644 --- a/e2e_test/streaming/on_conflict.slt +++ b/e2e_test/streaming/on_conflict.slt @@ -2,7 +2,7 @@ statement ok SET RW_IMPLICIT_FLUSH TO true; statement ok -create table t1 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLYon conflict ignore; +create table t1 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY on conflict ignore; statement ok insert into t1 values (1,4,2), (2,3,3); From 060e2f7f3804b57d872b6a97e8964f5551f1a0f7 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 24 Sep 2024 15:40:22 +0800 Subject: [PATCH 06/13] revert --- src/frontend/src/optimizer/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index dc2d4859c9509..de5c3deaf0d6b 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -703,7 +703,8 @@ impl PlanRoot { Ok(dml_node) } - let kind = if row_id_index.is_some() { + let kind = if append_only { + assert!(row_id_index.is_some()); PrimaryKeyKind::AppendOnly } else if let Some(row_id_index) = row_id_index { assert_eq!( From 08c3c25ec9d9f1c22a366f469e67d013da14b6ba Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 24 Sep 2024 15:46:28 +0800 Subject: [PATCH 07/13] rename --- src/frontend/src/optimizer/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index de5c3deaf0d6b..7accbf11525b3 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -675,8 +675,8 @@ impl PlanRoot { #[derive(PartialEq, Debug, Copy, Clone)] enum PrimaryKeyKind { UserDefinedPrimaryKey, - RowIdAsPrimaryKey, - AppendOnly, + NonAppendOnlyRowIdPK, + AppendOnlyRowIdPK, } fn inject_dml_node( @@ -693,11 +693,11 @@ impl PlanRoot { dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?; dml_node = match kind { - PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::RowIdAsPrimaryKey => { + PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPK => { RequiredDist::hash_shard(pk_column_indices) .enforce_if_not_satisfies(dml_node, &Order::any())? } - PrimaryKeyKind::AppendOnly => StreamExchange::new_no_shuffle(dml_node).into(), + PrimaryKeyKind::AppendOnlyRowIdPK => StreamExchange::new_no_shuffle(dml_node).into(), }; Ok(dml_node) @@ -705,13 +705,13 @@ impl PlanRoot { let kind = if append_only { assert!(row_id_index.is_some()); - PrimaryKeyKind::AppendOnly + PrimaryKeyKind::AppendOnlyRowIdPK } else if let Some(row_id_index) = row_id_index { assert_eq!( pk_column_indices.iter().exactly_one().copied().unwrap(), row_id_index ); - PrimaryKeyKind::RowIdAsPrimaryKey + PrimaryKeyKind::NonAppendOnlyRowIdPK } else { PrimaryKeyKind::UserDefinedPrimaryKey }; @@ -738,7 +738,7 @@ impl PlanRoot { .enforce_if_not_satisfies(external_source_node, &Order::any())? } - PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => { + PrimaryKeyKind::NonAppendOnlyRowIdPK | PrimaryKeyKind::AppendOnlyRowIdPK => { StreamExchange::new_no_shuffle(external_source_node).into() } }; @@ -814,7 +814,7 @@ impl PlanRoot { PrimaryKeyKind::UserDefinedPrimaryKey => { unreachable!() } - PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => { + PrimaryKeyKind::NonAppendOnlyRowIdPK | PrimaryKeyKind::AppendOnlyRowIdPK => { stream_plan = StreamRowIdGen::new_with_dist( stream_plan, row_id_index, From 424ef7a0598b6cfaf8fee8b784412d8ec055a64b Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 24 Sep 2024 15:55:32 +0800 Subject: [PATCH 08/13] add test --- .../tests/testdata/input/explain.yaml | 8 +++++++ .../tests/testdata/output/explain.yaml | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/frontend/planner_test/tests/testdata/input/explain.yaml b/src/frontend/planner_test/tests/testdata/input/explain.yaml index fecb3d4235108..bc64dcc383281 100644 --- a/src/frontend/planner_test/tests/testdata/input/explain.yaml +++ b/src/frontend/planner_test/tests/testdata/input/explain.yaml @@ -25,3 +25,11 @@ explain create table t (v1 int, v2 varchar) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; expected_outputs: - explain_output +- sql: | + explain create table t (v1 int, v2 varchar) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + expected_outputs: + - explain_output +- sql: | + explain create table t (v1 int, v2 varchar primary key) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + expected_outputs: + - explain_output \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/explain.yaml b/src/frontend/planner_test/tests/testdata/output/explain.yaml index e88ce0b9caa68..3a612a2a54707 100644 --- a/src/frontend/planner_test/tests/testdata/output/explain.yaml +++ b/src/frontend/planner_test/tests/testdata/output/explain.yaml @@ -160,3 +160,24 @@ └─StreamExchange { dist: HashShard(_row_id) } └─StreamDml { columns: [v1, v2, _row_id] } └─StreamSource +- sql: | + explain create table t (v1 int, v2 varchar) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + explain_output: | + StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } + └─StreamRowIdGen { row_id_index: 2 } + └─StreamUnion { all: true } + ├─StreamExchange [no_shuffle] { dist: SomeShard } + │ └─StreamSource { source: t, columns: [v1, v2, _row_id] } + └─StreamExchange [no_shuffle] { dist: SomeShard } + └─StreamDml { columns: [v1, v2, _row_id] } + └─StreamSource +- sql: | + explain create table t (v1 int, v2 varchar primary key) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + explain_output: | + StreamMaterialize { columns: [v1, v2], stream_key: [v2], pk_columns: [v2], pk_conflict: Overwrite } + └─StreamUnion { all: true } + ├─StreamExchange { dist: HashShard(v2) } + │ └─StreamSource { source: t, columns: [v1, v2] } + └─StreamExchange { dist: HashShard(v2) } + └─StreamDml { columns: [v1, v2] } + └─StreamSource From 35a3f79f149a90b39a7dbfc1d1567647cc16d452 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 24 Sep 2024 15:59:35 +0800 Subject: [PATCH 09/13] fix dist --- .../tests/testdata/input/explain.yaml | 4 ++++ .../tests/testdata/output/explain.yaml | 10 ++++++++++ src/frontend/src/optimizer/mod.rs | 15 +++++++++------ 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/explain.yaml b/src/frontend/planner_test/tests/testdata/input/explain.yaml index bc64dcc383281..ec1b2086ef645 100644 --- a/src/frontend/planner_test/tests/testdata/input/explain.yaml +++ b/src/frontend/planner_test/tests/testdata/input/explain.yaml @@ -32,4 +32,8 @@ - sql: | explain create table t (v1 int, v2 varchar primary key) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; expected_outputs: + - explain_output +- sql: | + explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + expected_outputs: - explain_output \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/explain.yaml b/src/frontend/planner_test/tests/testdata/output/explain.yaml index 3a612a2a54707..41ab32c3910ae 100644 --- a/src/frontend/planner_test/tests/testdata/output/explain.yaml +++ b/src/frontend/planner_test/tests/testdata/output/explain.yaml @@ -181,3 +181,13 @@ └─StreamExchange { dist: HashShard(v2) } └─StreamDml { columns: [v1, v2] } └─StreamSource +- sql: | + explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + explain_output: | + StreamMaterialize { columns: [v1, v2], stream_key: [v2], pk_columns: [v2], pk_conflict: IgnoreConflict } + └─StreamUnion { all: true } + ├─StreamExchange { dist: HashShard(v2) } + │ └─StreamSource { source: t, columns: [v1, v2] } + └─StreamExchange { dist: HashShard(v2) } + └─StreamDml { columns: [v1, v2] } + └─StreamSource diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 7accbf11525b3..2958ea97fe939 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -697,21 +697,24 @@ impl PlanRoot { RequiredDist::hash_shard(pk_column_indices) .enforce_if_not_satisfies(dml_node, &Order::any())? } - PrimaryKeyKind::AppendOnlyRowIdPK => StreamExchange::new_no_shuffle(dml_node).into(), + PrimaryKeyKind::AppendOnlyRowIdPK => { + StreamExchange::new_no_shuffle(dml_node).into() + } }; Ok(dml_node) } - let kind = if append_only { - assert!(row_id_index.is_some()); - PrimaryKeyKind::AppendOnlyRowIdPK - } else if let Some(row_id_index) = row_id_index { + let kind = if let Some(row_id_index) = row_id_index { assert_eq!( pk_column_indices.iter().exactly_one().copied().unwrap(), row_id_index ); - PrimaryKeyKind::NonAppendOnlyRowIdPK + if append_only { + PrimaryKeyKind::AppendOnlyRowIdPK + } else { + PrimaryKeyKind::NonAppendOnlyRowIdPK + } } else { PrimaryKeyKind::UserDefinedPrimaryKey }; From 7fd27d4e6d3f8da7291938f20e588c1f1be71eec Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 24 Sep 2024 16:23:47 +0800 Subject: [PATCH 10/13] make clippy happy --- dashboard/next-env.d.ts | 2 +- src/frontend/src/optimizer/mod.rs | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dashboard/next-env.d.ts b/dashboard/next-env.d.ts index 4f11a03dc6cc3..a4a7b3f5cfa2f 100644 --- a/dashboard/next-env.d.ts +++ b/dashboard/next-env.d.ts @@ -2,4 +2,4 @@ /// // NOTE: This file should not be edited -// see https://nextjs.org/docs/basic-features/typescript for more information. +// see https://nextjs.org/docs/pages/building-your-application/configuring/typescript for more information. diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 2958ea97fe939..83ae27a501a09 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -675,8 +675,8 @@ impl PlanRoot { #[derive(PartialEq, Debug, Copy, Clone)] enum PrimaryKeyKind { UserDefinedPrimaryKey, - NonAppendOnlyRowIdPK, - AppendOnlyRowIdPK, + NonAppendOnlyRowIdPk, + AppendOnlyRowIdPk, } fn inject_dml_node( @@ -693,11 +693,11 @@ impl PlanRoot { dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?; dml_node = match kind { - PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPK => { + PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => { RequiredDist::hash_shard(pk_column_indices) .enforce_if_not_satisfies(dml_node, &Order::any())? } - PrimaryKeyKind::AppendOnlyRowIdPK => { + PrimaryKeyKind::AppendOnlyRowIdPk => { StreamExchange::new_no_shuffle(dml_node).into() } }; @@ -711,9 +711,9 @@ impl PlanRoot { row_id_index ); if append_only { - PrimaryKeyKind::AppendOnlyRowIdPK + PrimaryKeyKind::AppendOnlyRowIdPk } else { - PrimaryKeyKind::NonAppendOnlyRowIdPK + PrimaryKeyKind::NonAppendOnlyRowIdPk } } else { PrimaryKeyKind::UserDefinedPrimaryKey @@ -741,7 +741,7 @@ impl PlanRoot { .enforce_if_not_satisfies(external_source_node, &Order::any())? } - PrimaryKeyKind::NonAppendOnlyRowIdPK | PrimaryKeyKind::AppendOnlyRowIdPK => { + PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => { StreamExchange::new_no_shuffle(external_source_node).into() } }; @@ -817,7 +817,7 @@ impl PlanRoot { PrimaryKeyKind::UserDefinedPrimaryKey => { unreachable!() } - PrimaryKeyKind::NonAppendOnlyRowIdPK | PrimaryKeyKind::AppendOnlyRowIdPK => { + PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => { stream_plan = StreamRowIdGen::new_with_dist( stream_plan, row_id_index, From ead91a0a3e0f2872c4d4a5d779486bad20c2dca5 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 24 Sep 2024 17:08:26 +0800 Subject: [PATCH 11/13] change sink into table --- e2e_test/sink/sink_into_table/basic.slt | 67 +++++++++++++++++++++++++ src/frontend/src/handler/create_sink.rs | 18 ++++--- 2 files changed, 78 insertions(+), 7 deletions(-) diff --git a/e2e_test/sink/sink_into_table/basic.slt b/e2e_test/sink/sink_into_table/basic.slt index e2a10d46fbf37..8e93672c083ae 100644 --- a/e2e_test/sink/sink_into_table/basic.slt +++ b/e2e_test/sink/sink_into_table/basic.slt @@ -323,6 +323,73 @@ drop table t_primary_key; statement ok drop table t_s3; + + +# target table append only with primary key + +statement ok +create table t_s3 (v1 int, v2 int) append only; + +statement ok +insert into t_s3 values (1, 11), (2, 12), (3, 13); + +statement ok +create table t_primary_key_append_only (v1 int primary key, v2 int, v3 int default 1000, v4 int as v1 + v2) APPEND ONLY; + +statement error +create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3; + +statement ok +create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3 with (type = 'append-only'); + + +statement ok +flush; + +query IIII rowsort +select * from t_primary_key_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 + +statement ok +insert into t_s3 values (4, 14), (5, 15), (6, 16); + +query IIII rowsort +select * from t_primary_key_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 + +statement ok +insert into t_primary_key_append_only values (100, 100); + +query IIII +select * from t_primary_key_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 +100 100 1000 200 + +statement ok +drop sink s3; + +statement ok +drop table t_primary_key_append_only; + +statement ok +drop table t_s3; + + # multi sinks statement ok diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index f87fced6b02a2..7486046701a81 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -277,15 +277,19 @@ pub async fn gen_sink_plan( } } - let user_defined_primary_key_table = - !(table_catalog.append_only || table_catalog.row_id_index.is_some()); + let user_defined_primary_key_table = !table_catalog.row_id_index.is_some(); + let sink_is_append_only = sink_catalog.sink_type == SinkType::AppendOnly + || sink_catalog.sink_type == SinkType::ForceAppendOnly; - if !(user_defined_primary_key_table - || sink_catalog.sink_type == SinkType::AppendOnly - || sink_catalog.sink_type == SinkType::ForceAppendOnly) - { + if !user_defined_primary_key_table && !sink_is_append_only { return Err(RwError::from(ErrorCode::BindError( - "Only append-only sinks can sink to a table without primary keys.".to_string(), + "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), + ))); + } + + if table_catalog.append_only && !sink_is_append_only { + return Err(RwError::from(ErrorCode::BindError( + "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), ))); } From 7056e2db67783f9bfc778faa1144870c39791bb2 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 24 Sep 2024 17:10:35 +0800 Subject: [PATCH 12/13] fix --- dashboard/next-env.d.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dashboard/next-env.d.ts b/dashboard/next-env.d.ts index a4a7b3f5cfa2f..4f11a03dc6cc3 100644 --- a/dashboard/next-env.d.ts +++ b/dashboard/next-env.d.ts @@ -2,4 +2,4 @@ /// // NOTE: This file should not be edited -// see https://nextjs.org/docs/pages/building-your-application/configuring/typescript for more information. +// see https://nextjs.org/docs/basic-features/typescript for more information. From f0d73ad20db42b106e4b214bd8466b4ebd7c7db6 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Wed, 25 Sep 2024 12:11:09 +0800 Subject: [PATCH 13/13] fix --- dashboard/next-env.d.ts | 2 +- src/frontend/src/handler/create_sink.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dashboard/next-env.d.ts b/dashboard/next-env.d.ts index 4f11a03dc6cc3..a4a7b3f5cfa2f 100644 --- a/dashboard/next-env.d.ts +++ b/dashboard/next-env.d.ts @@ -2,4 +2,4 @@ /// // NOTE: This file should not be edited -// see https://nextjs.org/docs/basic-features/typescript for more information. +// see https://nextjs.org/docs/pages/building-your-application/configuring/typescript for more information. diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 7486046701a81..1bbdfccdd4034 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -277,7 +277,7 @@ pub async fn gen_sink_plan( } } - let user_defined_primary_key_table = !table_catalog.row_id_index.is_some(); + let user_defined_primary_key_table = table_catalog.row_id_index.is_none(); let sink_is_append_only = sink_catalog.sink_type == SinkType::AppendOnly || sink_catalog.sink_type == SinkType::ForceAppendOnly;