Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2] Support schema evolution for mysql-cdc and mysql-jdbc #6929

Merged
merged 9 commits into from
Jul 5, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

import org.apache.seatunnel.api.table.catalog.TableIdentifier;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@ToString(callSuper = true)
public abstract class AlterTableColumnEvent extends AlterTableEvent {

@Getter @Setter protected String sourceColumnType;
dailai marked this conversation as resolved.
Show resolved Hide resolved

public AlterTableColumnEvent(TableIdentifier tableIdentifier) {
super(tableIdentifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public abstract class TableEvent implements SchemaChangeEvent {
protected final TableIdentifier tableIdentifier;
@Getter @Setter private String jobId;
@Getter @Setter private String statement;
@Getter @Setter protected String sourceDialectName;

@Override
public TableIdentifier tableIdentifier() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.schema;

import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import com.google.common.collect.Lists;
import io.debezium.relational.history.HistoryRecord;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolver {

protected static final List<String> SUPPORT_DDL = Lists.newArrayList("ALTER TABLE");

protected JdbcSourceConfig jdbcSourceConfig;

public AbstractSchemaChangeResolver(JdbcSourceConfig jdbcSourceConfig) {
this.jdbcSourceConfig = jdbcSourceConfig;
}

@Override
public boolean support(SourceRecord record) {
dailai marked this conversation as resolved.
Show resolved Hide resolved
String ddl = SourceRecordUtils.getDdl(record);
dailai marked this conversation as resolved.
Show resolved Hide resolved
Struct value = (Struct) record.value();
List<Struct> tableChanges = value.getArray(HistoryRecord.Fields.TABLE_CHANGES);
if (tableChanges == null || tableChanges.isEmpty()) {
log.warn("Ignoring statement for non-captured table {}", ddl);
return false;
}
return StringUtils.isNotBlank(ddl)
&& SUPPORT_DDL.stream()
.map(String::toUpperCase)
.anyMatch(prefix -> ddl.toUpperCase().contains(prefix));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.debezium.data.Envelope;
import io.debezium.document.DocumentReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.util.SchemaNameAdjuster;

import java.math.BigDecimal;
Expand Down Expand Up @@ -214,4 +215,9 @@ public static TablePath getTablePath(SourceRecord record) {
}
return TablePath.of(databaseName, schemaName, tableName);
}

public static String getDdl(SourceRecord record) {
Struct schemaChangeStruct = (Struct) record.value();
return schemaChangeStruct.getString(HistoryRecord.Fields.DDL_STATEMENTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ private void deserializeSchemaChangeRecord(
SourceRecord record, Collector<SeaTunnelRow> collector) {
SchemaChangeEvent schemaChangeEvent = schemaChangeResolver.resolve(record, resultTypeInfo);
if (schemaChangeEvent == null) {
log.info("Unsupported resolve schemaChangeEvent {}, just skip.", record);
log.warn("Unsupported resolve schemaChangeEvent {}, just skip.", record);
return;
}

if (resultTypeInfo instanceof MultipleRowType) {
Map<String, SeaTunnelRowType> newRowTypeMap = new HashMap<>();
for (Map.Entry<String, SeaTunnelRowType> entry : (MultipleRowType) resultTypeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public MySqlSourceConfig create(int subtaskId) {
// Note: the includeSchemaChanges parameter is used to control emitting the schema record,
// only DataStream API program need to emit the schema record, the Table API need not
dailai marked this conversation as resolved.
Show resolved Hide resolved

// TODO Not yet supported
// Some scenarios do not require automatic capture of table structure changes, so the
// default setting is false.
props.setProperty("include.schema.changes", String.valueOf(false));
// disable the offset flush totally
props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
.setPhysicalRowType(physicalRowType)
.setResultTypeInfo(physicalRowType)
.setServerTimeZone(ZoneId.of(zoneId))
.setSchemaChangeResolver(
new MySqlSchemaChangeResolver(createSourceConfigFactory(config)))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser.CustomMySqlAntlrDdlParser;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.relational.Tables;

import java.util.List;
import java.util.Objects;

public class MySqlSchemaChangeResolver extends AbstractSchemaChangeResolver {
private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customMySqlAntlrDdlParser;

public MySqlSchemaChangeResolver(SourceConfig.Factory<JdbcSourceConfig> sourceConfigFactory) {
super(sourceConfigFactory.create(0));
}

@Override
public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) {
TablePath tablePath = SourceRecordUtils.getTablePath(record);
String ddl = SourceRecordUtils.getDdl(record);
if (Objects.isNull(customMySqlAntlrDdlParser)) {
this.customMySqlAntlrDdlParser =
new CustomMySqlAntlrDdlParser(
tablePath, this.jdbcSourceConfig.getDbzConnectorConfig());
}
if (Objects.isNull(tables)) {
this.tables = new Tables();
}
customMySqlAntlrDdlParser.setCurrentDatabase(tablePath.getDatabaseName());
customMySqlAntlrDdlParser.setCurrentSchema(tablePath.getSchemaName());
// Parse DDL statement using Debezium's Antlr parser
customMySqlAntlrDdlParser.parse(ddl, tables);
List<AlterTableColumnEvent> parsedEvents =
customMySqlAntlrDdlParser.getAndClearParsedEvents();
AlterTableColumnsEvent alterTableColumnsEvent =
new AlterTableColumnsEvent(
TableIdentifier.of(
StringUtils.EMPTY,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName()),
parsedEvents);
alterTableColumnsEvent.setStatement(ddl);
alterTableColumnsEvent.setSourceDialectName(DatabaseIdentifier.MYSQL);
return parsedEvents.isEmpty() ? null : alterTableColumnsEvent;
}
}
Loading
Loading