Skip to content

Commit

Permalink
refactor: refactor ColumnDesc
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jun 19, 2024
1 parent ef00148 commit 1707ad0
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 98 deletions.
65 changes: 19 additions & 46 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use itertools::Itertools;
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
additional_column, AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
};

use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET};
Expand Down Expand Up @@ -107,23 +107,16 @@ pub struct ColumnDesc {
pub type_name: String,
pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
pub description: Option<String>,
/// Note: perhaps `additional_column` and `generated_or_default_column` should be represented in one `enum`,
/// but we used a separated type and field for convenience.
pub additional_column: AdditionalColumn,
pub version: ColumnDescVersion,
_private_field_to_prevent_direct_construction: (),
}

impl ColumnDesc {
pub fn unnamed(column_id: ColumnId, data_type: DataType) -> ColumnDesc {
ColumnDesc {
data_type,
column_id,
name: String::new(),
field_descs: vec![],
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
Self::named("", column_id, data_type)
}

pub fn named(name: impl Into<String>, column_id: ColumnId, data_type: DataType) -> ColumnDesc {
Expand All @@ -137,14 +130,15 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
_private_field_to_prevent_direct_construction: (),
}
}

pub fn named_with_additional_column(
name: impl Into<String>,
column_id: ColumnId,
data_type: DataType,
additional_column_type: AdditionalColumn,
additional_column_type: additional_column::ColumnType,
) -> ColumnDesc {
ColumnDesc {
data_type,
Expand All @@ -154,8 +148,11 @@ impl ColumnDesc {
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_column: additional_column_type,
additional_column: AdditionalColumn {
column_type: Some(additional_column_type),
},
version: ColumnDescVersion::Pr13707,
_private_field_to_prevent_direct_construction: (),
}
}

Expand Down Expand Up @@ -194,18 +191,11 @@ impl ColumnDesc {
descs
}

/// TODO: Perhaps we should only use `new_atomic`, instead of `named`.
pub fn new_atomic(data_type: DataType, name: &str, column_id: i32) -> Self {
Self {
data_type,
column_id: ColumnId::new(column_id),
name: name.to_string(),
field_descs: vec![],
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
// Perhapts we should call it non_struct instead of atomic, because of List...
debug_assert!(!matches!(data_type, DataType::Struct(_)));
Self::named(name, ColumnId::new(column_id), data_type)
}

pub fn new_struct(
Expand All @@ -228,6 +218,7 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
_private_field_to_prevent_direct_construction: (),
}
}

Expand All @@ -246,6 +237,7 @@ impl ColumnDesc {
generated_or_default_column: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
_private_field_to_prevent_direct_construction: (),
}
}

Expand Down Expand Up @@ -324,7 +316,7 @@ pub struct ColumnCatalog {
}

impl ColumnCatalog {
/// Get the column catalog's is hidden.
/// If the column is a hidden column
pub fn is_hidden(&self) -> bool {
self.is_hidden
}
Expand All @@ -334,7 +326,7 @@ impl ColumnCatalog {
self.column_desc.is_generated()
}

/// If the column is a generated column
/// If the column is a generated column, returns the corresponding expr.
pub fn generated_expr(&self) -> Option<&ExprNode> {
if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
&self.column_desc.generated_or_default_column
Expand Down Expand Up @@ -420,25 +412,6 @@ impl ColumnCatalog {
}
}

pub fn columns_extend(preserved_columns: &mut Vec<ColumnCatalog>, columns: Vec<ColumnCatalog>) {
debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0);
let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id();
columns.iter().for_each(|column| {
let column_id = column.column_id().get_id();
if column_id > max_incoming_column_id {
max_incoming_column_id = column_id;
}
});
preserved_columns.iter_mut().for_each(|column| {
column
.column_desc
.column_id
.apply_delta_if_not_row_id(max_incoming_column_id)
});

preserved_columns.extend(columns);
}

pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) {
debug_assert!(
columns
Expand Down
3 changes: 0 additions & 3 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ pub const OFFSET_COLUMN_NAME: &str = "_rw_offset";
pub const CDC_SOURCE_COLUMN_NUM: u32 = 3;
pub const TABLE_NAME_COLUMN_NAME: &str = "_rw_table_name";

pub fn is_offset_column_name(name: &str) -> bool {
name.starts_with(OFFSET_COLUMN_NAME)
}
/// Creates a offset column for storing upstream offset
/// Used in cdc source currently
pub fn offset_column_desc() -> ColumnDesc {
Expand Down
58 changes: 14 additions & 44 deletions src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ pub fn build_additional_column_catalog(
column_name,
column_id,
DataType::Bytea,
AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
},
AdditionalColumnType::Key(AdditionalColumnKey {}),
),
is_hidden: false,
},
Expand All @@ -171,11 +169,7 @@ pub fn build_additional_column_catalog(
column_name,
column_id,
DataType::Timestamptz,
AdditionalColumn {
column_type: Some(AdditionalColumnType::Timestamp(
AdditionalColumnTimestamp {},
)),
},
AdditionalColumnType::Timestamp(AdditionalColumnTimestamp {}),
),
is_hidden: false,
},
Expand All @@ -184,11 +178,7 @@ pub fn build_additional_column_catalog(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::Partition(
AdditionalColumnPartition {},
)),
},
AdditionalColumnType::Partition(AdditionalColumnPartition {}),
),
is_hidden: false,
},
Expand All @@ -197,9 +187,7 @@ pub fn build_additional_column_catalog(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})),
},
AdditionalColumnType::Offset(AdditionalColumnOffset {}),
),
is_hidden: false,
},
Expand All @@ -208,9 +196,7 @@ pub fn build_additional_column_catalog(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})),
},
AdditionalColumnType::Filename(AdditionalColumnFilename {}),
),
is_hidden: false,
},
Expand All @@ -220,11 +206,7 @@ pub fn build_additional_column_catalog(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::DatabaseName(
AdditionalDatabaseName {},
)),
},
AdditionalColumnType::DatabaseName(AdditionalDatabaseName {}),
),
is_hidden: false,
},
Expand All @@ -233,9 +215,7 @@ pub fn build_additional_column_catalog(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})),
},
AdditionalColumnType::SchemaName(AdditionalSchemaName {}),
),
is_hidden: false,
},
Expand All @@ -245,9 +225,7 @@ pub fn build_additional_column_catalog(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
},
AdditionalColumnType::TableName(AdditionalTableName {}),
),
is_hidden: false,
},
Expand All @@ -256,11 +234,7 @@ pub fn build_additional_column_catalog(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::CollectionName(
AdditionalCollectionName {},
)),
},
AdditionalColumnType::CollectionName(AdditionalCollectionName {}),
),
is_hidden: false,
},
Expand Down Expand Up @@ -390,12 +364,10 @@ fn build_header_catalog(
col_name,
column_id,
data_type,
AdditionalColumn {
column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
inner_field: inner.to_string(),
data_type: Some(pb_data_type),
})),
},
AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
inner_field: inner.to_string(),
data_type: Some(pb_data_type),
}),
),
is_hidden: false,
}
Expand All @@ -405,9 +377,7 @@ fn build_header_catalog(
col_name,
column_id,
DataType::List(get_kafka_header_item_datatype().into()),
AdditionalColumn {
column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})),
},
AdditionalColumnType::Headers(AdditionalColumnHeaders {}),
),
is_hidden: false,
}
Expand Down
6 changes: 1 addition & 5 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,7 @@ mod tests {
"commit_ts",
ColumnId::new(6),
DataType::Timestamptz,
AdditionalColumn {
column_type: Some(additional_column::ColumnType::Timestamp(
AdditionalColumnTimestamp {},
)),
},
additional_column::ColumnType::Timestamp(AdditionalColumnTimestamp {}),
),
];

Expand Down

0 comments on commit 1707ad0

Please sign in to comment.