Skip to content

Commit

Permalink
[Improve][Connector-V2] Support schema evolution for mysql-cdc and my…
Browse files Browse the repository at this point in the history
…sql-jdbc (#6929)
  • Loading branch information
dailai authored Jul 5, 2024
1 parent 2abbf69 commit cf91e51
Show file tree
Hide file tree
Showing 37 changed files with 2,517 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,7 @@ protected Column(

/** Returns a copy of the column with a replaced name. */
public abstract Column rename(String newColumnName);

/** Returns a copy of the column with a replaced sourceType. */
public abstract Column reSourceType(String sourceType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,9 @@ public Column rename(String newColumnName) {
defaultValue,
comment);
}

@Override
public Column reSourceType(String sourceType) {
throw new UnsupportedOperationException("Not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,22 @@ public Column rename(String newColumnName) {
bitLen,
longColumnLength);
}

@Override
public Column reSourceType(String newSourceType) {
return new PhysicalColumn(
name,
dataType,
columnLength,
scale,
nullable,
defaultValue,
comment,
newSourceType,
options,
isUnsigned,
isZeroFill,
bitLen,
longColumnLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

@ToString(callSuper = true)
public abstract class AlterTableColumnEvent extends AlterTableEvent {

public AlterTableColumnEvent(TableIdentifier tableIdentifier) {
super(tableIdentifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public abstract class TableEvent implements SchemaChangeEvent {
protected final TableIdentifier tableIdentifier;
@Getter @Setter private String jobId;
@Getter @Setter private String statement;
@Getter @Setter protected String sourceDialectName;

@Override
public TableIdentifier tableIdentifier() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.schema;

import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import com.google.common.collect.Lists;
import io.debezium.relational.history.HistoryRecord;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolver {

protected static final List<String> SUPPORT_DDL = Lists.newArrayList("ALTER TABLE");

protected JdbcSourceConfig jdbcSourceConfig;

public AbstractSchemaChangeResolver(JdbcSourceConfig jdbcSourceConfig) {
this.jdbcSourceConfig = jdbcSourceConfig;
}

@Override
public boolean support(SourceRecord record) {
String ddl = SourceRecordUtils.getDdl(record);
Struct value = (Struct) record.value();
List<Struct> tableChanges = value.getArray(HistoryRecord.Fields.TABLE_CHANGES);
if (tableChanges == null || tableChanges.isEmpty()) {
log.warn("Ignoring statement for non-captured table {}", ddl);
return false;
}
return StringUtils.isNotBlank(ddl)
&& SUPPORT_DDL.stream()
.map(String::toUpperCase)
.anyMatch(prefix -> ddl.toUpperCase().contains(prefix));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.debezium.data.Envelope;
import io.debezium.document.DocumentReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.util.SchemaNameAdjuster;

import java.math.BigDecimal;
Expand Down Expand Up @@ -214,4 +215,9 @@ public static TablePath getTablePath(SourceRecord record) {
}
return TablePath.of(databaseName, schemaName, tableName);
}

public static String getDdl(SourceRecord record) {
Struct schemaChangeStruct = (Struct) record.value();
return schemaChangeStruct.getString(HistoryRecord.Fields.DDL_STATEMENTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ private void deserializeSchemaChangeRecord(
SourceRecord record, Collector<SeaTunnelRow> collector) {
SchemaChangeEvent schemaChangeEvent = schemaChangeResolver.resolve(record, resultTypeInfo);
if (schemaChangeEvent == null) {
log.info("Unsupported resolve schemaChangeEvent {}, just skip.", record);
log.warn("Unsupported resolve schemaChangeEvent {}, just skip.", record);
return;
}

if (resultTypeInfo instanceof MultipleRowType) {
Map<String, SeaTunnelRowType> newRowTypeMap = new HashMap<>();
for (Map.Entry<String, SeaTunnelRowType> entry : (MultipleRowType) resultTypeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public MySqlSourceConfig create(int subtaskId) {
// Note: the includeSchemaChanges parameter is used to control emitting the schema record,
// only DataStream API program need to emit the schema record, the Table API need not

// TODO Not yet supported
// Some scenarios do not require automatic capture of table structure changes, so the
// default setting is false.
props.setProperty("include.schema.changes", String.valueOf(false));
// disable the offset flush totally
props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
.setPhysicalRowType(physicalRowType)
.setResultTypeInfo(physicalRowType)
.setServerTimeZone(ZoneId.of(zoneId))
.setSchemaChangeResolver(
new MySqlSchemaChangeResolver(createSourceConfigFactory(config)))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser.CustomMySqlAntlrDdlParser;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.relational.Tables;

import java.util.List;
import java.util.Objects;

public class MySqlSchemaChangeResolver extends AbstractSchemaChangeResolver {
private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customMySqlAntlrDdlParser;

public MySqlSchemaChangeResolver(SourceConfig.Factory<JdbcSourceConfig> sourceConfigFactory) {
super(sourceConfigFactory.create(0));
}

@Override
public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) {
TablePath tablePath = SourceRecordUtils.getTablePath(record);
String ddl = SourceRecordUtils.getDdl(record);
if (Objects.isNull(customMySqlAntlrDdlParser)) {
this.customMySqlAntlrDdlParser =
new CustomMySqlAntlrDdlParser(
tablePath, this.jdbcSourceConfig.getDbzConnectorConfig());
}
if (Objects.isNull(tables)) {
this.tables = new Tables();
}
customMySqlAntlrDdlParser.setCurrentDatabase(tablePath.getDatabaseName());
customMySqlAntlrDdlParser.setCurrentSchema(tablePath.getSchemaName());
// Parse DDL statement using Debezium's Antlr parser
customMySqlAntlrDdlParser.parse(ddl, tables);
List<AlterTableColumnEvent> parsedEvents =
customMySqlAntlrDdlParser.getAndClearParsedEvents();
AlterTableColumnsEvent alterTableColumnsEvent =
new AlterTableColumnsEvent(
TableIdentifier.of(
StringUtils.EMPTY,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName()),
parsedEvents);
alterTableColumnsEvent.setStatement(ddl);
alterTableColumnsEvent.setSourceDialectName(DatabaseIdentifier.MYSQL);
return parsedEvents.isEmpty() ? null : alterTableColumnsEvent;
}
}
Loading

0 comments on commit cf91e51

Please sign in to comment.