diff --git a/dashboard/next-env.d.ts b/dashboard/next-env.d.ts index 4f11a03dc6cc3..a4a7b3f5cfa2f 100644 --- a/dashboard/next-env.d.ts +++ b/dashboard/next-env.d.ts @@ -2,4 +2,4 @@ /// // NOTE: This file should not be edited -// see https://nextjs.org/docs/basic-features/typescript for more information. +// see https://nextjs.org/docs/pages/building-your-application/configuring/typescript for more information. diff --git a/e2e_test/sink/sink_into_table/basic.slt b/e2e_test/sink/sink_into_table/basic.slt index e2a10d46fbf37..8e93672c083ae 100644 --- a/e2e_test/sink/sink_into_table/basic.slt +++ b/e2e_test/sink/sink_into_table/basic.slt @@ -323,6 +323,73 @@ drop table t_primary_key; statement ok drop table t_s3; + + +# target table append only with primary key + +statement ok +create table t_s3 (v1 int, v2 int) append only; + +statement ok +insert into t_s3 values (1, 11), (2, 12), (3, 13); + +statement ok +create table t_primary_key_append_only (v1 int primary key, v2 int, v3 int default 1000, v4 int as v1 + v2) APPEND ONLY; + +statement error +create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3; + +statement ok +create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3 with (type = 'append-only'); + + +statement ok +flush; + +query IIII rowsort +select * from t_primary_key_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 + +statement ok +insert into t_s3 values (4, 14), (5, 15), (6, 16); + +query IIII rowsort +select * from t_primary_key_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 + +statement ok +insert into t_primary_key_append_only values (100, 100); + +query IIII +select * from t_primary_key_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 +100 100 1000 200 + +statement ok +drop sink s3; + +statement ok +drop table t_primary_key_append_only; + +statement ok +drop table t_s3; + + # multi sinks statement ok diff --git a/e2e_test/streaming/on_conflict.slt b/e2e_test/streaming/on_conflict.slt index 66327571a8761..60da74d75956c 100644 --- a/e2e_test/streaming/on_conflict.slt +++ b/e2e_test/streaming/on_conflict.slt @@ -2,7 +2,7 @@ statement ok SET RW_IMPLICIT_FLUSH TO true; statement ok -create table t1 (v1 int, v2 int, v3 int, primary key(v1)) on conflict ignore; +create table t1 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY on conflict ignore; statement ok insert into t1 values (1,4,2), (2,3,3); @@ -26,6 +26,9 @@ select v1, v2, v3 from mv1; statement ok SET RW_IMPLICIT_FLUSH TO true; +statement error +create table t2 (v1 int, v2 int, v3 int, primary key(v1)) APPEND ONLY on conflict overwrite; + statement ok create table t2 (v1 int, v2 int, v3 int, primary key(v1)) on conflict overwrite; diff --git a/src/frontend/planner_test/tests/testdata/input/explain.yaml b/src/frontend/planner_test/tests/testdata/input/explain.yaml index fecb3d4235108..ec1b2086ef645 100644 --- a/src/frontend/planner_test/tests/testdata/input/explain.yaml +++ b/src/frontend/planner_test/tests/testdata/input/explain.yaml @@ -25,3 +25,15 @@ explain create table t (v1 int, v2 varchar) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; expected_outputs: - explain_output +- sql: | + explain create table t (v1 int, v2 varchar) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + expected_outputs: + - explain_output +- sql: | + explain create table t (v1 int, v2 varchar primary key) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + expected_outputs: + - explain_output +- sql: | + explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + expected_outputs: + - explain_output \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/explain.yaml b/src/frontend/planner_test/tests/testdata/output/explain.yaml index e88ce0b9caa68..41ab32c3910ae 100644 --- a/src/frontend/planner_test/tests/testdata/output/explain.yaml +++ b/src/frontend/planner_test/tests/testdata/output/explain.yaml @@ -160,3 +160,34 @@ └─StreamExchange { dist: HashShard(_row_id) } └─StreamDml { columns: [v1, v2, _row_id] } └─StreamSource +- sql: | + explain create table t (v1 int, v2 varchar) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + explain_output: | + StreamMaterialize { columns: [v1, v2, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } + └─StreamRowIdGen { row_id_index: 2 } + └─StreamUnion { all: true } + ├─StreamExchange [no_shuffle] { dist: SomeShard } + │ └─StreamSource { source: t, columns: [v1, v2, _row_id] } + └─StreamExchange [no_shuffle] { dist: SomeShard } + └─StreamDml { columns: [v1, v2, _row_id] } + └─StreamSource +- sql: | + explain create table t (v1 int, v2 varchar primary key) with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + explain_output: | + StreamMaterialize { columns: [v1, v2], stream_key: [v2], pk_columns: [v2], pk_conflict: Overwrite } + └─StreamUnion { all: true } + ├─StreamExchange { dist: HashShard(v2) } + │ └─StreamSource { source: t, columns: [v1, v2] } + └─StreamExchange { dist: HashShard(v2) } + └─StreamDml { columns: [v1, v2] } + └─StreamSource +- sql: | + explain create table t (v1 int, v2 varchar primary key) append only with ( connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest' ) FORMAT PLAIN ENCODE JSON; + explain_output: | + StreamMaterialize { columns: [v1, v2], stream_key: [v2], pk_columns: [v2], pk_conflict: IgnoreConflict } + └─StreamUnion { all: true } + ├─StreamExchange { dist: HashShard(v2) } + │ └─StreamSource { source: t, columns: [v1, v2] } + └─StreamExchange { dist: HashShard(v2) } + └─StreamDml { columns: [v1, v2] } + └─StreamSource diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index f87fced6b02a2..1bbdfccdd4034 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -277,15 +277,19 @@ pub async fn gen_sink_plan( } } - let user_defined_primary_key_table = - !(table_catalog.append_only || table_catalog.row_id_index.is_some()); + let user_defined_primary_key_table = table_catalog.row_id_index.is_none(); + let sink_is_append_only = sink_catalog.sink_type == SinkType::AppendOnly + || sink_catalog.sink_type == SinkType::ForceAppendOnly; - if !(user_defined_primary_key_table - || sink_catalog.sink_type == SinkType::AppendOnly - || sink_catalog.sink_type == SinkType::ForceAppendOnly) - { + if !user_defined_primary_key_table && !sink_is_append_only { return Err(RwError::from(ErrorCode::BindError( - "Only append-only sinks can sink to a table without primary keys.".to_string(), + "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), + ))); + } + + if table_catalog.append_only && !sink_is_append_only { + return Err(RwError::from(ErrorCode::BindError( + "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), ))); } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index e7b2b44226657..620206898cf97 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -699,12 +699,20 @@ fn gen_table_plan_inner( vec![], ); - if append_only && row_id_index.is_none() { - return Err(ErrorCode::InvalidInputSyntax( - "PRIMARY KEY constraint can not be applied to an append-only table.".to_owned(), - ) - .into()); - } + let pk_on_append_only = append_only && row_id_index.is_none(); + + let on_conflict = if pk_on_append_only { + let on_conflict = on_conflict.unwrap_or(OnConflict::Ignore); + if on_conflict != OnConflict::Ignore { + return Err(ErrorCode::InvalidInputSyntax( + "When PRIMARY KEY constraint applied to an APPEND ONLY table, the ON CONFLICT behavior must be IGNORE.".to_owned(), + ) + .into()); + } + Some(on_conflict) + } else { + on_conflict + }; if !append_only && !watermark_descs.is_empty() { return Err(ErrorCode::NotSupported( diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index de5c3deaf0d6b..83ae27a501a09 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -675,8 +675,8 @@ impl PlanRoot { #[derive(PartialEq, Debug, Copy, Clone)] enum PrimaryKeyKind { UserDefinedPrimaryKey, - RowIdAsPrimaryKey, - AppendOnly, + NonAppendOnlyRowIdPk, + AppendOnlyRowIdPk, } fn inject_dml_node( @@ -693,25 +693,28 @@ impl PlanRoot { dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?; dml_node = match kind { - PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::RowIdAsPrimaryKey => { + PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => { RequiredDist::hash_shard(pk_column_indices) .enforce_if_not_satisfies(dml_node, &Order::any())? } - PrimaryKeyKind::AppendOnly => StreamExchange::new_no_shuffle(dml_node).into(), + PrimaryKeyKind::AppendOnlyRowIdPk => { + StreamExchange::new_no_shuffle(dml_node).into() + } }; Ok(dml_node) } - let kind = if append_only { - assert!(row_id_index.is_some()); - PrimaryKeyKind::AppendOnly - } else if let Some(row_id_index) = row_id_index { + let kind = if let Some(row_id_index) = row_id_index { assert_eq!( pk_column_indices.iter().exactly_one().copied().unwrap(), row_id_index ); - PrimaryKeyKind::RowIdAsPrimaryKey + if append_only { + PrimaryKeyKind::AppendOnlyRowIdPk + } else { + PrimaryKeyKind::NonAppendOnlyRowIdPk + } } else { PrimaryKeyKind::UserDefinedPrimaryKey }; @@ -738,7 +741,7 @@ impl PlanRoot { .enforce_if_not_satisfies(external_source_node, &Order::any())? } - PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => { + PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => { StreamExchange::new_no_shuffle(external_source_node).into() } }; @@ -814,7 +817,7 @@ impl PlanRoot { PrimaryKeyKind::UserDefinedPrimaryKey => { unreachable!() } - PrimaryKeyKind::RowIdAsPrimaryKey | PrimaryKeyKind::AppendOnly => { + PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => { stream_plan = StreamRowIdGen::new_with_dist( stream_plan, row_id_index,