Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deny create MV on shared CDC source #15635

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ create source mysql_mytest with (
server.id = '5601'
);

statement error Should not create MATERIALIZED VIEW directly on shared CDC source.
create materialized view mv as select * from mysql_mytest;

statement error The upstream table name must contain database name prefix*
create table products_test ( id INT,
name STRING,
Expand Down
8 changes: 3 additions & 5 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::mysql_row_to_owned_row;
use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
use crate::WithPropertiesExt;

#[derive(Debug)]
pub enum CdcTableType {
Expand All @@ -46,11 +47,8 @@ pub enum CdcTableType {
}

impl CdcTableType {
pub fn from_properties(with_properties: &HashMap<String, String>) -> Self {
let connector = with_properties
.get("connector")
.map(|c| c.to_ascii_lowercase())
.unwrap_or_default();
pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
let connector = with_properties.get_connector().unwrap_or_default();
match connector.as_str() {
"mysql-cdc" => Self::MySql,
"postgres-cdc" => Self::Postgres,
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::{BTreeMap, HashMap};

use crate::source::cdc::external::CdcTableType;
use crate::source::iceberg::ICEBERG_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
Expand Down Expand Up @@ -79,7 +80,7 @@ impl Get for BTreeMap<String, String> {
}

/// Utility methods for `WITH` properties (`HashMap` and `BTreeMap`).
pub trait WithPropertiesExt: Get {
pub trait WithPropertiesExt: Get + Sized {
#[inline(always)]
fn get_connector(&self) -> Option<String> {
self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
Expand All @@ -101,6 +102,10 @@ pub trait WithPropertiesExt: Get {
connector.contains("-cdc")
}

fn is_shared_cdc_source(&self) -> bool {
self.is_cdc_connector() && CdcTableType::from_properties(self).can_backfill()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what's the case for non-shared CDC ('citus-cdc'). Will CREATE SOURCE for it be rejected (and when?)

Copy link
Contributor

@StrikeW StrikeW Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we cannot distinguish whether it is going to create a share source from properties only. is_shared_cdc_source should not provided in the trait.

Copy link
Member Author

@xxchan xxchan Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate? This is just extracted from the current implementation in create_source. What it should be like?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, because we do this check in create_source, so the context is to create a Source. But if without this context, we cannot distinguish it. is_shared_cdc_source in the trait can be called in other place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

BTW, what are the other cases? When don't we create shared CDC source for mysql-cdc? (Do you mean CREATE TABLE directly? Any other cases?)

And the original question: Can we CREATE SOURCE for citus-cdc? If not, where is it rejected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, what are the other cases? When don't we create shared CDC source for mysql-cdc? (Do you mean CREATE TABLE directly? Any other cases?)

CREATE TABLE directly, no other cases.

And the original question: Can we CREATE SOURCE for citus-cdc? If not, where is it rejected?

We can't. But it seems we didn't ban it in code.

   // gated the feature with a session variable
    let create_cdc_source_job = if with_properties.is_cdc_connector() {
        CdcTableType::from_properties(&with_properties).can_backfill()
    } else {
        false
    };

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the function to is_backfillable_cdc_connector. I think there won't be any confusion now.

For BoundSource, we can call its method is_shared_cdc_source

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For BoundSource, we can call its method is_shared_cdc_source

Seems not true. CREATE TABLE will also have a Source node.. Let me just call it is_backfillable_cdc_connector

}

#[inline(always)]
fn is_iceberg_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{is_system_schema, Field};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::{Statement, TableAlias};
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -52,6 +53,12 @@ pub struct BoundSource {
pub catalog: SourceCatalog,
}

impl BoundSource {
pub fn is_shared_cdc_source(&self) -> bool {
self.catalog.with_properties.is_shared_cdc_source()
}
}

impl From<&SourceCatalog> for BoundSource {
fn from(s: &SourceCatalog) -> Self {
Self { catalog: s.clone() }
Expand Down
8 changes: 1 addition & 7 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use risingwave_connector::schema::schema_registry::{
name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
};
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::cdc::external::CdcTableType;
use risingwave_connector::source::cdc::{
CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
Expand Down Expand Up @@ -1307,12 +1306,7 @@ pub async fn handle_create_source(
ensure_table_constraints_supported(&stmt.constraints)?;
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;

// gated the feature with a session variable
let create_cdc_source_job = if with_properties.is_cdc_connector() {
CdcTableType::from_properties(&with_properties).can_backfill()
} else {
false
};
let create_cdc_source_job = with_properties.is_shared_cdc_source();

let (columns_from_resolve_source, source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema, &with_properties)?
Expand Down
19 changes: 13 additions & 6 deletions src/frontend/src/planner/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,19 @@ impl Planner {
}

pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
Ok(LogicalSource::with_catalog(
Rc::new(source.catalog),
SourceNodeKind::CreateMViewOrBatch,
self.ctx(),
)?
.into())
if source.is_shared_cdc_source() {
Err(ErrorCode::InternalError(
"Should not create MATERIALIZED VIEW directly on shared CDC source. HINT: create TABLE from the source instead.".to_string(),
)
.into())
} else {
Ok(LogicalSource::with_catalog(
Rc::new(source.catalog),
SourceNodeKind::CreateMViewOrBatch,
self.ctx(),
)?
.into())
}
}

pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
Expand Down
Loading