fix: improve CDC connector param check (risingwavelabs#8450)
fuyufjh authored Mar 12, 2023
1 parent b7c46d4 commit e51f639
Showing 10 changed files with 68 additions and 36 deletions.
1 change: 0 additions & 1 deletion e2e_test/source/cdc/cdc.load.slt
@@ -57,7 +57,6 @@ create table shipments (
 username = 'postgres',
 password = 'postgres',
 database.name = 'cdc_test',
-schema.name = 'public',
 table.name = 'shipments',
 slot.name = 'shipments'
 );
3 changes: 0 additions & 3 deletions e2e_test/source/cdc/cdc.validate.postgres.slt
@@ -16,7 +16,6 @@ create table shipments (
 username = 'posres',
 password = 'postgres',
 database.name = 'cdc_test',
-schema.name = 'public',
 table.name = 'shipments',
 slot.name = 'shipments'
 );
@@ -38,7 +37,6 @@ create table shipments (
 username = 'postgres',
 password = 'otgres',
 database.name = 'cdc_test',
-schema.name = 'public',
 table.name = 'shipments',
 slot.name = 'shipments'
 );
@@ -59,7 +57,6 @@ create table shipments (
 username = 'postgres',
 password = 'postgres',
 database.name = 'cdc_test',
-schema.name = 'public',
 table.name = 'shipment',
 slot.name = 'shipments'
 );
ConnectorConfig.java
@@ -17,7 +17,22 @@
 import java.util.HashMap;
 import java.util.Map;
 
-public class ConnectorConfig {
+public class ConnectorConfig extends HashMap<String, String> {
+
+    public ConnectorConfig() {}
+
+    public ConnectorConfig(Map<? extends String, ? extends String> m) {
+        super(m);
+    }
+
+    public String getNonNull(String key) {
+        String value = super.get(key);
+        if (value == null) {
+            throw new RuntimeException(key + " cannot be null");
+        }
+        return value;
+    }
+
     /* Common configs */
     public static final String HOST = "hostname";
     public static final String PORT = "port";
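
For illustration only (not part of the commit): a minimal sketch of how getNonNull is meant to be used. The ConnectorConfig class and its HOST/PORT constants come from the diff above; the demo class and its values are hypothetical.

import java.util.Map;

// Hypothetical demo: getNonNull fails fast with a descriptive message,
// instead of letting a missing parameter surface later as a bare NPE.
class ConnectorConfigDemo {
    public static void main(String[] args) {
        ConnectorConfig config = new ConnectorConfig(Map.of(
                ConnectorConfig.HOST, "localhost",
                ConnectorConfig.PORT, "5432"));
        System.out.println(config.getNonNull(ConnectorConfig.HOST)); // prints "localhost"
        config.getNonNull("table.name"); // throws RuntimeException: "table.name cannot be null"
    }
}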
@@ -51,7 +51,7 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) {
                         SourceTypeE.valueOf(startRequest.getSourceType()),
                         startRequest.getSourceId(),
                         startRequest.getStartOffset(),
-                        startRequest.getPropertiesMap());
+                        new ConnectorConfig(startRequest.getPropertiesMap()));
                 if (handler == null) {
                     LOG.error("failed to create source handler");
                     responseObserver.onCompleted();
SourceHandlerFactory.java
@@ -14,19 +14,19 @@

 package com.risingwave.sourcenode.core;
 
+import com.risingwave.connector.api.source.ConnectorConfig;
 import com.risingwave.connector.api.source.SourceHandler;
 import com.risingwave.connector.api.source.SourceTypeE;
 import com.risingwave.sourcenode.mysql.MySqlSourceConfig;
 import com.risingwave.sourcenode.postgres.PostgresSourceConfig;
-import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class SourceHandlerFactory {
     static final Logger LOG = LoggerFactory.getLogger(SourceHandlerFactory.class);
 
     public static SourceHandler createSourceHandler(
-            SourceTypeE type, long sourceId, String startOffset, Map<String, String> userProps) {
+            SourceTypeE type, long sourceId, String startOffset, ConnectorConfig userProps) {
         switch (type) {
             case MYSQL:
                 return DefaultSourceHandler.newWithConfig(
MySqlSourceConfig.java
@@ -19,7 +19,6 @@
 import com.risingwave.connector.api.source.SourceTypeE;
 import com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore;
 import com.risingwave.sourcenode.common.DebeziumCdcUtils;
-import java.util.Map;
 import java.util.Properties;
 
 /** MySQL Source Config */
@@ -29,7 +28,7 @@ public class MySqlSourceConfig implements SourceConfig {
     private final long id;
     private final String sourceName;
 
-    public MySqlSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
+    public MySqlSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) {
         id = sourceId;
         props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
         props.setProperty(
@@ -56,9 +55,9 @@ public MySqlSourceConfig(long sourceId, String startOffset, Map<String, String>
props.setProperty("database.include.list", userProps.get(ConnectorConfig.DB_NAME));
// only captures data of the specified table
String tableFilter =
userProps.get(ConnectorConfig.DB_NAME)
userProps.getNonNull(ConnectorConfig.DB_NAME)
+ "."
+ userProps.get(ConnectorConfig.TABLE_NAME);
+ userProps.getNonNull(ConnectorConfig.TABLE_NAME);
props.setProperty("table.include.list", tableFilter);

// disable schema change events for current stage
Expand Down
PostgresSourceConfig.java
@@ -21,7 +21,6 @@
 import com.risingwave.sourcenode.common.DebeziumCdcUtils;
 import io.debezium.heartbeat.Heartbeat;
 import java.time.Duration;
-import java.util.Map;
 import java.util.Properties;
 
 /** Postgres Source Config */
@@ -32,7 +31,7 @@ public class PostgresSourceConfig implements SourceConfig {
     private final String sourceName;
     private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();
 
-    public PostgresSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
+    public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) {
         id = sourceId;
         props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
         props.setProperty(
@@ -46,12 +45,16 @@ public PostgresSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
             props.setProperty(ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
         }
 
+        String dbName = userProps.getNonNull(ConnectorConfig.DB_NAME);
+        String schema = userProps.getNonNull(ConnectorConfig.PG_SCHEMA_NAME);
+        String table = userProps.getNonNull(ConnectorConfig.TABLE_NAME);
+
         // Begin of connector configs
         props.setProperty("database.hostname", userProps.get(ConnectorConfig.HOST));
         props.setProperty("database.port", userProps.get(ConnectorConfig.PORT));
         props.setProperty("database.user", userProps.get(ConnectorConfig.USER));
         props.setProperty("database.password", userProps.get(ConnectorConfig.PASSWORD));
-        props.setProperty("database.dbname", userProps.get(ConnectorConfig.DB_NAME));
+        props.setProperty("database.dbname", dbName);
         // The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server.
         // Supported values are decoderbufs, and pgoutput.
         // The wal2json plug-in is deprecated and scheduled for removal.
@@ -77,24 +80,21 @@ public PostgresSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
                 Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                 Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
 
-        String tableFilter =
-                userProps.get(ConnectorConfig.PG_SCHEMA_NAME)
-                        + "."
-                        + userProps.get(ConnectorConfig.TABLE_NAME);
+        String tableFilter = schema + "." + table;
         props.setProperty("table.include.list", tableFilter);
         props.setProperty("database.server.name", DB_SERVER_NAME_PREFIX + tableFilter);
 
         // host:port:database.schema.table
         sourceName =
-                userProps.get(ConnectorConfig.HOST)
+                userProps.getNonNull(ConnectorConfig.HOST)
                         + ":"
-                        + userProps.get(ConnectorConfig.PORT)
+                        + userProps.getNonNull(ConnectorConfig.PORT)
                         + ":"
-                        + userProps.get(ConnectorConfig.DB_NAME)
+                        + dbName
                         + "."
-                        + userProps.get(ConnectorConfig.PG_SCHEMA_NAME)
+                        + schema
                         + "."
-                        + userProps.get(ConnectorConfig.TABLE_NAME);
+                        + table;
         props.setProperty("name", sourceName);
 
         // pass through debezium properties if any
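
Again for illustration (a hypothetical demo, not part of the commit; it assumes a null start offset is accepted, and uses the WITH-clause keys from the e2e tests above): omitting table.name now makes the constructor fail fast inside getNonNull while the config is assembled, instead of leaking a null into the Debezium properties.

import java.util.Map;

// Hypothetical: construct a Postgres CDC config without table.name.
class PostgresSourceConfigDemo {
    public static void main(String[] args) {
        ConnectorConfig userProps = new ConnectorConfig(Map.of(
                "hostname", "localhost",
                "port", "5432",
                "username", "postgres",
                "password", "postgres",
                "database.name", "cdc_test",
                "schema.name", "public"));
        try {
            new PostgresSourceConfig(1L, null, userProps);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // "table.name cannot be null"
        }
    }
}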
27 changes: 18 additions & 9 deletions src/frontend/src/handler/create_source.rs
@@ -185,7 +185,7 @@ pub(crate) fn is_kafka_source(with_properties: &HashMap<String, String>) -> bool
 pub(crate) async fn resolve_source_schema(
     source_schema: SourceSchema,
     columns: &mut Vec<ColumnCatalog>,
-    with_properties: &HashMap<String, String>,
+    with_properties: &mut HashMap<String, String>,
     row_id_index: &mut Option<usize>,
     pk_column_ids: &mut Vec<ColumnId>,
     is_materialized: bool,
@@ -213,11 +213,7 @@ pub(crate) async fn resolve_source_schema(

     columns_extend(
         columns,
-        extract_protobuf_table_schema(
-            protobuf_schema,
-            with_properties.clone().into_iter().collect(),
-        )
-        .await?,
+        extract_protobuf_table_schema(protobuf_schema, with_properties.clone()).await?,
     );
 
     StreamSourceInfo {
@@ -525,7 +521,7 @@ fn source_shema_to_row_format(source_schema: &SourceSchema) -> RowFormatType {

 fn validate_compatibility(
     source_schema: &SourceSchema,
-    props: &HashMap<String, String>,
+    props: &mut HashMap<String, String>,
 ) -> Result<()> {
     let connector = get_connector(props);
     let row_format = source_shema_to_row_format(source_schema);
@@ -561,6 +557,19 @@ fn validate_compatibility(
             connector, row_format
         ))));
     }
+
+    if connector == POSTGRES_CDC_CONNECTOR {
+        if !props.contains_key("slot.name") {
+            // Build a random slot name with UUID
+            // e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"
+            let uuid = uuid::Uuid::new_v4().to_string().replace('-', "");
+            props.insert("slot.name".into(), format!("rw_cdc_{}", uuid));
+        }
+        if !props.contains_key("schema.name") {
+            // Default schema name is "public"
+            props.insert("schema.name".into(), "public".into());
+        }
+    }
     Ok(())
 }

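This hunk is the user-visible core of the fix: for postgres-cdc sources, slot.name and schema.name become optional in the WITH clause, which is why the e2e tests above can drop schema.name = 'public'. A rough Java rendering of the same defaulting, for illustration only (the real logic is the Rust above; java.util.UUID stands in for the uuid crate):

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative restatement of the new defaulting in validate_compatibility.
class CdcDefaultsDemo {
    static void fillPostgresCdcDefaults(Map<String, String> props) {
        // Random replication slot name, e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815".
        props.computeIfAbsent("slot.name",
                k -> "rw_cdc_" + UUID.randomUUID().toString().replace("-", ""));
        // Postgres tables live in the "public" schema unless stated otherwise.
        props.putIfAbsent("schema.name", "public");
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>(Map.of("database.name", "cdc_test"));
        fillPostgresCdcDefaults(props);
        System.out.println(props.get("slot.name") + " / " + props.get("schema.name"));
    }
}
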
@@ -576,7 +585,7 @@
     let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, stmt.source_name)?;
     let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
 
-    let with_properties = handler_args
+    let mut with_properties = handler_args
         .with_options
         .inner()
         .clone()
@@ -606,7 +615,7 @@
     let source_info = resolve_source_schema(
         stmt.source_schema,
         &mut columns,
-        &with_properties,
+        &mut with_properties,
         &mut row_id_index,
         &mut pk_column_ids,
         false,
12 changes: 9 additions & 3 deletions src/frontend/src/handler/create_table.rs
@@ -292,7 +292,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
     append_only: bool,
 ) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
     let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?;
-    let properties = context.with_options().inner().clone().into_iter().collect();
+    let mut properties = context.with_options().inner().clone().into_iter().collect();
 
     let (mut columns, mut pk_column_ids, mut row_id_index) =
         bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?;
@@ -311,7 +311,7 @@
     let source_info = resolve_source_schema(
         source_schema,
         &mut columns,
-        &properties,
+        &mut properties,
         &mut row_id_index,
         &mut pk_column_ids,
         true,
@@ -322,6 +322,7 @@
         context.into(),
         table_name,
         columns,
+        properties,
         pk_column_ids,
         row_id_index,
         Some(source_info),
@@ -346,12 +347,14 @@
     let definition = context.normalized_sql().to_owned();
     let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?;
 
+    let properties = context.with_options().inner().clone().into_iter().collect();
     gen_create_table_plan_without_bind(
         context,
         table_name,
         column_descs,
         pk_column_id_from_columns,
         constraints,
+        properties,
         definition,
         source_watermarks,
         append_only,
@@ -366,6 +369,7 @@
     column_descs: Vec<ColumnDesc>,
     pk_column_id_from_columns: Option<ColumnId>,
     constraints: Vec<TableConstraint>,
+    properties: HashMap<String, String>,
     definition: String,
     source_watermarks: Vec<SourceWatermark>,
     append_only: bool,
@@ -385,6 +389,7 @@
         context.into(),
         table_name,
         columns,
+        properties,
         pk_column_ids,
         row_id_index,
         None,
@@ -400,6 +405,7 @@ fn gen_table_plan_inner(
     context: OptimizerContextRef,
     table_name: ObjectName,
     columns: Vec<ColumnCatalog>,
+    properties: HashMap<String, String>,
     pk_column_ids: Vec<ColumnId>,
     row_id_index: Option<usize>,
     source_info: Option<StreamSourceInfo>,
@@ -425,7 +431,7 @@
             .map(|column| column.to_protobuf())
             .collect_vec(),
         pk_column_ids: pk_column_ids.iter().map(Into::into).collect_vec(),
-        properties: context.with_options().inner().clone().into_iter().collect(),
+        properties,
         info: Some(source_info),
         owner: session.user_id(),
         watermark_descs: watermark_descs.clone(),
7 changes: 7 additions & 0 deletions src/frontend/src/handler/create_table_as.rs
@@ -87,12 +87,19 @@ pub async fn handle_create_as(

     let (graph, source, table) = {
         let context = OptimizerContext::from_handler_args(handler_args.clone());
+        let properties = handler_args
+            .with_options
+            .inner()
+            .clone()
+            .into_iter()
+            .collect();
         let (plan, source, table) = gen_create_table_plan_without_bind(
             context,
             table_name.clone(),
             column_descs,
             None,
             vec![],
+            properties,
             "".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS`
             vec![],        // No watermark should be defined for `CREATE TABLE AS`
             append_only,
