diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 84fff651b547c..c4b4713af81cc 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -33,6 +33,7 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node echo "--- Install dependencies" python3 -m pip install --break-system-packages requests protobuf fastavro confluent_kafka jsonschema +apt-get -y install jq echo "--- e2e, inline test" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ @@ -134,21 +135,7 @@ risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt' echo "--- Kill cluster" risedev ci-kill - -echo "--- e2e, ci-1cn-1fe, protobuf schema registry" export RISINGWAVE_CI=true -RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ -risedev ci-start ci-1cn-1fe -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 20 user -echo "make sure google/protobuf/source_context.proto is NOT in schema registry" -curl --silent 'http://schemaregistry:8082/subjects'; echo -# curl --silent --head -X GET 'http://schemaregistry:8082/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404 -curl --silent 'http://schemaregistry:8082/subjects' | grep -v 'google/protobuf/source_context.proto' -risedev slt './e2e_test/schema_registry/pb.slt' -risedev slt './e2e_test/schema_registry/alter_sr.slt' - -echo "--- Kill cluster" -risedev ci-kill echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ diff --git a/e2e_test/schema_registry/alter_sr.slt b/e2e_test/schema_registry/alter_sr.slt deleted file mode 100644 index d703c0401a35e..0000000000000 --- a/e2e_test/schema_registry/alter_sr.slt +++ /dev/null @@ -1,80 +0,0 @@ -# Before running this test, seed data into kafka: -# python3 e2e_test/schema_registry/pb.py - -statement ok -CREATE SOURCE src_user WITH ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest' -) -FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://schemaregistry:8082', - message = 'test.User' -); - -statement ok -CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; - -statement ok -CREATE TABLE t_user WITH ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest' -) -FORMAT PLAIN ENCODE PROTOBUF( - schema.registry = 'http://schemaregistry:8082', - message = 'test.User' -); - -statement error -SELECT age FROM mv_user; - -statement error -SELECT age FROM t_user; - -# Push more events with extended fields -system ok -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields - -sleep 5s - -# Refresh source schema -statement ok -ALTER SOURCE src_user REFRESH SCHEMA; - -statement ok -CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; - -# Refresh table schema -statement ok -ALTER TABLE t_user REFRESH SCHEMA; - -query IIII -SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more; ----- -25 4 0 10 - -# Push more events with extended fields -system ok -python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://schemaregistry:8082" "sr_pb_test" 5 user_with_more_fields - -sleep 5s - -query IIII -SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; ----- -30 4 0 10 - -statement ok -DROP 
MATERIALIZED VIEW mv_user_more; - -statement ok -DROP TABLE t_user; - -statement ok -DROP MATERIALIZED VIEW mv_user; - -statement ok -DROP SOURCE src_user; diff --git a/e2e_test/schema_registry/pb.slt b/e2e_test/schema_registry/pb.slt deleted file mode 100644 index 7b60b4fa8d7a4..0000000000000 --- a/e2e_test/schema_registry/pb.slt +++ /dev/null @@ -1,50 +0,0 @@ -# Before running this test, seed data into kafka: -# python3 e2e_test/schema_registry/pb.py - -# Create a table. -statement ok -create table sr_pb_test with ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest') -FORMAT plain ENCODE protobuf( - schema.registry = 'http://schemaregistry:8082', - message = 'test.User' - ); - -# for multiple schema registry nodes -statement ok -create table sr_pb_test_bk with ( - connector = 'kafka', - topic = 'sr_pb_test', - properties.bootstrap.server = 'message_queue:29092', - scan.startup.mode = 'earliest') -FORMAT plain ENCODE protobuf( - schema.registry = 'http://schemaregistry:8082,http://schemaregistry:8082', - message = 'test.User' - ); - -# Wait for source -sleep 10s - -# Flush into storage -statement ok -flush; - -query I -select count(*) from sr_pb_test; ----- -20 - -query IIT -select min(id), max(id), max((sc).file_name) from sr_pb_test; ----- -0 19 source/context_019.proto - - -statement ok -drop table sr_pb_test; - -statement ok -drop table sr_pb_test_bk; diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml index 48342bceafd42..57d09d8237efa 100644 --- a/e2e_test/source_inline/commands.toml +++ b/e2e_test/source_inline/commands.toml @@ -37,6 +37,12 @@ set -e if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then echo "Deleting all Kafka topics..." rpk topic delete -r "*" + echo "Deleting all schema registry subjects" + rpk sr subject list | while read -r subject; do + echo "Deleting schema registry subject: $subject" + rpk sr subject delete "$subject" + rpk sr subject delete "$subject" --permanent + done else echo "No Kafka to clean." 
fi diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt new file mode 100644 index 0000000000000..e60bf5c0295b0 --- /dev/null +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -0,0 +1,70 @@ +control substitution on + +# https://github.com/risingwavelabs/risingwave/issues/16486 + +# cleanup +system ok +rpk topic delete 'avro_alter_source_test' || true; \\ +(rpk sr subject delete 'avro_alter_source_test-value' && rpk sr subject delete 'avro_alter_source_test-value' --permanent) || true; + +# create topic and sr subject +system ok +rpk topic create 'avro_alter_source_test' + +system ok +echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ +| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' + +statement ok +create source s +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'avro_alter_source_test' +) +FORMAT PLAIN ENCODE AVRO ( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}' +); + +# create a new version of schema and produce a message +system ok +echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \\ +| curl -X POST -H 'content-type:application/json' -d @- '${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_source_test-value/versions' + +system ok +echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test + +query ? +select * from s +---- +ABC + +statement error +alter source s format plain encode json; +---- +db error: ERROR: Failed to run the query + +Caused by: + Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Avro, and altering them is not supported yet +No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + + +statement ok +alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); + +query ?? +select * from s +---- +ABC 1 + +statement ok +create materialized view mv as select * from s; + +sleep 2s + +query ?? 
+select * from mv ---- ABC 1 + statement ok drop source s cascade; diff --git a/e2e_test/source_inline/kafka/protobuf/alter_source.slt b/e2e_test/source_inline/kafka/protobuf/alter_source.slt new file mode 100644 index 0000000000000..c9db2df3ca4ee --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/alter_source.slt @@ -0,0 +1,91 @@ +control substitution on + +system ok +rpk topic delete sr_pb_test || true; \\ +(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true; + +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user + +statement ok +CREATE SOURCE src_user +INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293 +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest' +) +FORMAT PLAIN ENCODE PROTOBUF( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' +); + +statement ok +CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user; + +statement ok +CREATE TABLE t_user WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest' +) +FORMAT PLAIN ENCODE PROTOBUF( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' +); + +statement error +SELECT age FROM mv_user; + +statement error +SELECT age FROM t_user; + +# Push more events with extended fields +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields + +sleep 5s + +# Refresh source schema +statement ok +ALTER SOURCE src_user REFRESH SCHEMA; + +statement ok +CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user; + +# Refresh table schema. It consumes new data before the refresh, so the new fields are NULL +statement ok +ALTER TABLE t_user REFRESH SCHEMA; + +query ???? +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more; +---- +25 104 0 510 + +query ???? +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; +---- +25 NULL NULL NULL + +# Push more events with extended fields +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 5 user_with_more_fields + +sleep 5s + +query ???? +SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user; +---- +30 104 100 510 + +statement ok +DROP MATERIALIZED VIEW mv_user_more; + +statement ok +DROP TABLE t_user; + +statement ok +DROP MATERIALIZED VIEW mv_user; + +statement ok +DROP SOURCE src_user; diff --git a/e2e_test/source_inline/kafka/protobuf/basic.slt b/e2e_test/source_inline/kafka/protobuf/basic.slt new file mode 100644 index 0000000000000..82eb61560aa4d --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/basic.slt @@ -0,0 +1,58 @@ +control substitution on + +system ok +rpk topic delete sr_pb_test || true; \\ +(rpk sr subject delete 'sr_pb_test-value' && rpk sr subject delete 'sr_pb_test-value' --permanent) || true; + +system ok +python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "sr_pb_test" 20 user + +# make sure google/protobuf/source_context.proto is NOT in schema registry +system ok +curl --silent '${RISEDEV_SCHEMA_REGISTRY_URL}' | grep -v 'google/protobuf/source_context.proto' + +# Create a table.
+statement ok +create table sr_pb_test with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest') +FORMAT plain ENCODE protobuf( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' + ); + +# for multiple schema registry nodes +statement ok +create table sr_pb_test_bk with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'sr_pb_test', + scan.startup.mode = 'earliest') +FORMAT plain ENCODE protobuf( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL},${RISEDEV_SCHEMA_REGISTRY_URL}', + message = 'test.User' + ); + +# Wait for source +sleep 2s + +# Flush into storage +statement ok +flush; + +query I +select count(*) from sr_pb_test; +---- +20 + +query IT +select min(id), max(id), max((sc).file_name) from sr_pb_test; +---- +0 19 source/context_019.proto + + +statement ok +drop table sr_pb_test; + +statement ok +drop table sr_pb_test_bk; diff --git a/e2e_test/schema_registry/pb.py b/e2e_test/source_inline/kafka/protobuf/pb.py similarity index 73% rename from e2e_test/schema_registry/pb.py rename to e2e_test/source_inline/kafka/protobuf/pb.py index fd6e0dc478b51..4cab50f899e50 100644 --- a/e2e_test/schema_registry/pb.py +++ b/e2e_test/source_inline/kafka/protobuf/pb.py @@ -25,6 +25,7 @@ def get_user(i): sc=SourceContext(file_name="source/context_{:03}.proto".format(i)), ) + def get_user_with_more_fields(i): return user_pb2.User( id=i, @@ -33,15 +34,18 @@ def get_user_with_more_fields(i): city="City_{}".format(i), gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE, sc=SourceContext(file_name="source/context_{:03}.proto".format(i)), - age=i, + age=100 + i, ) -def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message): + +def send_to_kafka( + producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message +): schema_registry_client = SchemaRegistryClient(schema_registry_conf) serializer = ProtobufSerializer( pb_message, schema_registry_client, - {"use.deprecated.format": False, 'skip.known.types': True}, + {"use.deprecated.format": False, "skip.known.types": True}, ) producer = Producer(producer_conf) @@ -60,7 +64,9 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u if __name__ == "__main__": if len(sys.argv) < 6: - print("pb.py ") + print( + "pb.py " + ) exit(1) broker_list = sys.argv[1] @@ -69,20 +75,29 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u num_records = int(sys.argv[4]) pb_message = sys.argv[5] - user_pb2 = importlib.import_module(f'protobuf.{pb_message}_pb2') + user_pb2 = importlib.import_module(f"{pb_message}_pb2") all_pb_messages = { - 'user': get_user, - 'user_with_more_fields': get_user_with_more_fields, + "user": get_user, + "user_with_more_fields": get_user_with_more_fields, } - assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}' + assert ( + pb_message in all_pb_messages + ), f"pb_message must be one of {list(all_pb_messages.keys())}" schema_registry_conf = {"url": schema_registry_url} producer_conf = {"bootstrap.servers": broker_list} try: - send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, all_pb_messages[pb_message], user_pb2.User) + send_to_kafka( + producer_conf, + schema_registry_conf, + topic, + num_records, + all_pb_messages[pb_message], + user_pb2.User, + ) except Exception as e: print("Send Protobuf data to schema registry and kafka failed {}", e) exit(1) diff --git 
a/e2e_test/schema_registry/protobuf/user.proto b/e2e_test/source_inline/kafka/protobuf/user.proto similarity index 100% rename from e2e_test/schema_registry/protobuf/user.proto rename to e2e_test/source_inline/kafka/protobuf/user.proto diff --git a/e2e_test/schema_registry/protobuf/user_pb2.py b/e2e_test/source_inline/kafka/protobuf/user_pb2.py similarity index 100% rename from e2e_test/schema_registry/protobuf/user_pb2.py rename to e2e_test/source_inline/kafka/protobuf/user_pb2.py diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields.proto b/e2e_test/source_inline/kafka/protobuf/user_with_more_fields.proto similarity index 100% rename from e2e_test/schema_registry/protobuf/user_with_more_fields.proto rename to e2e_test/source_inline/kafka/protobuf/user_with_more_fields.proto diff --git a/e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py b/e2e_test/source_inline/kafka/protobuf/user_with_more_fields_pb2.py similarity index 100% rename from e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py rename to e2e_test/source_inline/kafka/protobuf/user_with_more_fields_pb2.py diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 82d2f22f41cb4..e815a68be3c67 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -18,10 +18,10 @@ use itertools::Itertools; use risingwave_pb::expr::ExprNode; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::{ - AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc, + additional_column, AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc, }; -use super::row_id_column_desc; +use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET}; use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID}; use crate::types::DataType; @@ -45,6 +45,10 @@ impl ColumnId { pub const fn placeholder() -> Self { Self(i32::MAX - 1) } + + pub const fn first_user_column() -> Self { + Self(USER_COLUMN_ID_OFFSET) + } } impl ColumnId { @@ -103,23 +107,16 @@ pub struct ColumnDesc { pub type_name: String, pub generated_or_default_column: Option, pub description: Option, + /// Note: perhaps `additional_column` and `generated_or_default_column` should be represented in one `enum`, + /// but we used a separate type and field for convenience.
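+ /// (A `column_type` of `None` here means a regular user-defined column rather than an `INCLUDE ... AS ...` column; see `is_connector_additional_column` below.)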
pub additional_column: AdditionalColumn, pub version: ColumnDescVersion, + _private_field_to_prevent_direct_construction: (), } impl ColumnDesc { pub fn unnamed(column_id: ColumnId, data_type: DataType) -> ColumnDesc { - ColumnDesc { - data_type, - column_id, - name: String::new(), - field_descs: vec![], - type_name: String::new(), - generated_or_default_column: None, - description: None, - additional_column: AdditionalColumn { column_type: None }, - version: ColumnDescVersion::Pr13707, - } + Self::named("", column_id, data_type) } pub fn named(name: impl Into, column_id: ColumnId, data_type: DataType) -> ColumnDesc { @@ -133,6 +130,7 @@ impl ColumnDesc { description: None, additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, + _private_field_to_prevent_direct_construction: (), } } @@ -140,7 +138,7 @@ impl ColumnDesc { name: impl Into, column_id: ColumnId, data_type: DataType, - additional_column_type: AdditionalColumn, + additional_column_type: additional_column::ColumnType, ) -> ColumnDesc { ColumnDesc { data_type, @@ -150,8 +148,11 @@ impl ColumnDesc { type_name: String::new(), generated_or_default_column: None, description: None, - additional_column: additional_column_type, + additional_column: AdditionalColumn { + column_type: Some(additional_column_type), + }, version: ColumnDescVersion::Pr13707, + _private_field_to_prevent_direct_construction: (), } } @@ -190,18 +191,11 @@ impl ColumnDesc { descs } + /// TODO: Perhaps we should only use `new_atomic`, instead of `named`. pub fn new_atomic(data_type: DataType, name: &str, column_id: i32) -> Self { - Self { - data_type, - column_id: ColumnId::new(column_id), - name: name.to_string(), - field_descs: vec![], - type_name: "".to_string(), - generated_or_default_column: None, - description: None, - additional_column: AdditionalColumn { column_type: None }, - version: ColumnDescVersion::Pr13707, - } + // Perhaps we should call it non_struct instead of atomic, because of List... + debug_assert!(!matches!(data_type, DataType::Struct(_))); + Self::named(name, ColumnId::new(column_id), data_type) } pub fn new_struct( @@ -224,6 +218,7 @@ impl ColumnDesc { description: None, additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, + _private_field_to_prevent_direct_construction: (), } } @@ -242,6 +237,7 @@ impl ColumnDesc { generated_or_default_column: None, additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, + _private_field_to_prevent_direct_construction: (), } } @@ -320,7 +316,7 @@ pub struct ColumnCatalog { } impl ColumnCatalog { - /// Get the column catalog's is hidden. + /// If the column is a hidden column pub fn is_hidden(&self) -> bool { self.is_hidden } @@ -330,7 +326,7 @@ impl ColumnCatalog { self.column_desc.is_generated() } - /// If the column is a generated column + /// If the column is a generated column, returns the corresponding expr. pub fn generated_expr(&self) -> Option<&ExprNode> { if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) = &self.column_desc.generated_or_default_column @@ -346,6 +342,11 @@ impl ColumnCatalog { self.column_desc.is_default() } + /// If the column is an `INCLUDE ... AS ...` connector column. + pub fn is_connector_additional_column(&self) -> bool { + self.column_desc.additional_column.column_type.is_some() + } + /// Get a reference to the column desc's data type.
pub fn data_type(&self) -> &DataType { &self.column_desc.data_type @@ -411,34 +412,16 @@ impl ColumnCatalog { } } -pub fn columns_extend(preserved_columns: &mut Vec, columns: Vec) { - debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0); - let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id(); - columns.iter().for_each(|column| { - let column_id = column.column_id().get_id(); - if column_id > max_incoming_column_id { - max_incoming_column_id = column_id; - } - }); - preserved_columns.iter_mut().for_each(|column| { - column - .column_desc - .column_id - .apply_delta_if_not_row_id(max_incoming_column_id) - }); - - preserved_columns.extend(columns); -} - -pub fn is_column_ids_dedup(columns: &[ColumnCatalog]) -> bool { - let mut column_ids = columns - .iter() - .map(|column| column.column_id().get_id()) - .collect_vec(); - column_ids.sort(); - let original_len = column_ids.len(); - column_ids.dedup(); - column_ids.len() == original_len +pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) { + debug_assert!( + columns + .iter() + .map(|c| c.column_id()) + .duplicates() + .next() + .is_none(), + "duplicate ColumnId found in source catalog. Columns: {columns:#?}" + ); } #[cfg(test)] diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 86c6e8895c066..c85ae737a4d71 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -127,9 +127,6 @@ pub const OFFSET_COLUMN_NAME: &str = "_rw_offset"; pub const CDC_SOURCE_COLUMN_NUM: u32 = 3; pub const TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name"; -pub fn is_offset_column_name(name: &str) -> bool { - name.starts_with(OFFSET_COLUMN_NAME) -} /// Creates a offset column for storing upstream offset /// Used in cdc source currently pub fn offset_column_desc() -> ColumnDesc { diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 253718a00a7df..6f6756003ddb2 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -160,9 +160,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Bytea, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})), - }, + AdditionalColumnType::Key(AdditionalColumnKey {}), ), is_hidden: false, }, @@ -171,11 +169,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Timestamptz, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Timestamp( - AdditionalColumnTimestamp {}, - )), - }, + AdditionalColumnType::Timestamp(AdditionalColumnTimestamp {}), ), is_hidden: false, }, @@ -184,11 +178,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Partition( - AdditionalColumnPartition {}, - )), - }, + AdditionalColumnType::Partition(AdditionalColumnPartition {}), ), is_hidden: false, }, @@ -197,9 +187,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})), - }, + AdditionalColumnType::Offset(AdditionalColumnOffset {}), ), is_hidden: false, }, @@ -208,9 +196,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})), - }, + AdditionalColumnType::Filename(AdditionalColumnFilename {}), ), is_hidden: false, }, @@ -220,11 
+206,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::DatabaseName( - AdditionalDatabaseName {}, - )), - }, + AdditionalColumnType::DatabaseName(AdditionalDatabaseName {}), ), is_hidden: false, }, @@ -233,9 +215,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})), - }, + AdditionalColumnType::SchemaName(AdditionalSchemaName {}), ), is_hidden: false, }, @@ -245,9 +225,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})), - }, + AdditionalColumnType::TableName(AdditionalTableName {}), ), is_hidden: false, }, @@ -256,11 +234,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::CollectionName( - AdditionalCollectionName {}, - )), - }, + AdditionalColumnType::CollectionName(AdditionalCollectionName {}), ), is_hidden: false, }, @@ -390,12 +364,10 @@ fn build_header_catalog( col_name, column_id, data_type, - AdditionalColumn { - column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader { - inner_field: inner.to_string(), - data_type: Some(pb_data_type), - })), - }, + AdditionalColumnType::HeaderInner(AdditionalColumnHeader { + inner_field: inner.to_string(), + data_type: Some(pb_data_type), + }), ), is_hidden: false, } @@ -405,9 +377,7 @@ fn build_header_catalog( col_name, column_id, DataType::List(get_kafka_header_item_datatype().into()), - AdditionalColumn { - column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})), - }, + AdditionalColumnType::Headers(AdditionalColumnHeaders {}), ), is_hidden: false, } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 817c2a788f2be..95c25a00a669f 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -284,11 +284,7 @@ mod tests { "commit_ts", ColumnId::new(6), DataType::Timestamptz, - AdditionalColumn { - column_type: Some(additional_column::ColumnType::Timestamp( - AdditionalColumnTimestamp {}, - )), - }, + additional_column::ColumnType::Timestamp(AdditionalColumnTimestamp {}), ), ]; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index f7667a66a3747..26cf746b535dc 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -1028,6 +1028,7 @@ pub mod test_utils { } } +/// Note: this is created in `SourceReader::build_stream` #[derive(Debug, Clone, Default)] pub struct ParserConfig { pub common: CommonParserConfig, diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index c5283a2cc592a..94b2980fcb63d 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use either::Either; use itertools::Itertools; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::{is_system_schema, Field}; +use risingwave_common::catalog::{debug_assert_column_ids_distinct, is_system_schema, Field}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use 
risingwave_connector::WithPropertiesExt; use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias}; @@ -221,6 +221,7 @@ impl Binder { source_catalog: &SourceCatalog, as_of: Option, ) -> (Relation, Vec<(bool, Field)>) { + debug_assert_column_ids_distinct(&source_catalog.columns); self.included_relations.insert(source_catalog.id.into()); ( Relation::Source(Box::new(BoundSource { diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index bfe4537fa7085..d7c2fd3778c79 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -210,7 +210,7 @@ impl TableType { } } -/// The version of a table, used by schema change. See [`PbTableVersion`]. +/// The version of a table, used by schema change. See [`PbTableVersion`] for more details. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct TableVersion { pub version_id: TableVersionId, diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index fcabedc1149c4..db54cce612a91 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::catalog::ColumnId; +use risingwave_common::catalog::{ColumnCatalog, ColumnId}; use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct}; use risingwave_sqlparser::ast::{ AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement, @@ -106,10 +106,7 @@ pub async fn handle_alter_source_column( catalog.definition = alter_definition_add_column(&catalog.definition, column_def.clone())?; let mut bound_column = bind_sql_columns(&[column_def])?.remove(0); - bound_column.column_desc.column_id = columns - .iter() - .fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id())) - .next(); + bound_column.column_desc.column_id = max_column_id(columns).next(); columns.push(bound_column); } _ => unreachable!(), @@ -147,6 +144,20 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul Ok(stmt.to_string()) } +/// FIXME: perhaps we should use something like `ColumnIdGenerator::new_alter`. +/// However, the `SourceVersion` is problematic: it doesn't contain `next_col_id`. +/// (But for now this isn't a large problem, since drop column is not allowed for source yet.) +/// +/// Besides, the logic of column id handling is a mess. +/// In some places, we use `ColumnId::placeholder()`, and use `col_id_gen` to fill it at the end; +/// In other places, we create column ids ad-hoc. +pub fn max_column_id(columns: &Vec) -> ColumnId { + // XXX: should we check the column IDs of struct fields here?
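+ // Note: the fold below is seeded with `first_user_column()` (instead of `i32::MIN` as the old ad-hoc code used), so even an empty column list yields a valid starting column ID.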
+ columns .iter() .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id())) +} + #[cfg(test)] pub mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index c72cf547365d7..af87960547bc0 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -28,6 +28,7 @@ use risingwave_sqlparser::ast::{ }; use risingwave_sqlparser::parser::Parser; +use super::alter_source_column::max_column_id; use super::alter_table_column::schema_has_schema_registry; use super::create_source::{bind_columns_from_source, validate_compatibility}; use super::util::SourceSchemaCompatExt; @@ -68,14 +69,19 @@ fn encode_type_to_encode(from: EncodeType) -> Option { }) } -/// Returns the columns in `columns_a` but not in `columns_b`, -/// where the comparison is done by name and data type, -/// and hidden columns are ignored. +/// Returns the columns in `columns_a` but not in `columns_b`. +/// +/// Note: +/// - The comparison is done by name and data type, without checking `ColumnId`. +/// - Hidden columns and `INCLUDE ... AS ...` columns are ignored, because it's only for the special handling of alter sr. +/// For the newly resolved `columns_from_resolve_source` (created by [`bind_columns_from_source`]), it doesn't contain hidden columns (`_row_id`) or `INCLUDE ... AS ...` columns. +/// This is fragile and we should really refactor it later. fn columns_minus(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec { columns_a .iter() .filter(|col_a| { !col_a.is_hidden() + && !col_a.is_connector_additional_column() && !columns_b.iter().any(|col_b| { col_a.name() == col_b.name() && col_a.data_type() == col_b.data_type() }) @@ -162,8 +168,20 @@ pub async fn refresh_sr_and_get_columns_diff( unreachable!("source without schema registry is rejected") }; - let added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); + let mut added_columns = columns_minus(&columns_from_resolve_source, &original_source.columns); + // The newly resolved columns' column IDs also start from 1. They cannot be used directly.
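+ // For illustration (hypothetical IDs): if the original source already occupies column IDs 1..=4, a new `age` column resolved from the registry comes back with ID 1 and is remapped to 5 (`max + 1`) below.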
+ let mut next_col_id = max_column_id(&original_source.columns).next(); + for col in &mut added_columns { + col.column_desc.column_id = next_col_id; + next_col_id = next_col_id.next(); + } let dropped_columns = columns_minus(&original_source.columns, &columns_from_resolve_source); + tracing::debug!( + ?added_columns, + ?dropped_columns, + ?columns_from_resolve_source, + original_source = ?original_source.columns + ); Ok((source_info, added_columns, dropped_columns)) } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a29aa86907e0f..1f458bbbc09b5 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::array::arrow::{FromArrow, IcebergArrowConvert}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ - is_column_ids_dedup, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId, + debug_assert_column_ids_distinct, ColumnCatalog, ColumnDesc, ColumnId, Schema, TableId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_common::types::DataType; @@ -73,8 +73,8 @@ use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported use crate::error::{Result, RwError}; use crate::expr::Expr; use crate::handler::create_table::{ - bind_pk_on_relation, bind_sql_column_constraints, bind_sql_columns, bind_sql_pk_names, - ensure_table_constraints_supported, ColumnIdGenerator, + bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns, + bind_sql_pk_names, ensure_table_constraints_supported, ColumnIdGenerator, }; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; @@ -289,8 +289,11 @@ fn get_name_strategy_or_default(name_strategy: Option) -> Result for more information. -/// return `(columns, source info)` +/// Resolves the schema of the source from the external schema file. +/// See for more information. +/// +/// Note: the returned schema strictly corresponds to the external schema. +/// Other special columns, like additional columns (`INCLUDE`) and the `row_id` column, are not included. pub(crate) async fn bind_columns_from_source( session: &SessionImpl, source_schema: &ConnectorSchema, @@ -489,6 +492,28 @@ pub(crate) async fn bind_columns_from_source( } }; + if cfg!(debug_assertions) { + // validate column ids + // Note: this just documents how it works currently. It doesn't imply that the behavior is reasonable. + if let Some(ref columns) = columns { + let mut i = 1; + fn check_col(col: &ColumnDesc, i: &mut usize, columns: &Vec) { + for nested_col in &col.field_descs { + // What's the usage of struct fields' column IDs? + check_col(nested_col, i, columns); + } + assert!( + col.column_id.get_id() == *i as i32, + "unexpected column id\ncol: {col:?}\ni: {i}\ncolumns: {columns:#?}" + ); + *i += 1; + } + for col in columns { + check_col(&col.column_desc, &mut i, columns); + } + } + } + if !format_encode_options_to_consume.is_empty() { let err_string = format!( "Get unknown format_encode_options for {:?} {:?}: {}", @@ -1387,10 +1412,12 @@ pub async fn bind_create_source( .into()); } + // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary. + // XXX: should we also change the col id for struct fields?
for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) } - debug_assert!(is_column_ids_dedup(&columns)); + debug_assert_column_ids_distinct(&columns); let must_need_pk = if is_create_source { with_properties.connector_need_pk() @@ -1403,7 +1430,7 @@ pub async fn bind_create_source( }; let (mut columns, pk_col_ids, row_id_index) = - bind_pk_on_relation(columns, pk_names, must_need_pk)?; + bind_pk_and_row_id_on_relation(columns, pk_names, must_need_pk)?; let watermark_descs = bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 0f3693653ced5..c542762702053 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, - INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET, + INITIAL_TABLE_VERSION_ID, }; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; @@ -110,7 +110,7 @@ impl ColumnIdGenerator { pub fn new_initial() -> Self { Self { existing: HashMap::new(), - next_column_id: ColumnId::from(USER_COLUMN_ID_OFFSET), + next_column_id: ColumnId::first_user_column(), version_id: INITIAL_TABLE_VERSION_ID, } } @@ -404,7 +404,7 @@ fn multiple_pk_definition_err() -> RwError { /// /// It returns the columns together with `pk_column_ids`, and an optional row id column index if /// added. -pub fn bind_pk_on_relation( +pub fn bind_pk_and_row_id_on_relation( mut columns: Vec, pk_names: Vec, must_need_pk: bool, @@ -570,7 +570,8 @@ pub(crate) fn gen_create_table_plan_without_source( ) -> Result<(PlanRef, PbTable)> { ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?; + let (mut columns, pk_column_ids, row_id_index) = + bind_pk_and_row_id_on_relation(columns, pk_names, true)?; let watermark_descs: Vec = bind_source_watermark( context.session_ctx(), @@ -762,7 +763,8 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( c.column_desc.column_id = col_id_gen.generate(c.name()) } - let (columns, pk_column_ids, _row_id_index) = bind_pk_on_relation(columns, pk_names, true)?; + let (columns, pk_column_ids, _row_id_index) = + bind_pk_and_row_id_on_relation(columns, pk_names, true)?; let definition = context.normalized_sql().to_owned(); @@ -881,7 +883,6 @@ fn derive_connect_properties( pub(super) async fn handle_create_table_plan( handler_args: HandlerArgs, explain_options: ExplainOptions, - col_id_gen: ColumnIdGenerator, source_schema: Option, cdc_table_info: Option, table_name: ObjectName, @@ -894,6 +895,7 @@ pub(super) async fn handle_create_table_plan( with_version_column: Option, include_column_options: IncludeOption, ) -> Result<(PlanRef, Option, PbTable, TableJobType)> { + let col_id_gen = ColumnIdGenerator::new_initial(); let source_schema = check_create_table_with_source( &handler_args.with_options, source_schema, @@ -1148,11 +1150,9 @@ pub async fn handle_create_table( } let (graph, source, table, job_type) = { - let col_id_gen = ColumnIdGenerator::new_initial(); let (plan, source, table, job_type) = handle_create_table_plan( handler_args, 
ExplainOptions::default(), - col_id_gen, source_schema, cdc_table_info, table_name.clone(), @@ -1435,7 +1435,8 @@ mod tests { } ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?; + let (_, pk_column_ids, _) = + bind_pk_and_row_id_on_relation(columns, pk_names, true)?; Ok(pk_column_ids) })(); match (expected, actual) { diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 9f46087c206e8..db124b373181b 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -22,7 +22,6 @@ use thiserror_ext::AsReport; use super::create_index::{gen_create_index_plan, resolve_index_schema}; use super::create_mv::gen_create_mv_plan; use super::create_sink::{gen_sink_plan, get_partition_compute_info}; -use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; use super::{RwPgResponse, RwPgResponseBuilderExt}; @@ -66,14 +65,11 @@ async fn do_handle_explain( wildcard_idx, .. } => { - let col_id_gen = ColumnIdGenerator::new_initial(); - let source_schema = source_schema.map(|s| s.into_v2_with_warning()); let (plan, _source, _table, _job_type) = handle_create_table_plan( handler_args, explain_options, - col_id_gen, source_schema, cdc_table_info, name.clone(), diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 7fd4f0b92822b..73b52b977c7a4 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -151,6 +151,15 @@ fn to_pg_rows( session_data: &StaticSessionData, ) -> RwResult> { assert_eq!(chunk.dimension(), column_types.len()); + if cfg!(debug_assertions) { + let chunk_data_types = chunk.data_types(); + for (ty1, ty2) in chunk_data_types.iter().zip_eq_fast(column_types) { + debug_assert!( + ty1.equals_datatype(ty2), + "chunk_data_types: {chunk_data_types:?}, column_types: {column_types:?}" + ) + } + } chunk .rows() diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 0310fdbbd439b..918db2919e626 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -70,6 +70,12 @@ impl LogicalSource { ctx: OptimizerContextRef, as_of: Option, ) -> Result { + // XXX: should we reorder the columns? + // The order may be strange if the schema is changed, e.g., [foo:Varchar, _rw_kafka_timestamp:Timestamptz, _row_id:Serial, bar:Int32] + // related: https://github.com/risingwavelabs/risingwave/issues/16486 + // The order does not matter much. The columns field is essentially a map indexed by the column id. + // It will affect what users will see in `SELECT *`. + // But not sure if we rely on the position of hidden column like `_row_id` somewhere. For `projected_row_id` we do so... 
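+ // (Presumably the interleaved order comes from `REFRESH SCHEMA` appending newly added columns after the existing ones, hidden columns included.)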
let core = generic::Source { catalog: source_catalog, column_catalog, diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index 943f37abf655d..bff4062f72097 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -78,13 +78,9 @@ pub fn generate_risedev_env(services: &Vec) -> String { writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap(); } ServiceConfig::SchemaRegistry(c) => { - let address = &c.address; - let port = &c.port; - writeln!( - env, - r#"RISEDEV_SCHEMA_REGISTRY_URL="http://{address}:{port}""#, - ) - .unwrap(); + let url = format!("http://{}:{}", c.address, c.port); + writeln!(env, r#"RISEDEV_SCHEMA_REGISTRY_URL="{url}""#,).unwrap(); + writeln!(env, r#"RPK_REGISTRY_HOSTS="{url}""#).unwrap(); } ServiceConfig::MySql(c) if c.application != Application::Metastore => { let host = &c.address;