From 1707ad0e8b144173953a483aa29f7104f80885a0 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 19 Jun 2024 17:01:23 +0800 Subject: [PATCH] refactor: refactor ColumnDesc --- src/common/src/catalog/column.rs | 65 ++++++------------- src/common/src/catalog/mod.rs | 3 - .../src/parser/additional_columns.rs | 58 ++++------------- .../src/parser/debezium/debezium_parser.rs | 6 +- 4 files changed, 34 insertions(+), 98 deletions(-) diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index dde746bf3200a..e815a68be3c67 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -18,7 +18,7 @@ use itertools::Itertools; use risingwave_pb::expr::ExprNode; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::{ - AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc, + additional_column, AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc, }; use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET}; @@ -107,23 +107,16 @@ pub struct ColumnDesc { pub type_name: String, pub generated_or_default_column: Option, pub description: Option, + /// Note: perhaps `additional_column` and `generated_or_default_column` should be represented in one `enum`, + /// but we used a separated type and field for convenience. pub additional_column: AdditionalColumn, pub version: ColumnDescVersion, + _private_field_to_prevent_direct_construction: (), } impl ColumnDesc { pub fn unnamed(column_id: ColumnId, data_type: DataType) -> ColumnDesc { - ColumnDesc { - data_type, - column_id, - name: String::new(), - field_descs: vec![], - type_name: String::new(), - generated_or_default_column: None, - description: None, - additional_column: AdditionalColumn { column_type: None }, - version: ColumnDescVersion::Pr13707, - } + Self::named("", column_id, data_type) } pub fn named(name: impl Into, column_id: ColumnId, data_type: DataType) -> ColumnDesc { @@ -137,6 +130,7 @@ impl ColumnDesc { description: None, additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, + _private_field_to_prevent_direct_construction: (), } } @@ -144,7 +138,7 @@ impl ColumnDesc { name: impl Into, column_id: ColumnId, data_type: DataType, - additional_column_type: AdditionalColumn, + additional_column_type: additional_column::ColumnType, ) -> ColumnDesc { ColumnDesc { data_type, @@ -154,8 +148,11 @@ impl ColumnDesc { type_name: String::new(), generated_or_default_column: None, description: None, - additional_column: additional_column_type, + additional_column: AdditionalColumn { + column_type: Some(additional_column_type), + }, version: ColumnDescVersion::Pr13707, + _private_field_to_prevent_direct_construction: (), } } @@ -194,18 +191,11 @@ impl ColumnDesc { descs } + /// TODO: Perhaps we should only use `new_atomic`, instead of `named`. pub fn new_atomic(data_type: DataType, name: &str, column_id: i32) -> Self { - Self { - data_type, - column_id: ColumnId::new(column_id), - name: name.to_string(), - field_descs: vec![], - type_name: "".to_string(), - generated_or_default_column: None, - description: None, - additional_column: AdditionalColumn { column_type: None }, - version: ColumnDescVersion::Pr13707, - } + // Perhapts we should call it non_struct instead of atomic, because of List... + debug_assert!(!matches!(data_type, DataType::Struct(_))); + Self::named(name, ColumnId::new(column_id), data_type) } pub fn new_struct( @@ -228,6 +218,7 @@ impl ColumnDesc { description: None, additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, + _private_field_to_prevent_direct_construction: (), } } @@ -246,6 +237,7 @@ impl ColumnDesc { generated_or_default_column: None, additional_column: AdditionalColumn { column_type: None }, version: ColumnDescVersion::Pr13707, + _private_field_to_prevent_direct_construction: (), } } @@ -324,7 +316,7 @@ pub struct ColumnCatalog { } impl ColumnCatalog { - /// Get the column catalog's is hidden. + /// If the column is a hidden column pub fn is_hidden(&self) -> bool { self.is_hidden } @@ -334,7 +326,7 @@ impl ColumnCatalog { self.column_desc.is_generated() } - /// If the column is a generated column + /// If the column is a generated column, returns the corresponding expr. pub fn generated_expr(&self) -> Option<&ExprNode> { if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) = &self.column_desc.generated_or_default_column @@ -420,25 +412,6 @@ impl ColumnCatalog { } } -pub fn columns_extend(preserved_columns: &mut Vec, columns: Vec) { - debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0); - let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id(); - columns.iter().for_each(|column| { - let column_id = column.column_id().get_id(); - if column_id > max_incoming_column_id { - max_incoming_column_id = column_id; - } - }); - preserved_columns.iter_mut().for_each(|column| { - column - .column_desc - .column_id - .apply_delta_if_not_row_id(max_incoming_column_id) - }); - - preserved_columns.extend(columns); -} - pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) { debug_assert!( columns diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 86c6e8895c066..c85ae737a4d71 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -127,9 +127,6 @@ pub const OFFSET_COLUMN_NAME: &str = "_rw_offset"; pub const CDC_SOURCE_COLUMN_NUM: u32 = 3; pub const TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name"; -pub fn is_offset_column_name(name: &str) -> bool { - name.starts_with(OFFSET_COLUMN_NAME) -} /// Creates a offset column for storing upstream offset /// Used in cdc source currently pub fn offset_column_desc() -> ColumnDesc { diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 253718a00a7df..6f6756003ddb2 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -160,9 +160,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Bytea, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})), - }, + AdditionalColumnType::Key(AdditionalColumnKey {}), ), is_hidden: false, }, @@ -171,11 +169,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Timestamptz, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Timestamp( - AdditionalColumnTimestamp {}, - )), - }, + AdditionalColumnType::Timestamp(AdditionalColumnTimestamp {}), ), is_hidden: false, }, @@ -184,11 +178,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Partition( - AdditionalColumnPartition {}, - )), - }, + AdditionalColumnType::Partition(AdditionalColumnPartition {}), ), is_hidden: false, }, @@ -197,9 +187,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})), - }, + AdditionalColumnType::Offset(AdditionalColumnOffset {}), ), is_hidden: false, }, @@ -208,9 +196,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})), - }, + AdditionalColumnType::Filename(AdditionalColumnFilename {}), ), is_hidden: false, }, @@ -220,11 +206,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::DatabaseName( - AdditionalDatabaseName {}, - )), - }, + AdditionalColumnType::DatabaseName(AdditionalDatabaseName {}), ), is_hidden: false, }, @@ -233,9 +215,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})), - }, + AdditionalColumnType::SchemaName(AdditionalSchemaName {}), ), is_hidden: false, }, @@ -245,9 +225,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})), - }, + AdditionalColumnType::TableName(AdditionalTableName {}), ), is_hidden: false, }, @@ -256,11 +234,7 @@ pub fn build_additional_column_catalog( column_name, column_id, DataType::Varchar, - AdditionalColumn { - column_type: Some(AdditionalColumnType::CollectionName( - AdditionalCollectionName {}, - )), - }, + AdditionalColumnType::CollectionName(AdditionalCollectionName {}), ), is_hidden: false, }, @@ -390,12 +364,10 @@ fn build_header_catalog( col_name, column_id, data_type, - AdditionalColumn { - column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader { - inner_field: inner.to_string(), - data_type: Some(pb_data_type), - })), - }, + AdditionalColumnType::HeaderInner(AdditionalColumnHeader { + inner_field: inner.to_string(), + data_type: Some(pb_data_type), + }), ), is_hidden: false, } @@ -405,9 +377,7 @@ fn build_header_catalog( col_name, column_id, DataType::List(get_kafka_header_item_datatype().into()), - AdditionalColumn { - column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})), - }, + AdditionalColumnType::Headers(AdditionalColumnHeaders {}), ), is_hidden: false, } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 817c2a788f2be..95c25a00a669f 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -284,11 +284,7 @@ mod tests { "commit_ts", ColumnId::new(6), DataType::Timestamptz, - AdditionalColumn { - column_type: Some(additional_column::ColumnType::Timestamp( - AdditionalColumnTimestamp {}, - )), - }, + additional_column::ColumnType::Timestamp(AdditionalColumnTimestamp {}), ), ];