Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
xxchan committed Jun 20, 2024
1 parent 633ed10 commit 7fa1d33
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 26 deletions.
14 changes: 14 additions & 0 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,20 @@ pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) {
);
}

/// FIXME: perhapts we should use sth like `ColumnIdGenerator::new_alter`,
/// However, the `SourceVersion` is problematic: It doesn't contain `next_col_id`.
/// (But for now this isn't a large problem, since drop column is not allowed for source yet..)
///
/// Besides, the logic of column id handling is a mess.
/// In some places, we use `ColumnId::placeholder()`, and use `col_id_gen` to fill it at the end;
/// In other places, we create column id ad-hoc.
pub fn max_column_id(columns: &[ColumnCatalog]) -> ColumnId {
// XXX: should we check the column IDs of struct fields here?
columns
.iter()
.fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id()))
}

#[cfg(test)]
pub mod tests {
use risingwave_pb::plan_common::PbColumnDesc;
Expand Down
8 changes: 1 addition & 7 deletions src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::LazyLock;

use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
use risingwave_common::catalog::{max_column_id, ColumnCatalog, ColumnDesc, ColumnId};
use risingwave_common::types::{DataType, StructType};
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType as PbDataType;
Expand All @@ -29,7 +29,6 @@ use risingwave_pb::plan_common::{
};

use crate::error::ConnectorResult;
use crate::parser::max_column_id;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
Expand Down Expand Up @@ -281,11 +280,6 @@ pub fn source_add_partition_offset_cols(
connector_name: &str,
) -> ([bool; 2], [ColumnCatalog; 2]) {
let mut columns_exist = [false; 2];
// let mut last_column_id = columns
// .iter()
// .map(|c| c.column_desc.column_id)
// .max()
// .unwrap_or(ColumnId::placeholder());
let mut last_column_id = max_column_id(columns);

let additional_columns: Vec<_> = {
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ mod util;
pub use debezium::DEBEZIUM_IGNORE_KEY;
use risingwave_common::buffer::BitmapBuilder;
pub use unified::{AccessError, AccessResult};
pub use util::max_column_id;

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
pub struct SourceStreamChunkBuilder {
Expand Down
15 changes: 0 additions & 15 deletions src/connector/src/parser/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use anyhow::Context;
use bytes::Bytes;
use reqwest::Url;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnId};
use risingwave_common::types::{Datum, DatumCow, DatumRef};
use risingwave_pb::data::DataType as PbDataType;

Expand Down Expand Up @@ -173,17 +172,3 @@ pub fn extract_header_inner_from_meta<'a>(
_ => None,
}
}

/// FIXME: perhapts we should use sth like `ColumnIdGenerator::new_alter`,
/// However, the `SourceVersion` is problematic: It doesn't contain `next_col_id`.
/// (But for now this isn't a large problem, since drop column is not allowed for source yet..)
///
/// Besides, the logic of column id handling is a mess.
/// In some places, we use `ColumnId::placeholder()`, and use `col_id_gen` to fill it at the end;
/// In other places, we create column id ad-hoc.
pub fn max_column_id(columns: &[ColumnCatalog]) -> ColumnId {
// XXX: should we check the column IDs of struct fields here?
columns
.iter()
.fold(ColumnId::new(i32::MIN), |a, b| a.max(b.column_id()))
}
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_connector::parser::max_column_id;
use risingwave_common::catalog::max_column_id;
use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
use risingwave_sqlparser::ast::{
AlterSourceOperation, ColumnDef, CreateSourceStatement, ObjectName, Statement,
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use anyhow::Context;
use itertools::Itertools;
use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_connector::parser::max_column_id;
use risingwave_common::catalog::{max_column_id, ColumnCatalog};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{EncodeType, FormatType};
Expand Down

0 comments on commit 7fa1d33

Please sign in to comment.