Skip to content

Commit

Permalink
[Feature][Connector-v2] Support schema evolution for Oracle connector
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Oct 28, 2024
1 parent 88be4fd commit 10547d9
Show file tree
Hide file tree
Showing 34 changed files with 2,360 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,18 @@ private SeaTunnelRowType applyChangeColumn(
String oldColumn = changeColumnEvent.getOldColumn();
int oldColumnIndex = dataType.indexOf(oldColumn);

// Rename column of oracle which only has the name of old column and the name of new column,
// so we need to fill the data type which is the same as the old column.
SeaTunnelDataType<?> fieldType = dataType.getFieldType(oldColumnIndex);
Column column = changeColumnEvent.getColumn();
if (column.getDataType() == null) {
column = column.copy(fieldType);
}

return applyModifyColumn(
dataType,
oldColumnIndex,
changeColumnEvent.getColumn(),
column,
changeColumnEvent.isFirst(),
changeColumnEvent.getAfterColumn());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

package org.apache.seatunnel.connectors.cdc.base.schema;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;

Expand All @@ -25,17 +31,24 @@
import org.apache.kafka.connect.source.SourceRecord;

import com.google.common.collect.Lists;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.HistoryRecord;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Objects;

@Slf4j
public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolver {

protected static final List<String> SUPPORT_DDL = Lists.newArrayList("ALTER TABLE");

protected JdbcSourceConfig jdbcSourceConfig;
protected final JdbcSourceConfig jdbcSourceConfig;
@Setter protected transient DdlParser ddlParser;
@Setter protected transient Tables tables;
@Setter protected String sourceDialectName;

public AbstractSchemaChangeResolver(JdbcSourceConfig jdbcSourceConfig) {
this.jdbcSourceConfig = jdbcSourceConfig;
Expand All @@ -55,4 +68,39 @@ public boolean support(SourceRecord record) {
.map(String::toUpperCase)
.anyMatch(prefix -> ddl.toUpperCase().contains(prefix));
}

/**
 * Parses the DDL carried by a schema-change record and wraps the resulting column events into
 * a single {@link AlterTableColumnsEvent}.
 *
 * <p>The DDL parser and the {@link Tables} registry are created lazily on first use and reused
 * for subsequent records.
 *
 * @param record the schema-change record; its table path and DDL text are read through {@link
 *     SourceRecordUtils}
 * @param dataType not used by this implementation
 * @return the aggregated event, or {@code null} when parsing the DDL produced no column events
 */
@Override
public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) {
    TablePath tablePath = SourceRecordUtils.getTablePath(record);
    String ddl = SourceRecordUtils.getDdl(record);
    if (Objects.isNull(ddlParser)) {
        this.ddlParser = createDdlParser(tablePath);
    }
    if (Objects.isNull(tables)) {
        this.tables = new Tables();
    }
    ddlParser.setCurrentDatabase(tablePath.getDatabaseName());
    ddlParser.setCurrentSchema(tablePath.getSchemaName());
    try {
        // Parse DDL statement using Debezium's Antlr parser
        ddlParser.parse(ddl, tables);
    } catch (RuntimeException e) {
        // The parser instance is reused across records: drop whatever was collected before the
        // failure so it cannot be attributed to the next DDL statement.
        getAndClearParsedEvents();
        throw e;
    }
    List<AlterTableColumnEvent> parsedEvents = getAndClearParsedEvents();
    if (parsedEvents.isEmpty()) {
        return null;
    }
    String dialectName = getSourceDialectName();
    parsedEvents.forEach(e -> e.setSourceDialectName(dialectName));
    AlterTableColumnsEvent alterTableColumnsEvent =
            new AlterTableColumnsEvent(
                    TableIdentifier.of(
                            StringUtils.EMPTY,
                            tablePath.getDatabaseName(),
                            tablePath.getSchemaName(),
                            tablePath.getTableName()),
                    parsedEvents);
    alterTableColumnsEvent.setStatement(ddl);
    alterTableColumnsEvent.setSourceDialectName(dialectName);
    return alterTableColumnsEvent;
}

/**
 * Creates the DDL parser used by {@code resolve}. It is invoked only while {@code ddlParser} is
 * still {@code null}, and the returned instance is stored in that field.
 *
 * @param tablePath table path taken from the record being resolved when the parser is created
 * @return the parser to use for this resolver
 */
protected abstract DdlParser createDdlParser(TablePath tablePath);

/**
 * Supplies the column events produced by the most recent {@code ddlParser.parse(...)} call;
 * {@code resolve} invokes this right after parsing.
 *
 * <p>NOTE(review): the name suggests the implementation also empties its internal buffer so
 * events are not returned twice — confirm against the concrete parser.
 *
 * @return the parsed column events; an empty list makes {@code resolve} return {@code null}
 */
protected abstract List<AlterTableColumnEvent> getAndClearParsedEvents();

/**
 * Names the source dialect that {@code resolve} stamps on every parsed column event and on the
 * aggregated {@link AlterTableColumnsEvent}.
 *
 * @return the source dialect name
 */
protected abstract String getSourceDialectName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.source.parser;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;

import org.apache.commons.lang3.StringUtils;

import io.debezium.relational.Column;
import io.debezium.relational.TableId;

public interface SeatunnelDDLParser {

    /**
     * Converts a Debezium column and replaces the source type of the result with the type text
     * built by {@link #getSourceColumnTypeWithLengthScale(Column)}.
     *
     * @param column The column to convert
     * @return The converted column in SeaTunnel format which has full type information
     */
    default org.apache.seatunnel.api.table.catalog.Column toSeatunnelColumnWithFullTypeInfo(
            Column column) {
        // The receiver is evaluated before the argument, so the conversion still runs first.
        return toSeatunnelColumn(column)
                .reSourceType(getSourceColumnTypeWithLengthScale(column));
    }

    /**
     * Converts a Debezium column into its SeaTunnel counterpart.
     *
     * @param column The column to convert
     * @return The converted column in SeaTunnel format
     */
    org.apache.seatunnel.api.table.catalog.Column toSeatunnelColumn(Column column);

    /**
     * Renders the column type as {@code TYPE}, {@code TYPE(length)} or {@code TYPE(length,
     * scale)}. The parenthesised part is emitted only when the length is non-negative, and the
     * scale only when the column reports one.
     *
     * @param column The column whose type is rendered
     * @return The type with length and scale
     */
    default String getSourceColumnTypeWithLengthScale(Column column) {
        StringBuilder typeText = new StringBuilder(column.typeName());
        int length = column.length();
        if (length < 0) {
            return typeText.toString();
        }
        typeText.append('(').append(length);
        column.scale().ifPresent(scale -> typeText.append(", ").append(scale));
        return typeText.append(')').toString();
    }

    /**
     * Builds a SeaTunnel table identifier from a Debezium table id, using an empty catalog name
     * and mapping the id's catalog, schema and table onto the remaining three arguments.
     *
     * @param tableId The Debezium table id
     * @return The corresponding SeaTunnel table identifier
     */
    default TableIdentifier toTableIdentifier(TableId tableId) {
        String database = tableId.catalog();
        String schema = tableId.schema();
        String table = tableId.table();
        return new TableIdentifier(StringUtils.EMPTY, database, schema, table);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -38,6 +40,7 @@
import java.util.Arrays;

import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY;
import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;

Expand All @@ -46,8 +49,8 @@ public class SourceRecordUtils {

private SourceRecordUtils() {}

public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.mysql.SchemaChangeKey";
public static final String SCHEMA_CHANGE_EVENT_KEY_NAME_TEMPLATE =
"io.debezium.connector.%s.SchemaChangeKey";
public static final String HEARTBEAT_VALUE_SCHEMA_KEY_NAME =
"io.debezium.connector.common.Heartbeat";
private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();
Expand Down Expand Up @@ -97,7 +100,13 @@ public static Long getFetchTimestamp(SourceRecord record) {

/**
 * Tells whether a record is a Debezium schema-change event for the connector that produced it.
 *
 * <p>A record qualifies when it has a key schema and a value schema, its value is a {@link
 * Struct} with a non-null {@code source} struct, and the key schema name equals (ignoring
 * case) the connector-specific key returned by {@link #getSchemaChangeKey(SourceRecord)}.
 *
 * @param sourceRecord the record to inspect
 * @return {@code true} if the record is a schema-change event, {@code false} otherwise
 */
public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
    Schema keySchema = sourceRecord.keySchema();
    Schema valueSchema = sourceRecord.valueSchema();
    Object value = sourceRecord.value();
    // Records without a key schema, value schema or struct value cannot be schema changes.
    if (keySchema == null || valueSchema == null || !(value instanceof Struct)) {
        return false;
    }
    // The connector name is read from the source struct, so it must exist before it is used.
    if (valueSchema.field(Envelope.FieldName.SOURCE) == null
            || ((Struct) value).getStruct(Envelope.FieldName.SOURCE) == null) {
        return false;
    }
    return getSchemaChangeKey(sourceRecord).equalsIgnoreCase(keySchema.name());
}

public static boolean isDataChangeRecord(SourceRecord record) {
Expand All @@ -123,6 +132,21 @@ public static TableId getTableId(SourceRecord dataRecord) {
return new TableId(dbName, schemaName, tableName);
}

/**
 * Reads the Debezium connector name from the {@code source} struct of a record's value.
 *
 * @param sourceRecord record whose value is a {@link Struct} containing a {@code source} struct
 * @return the non-blank connector name
 * @throws SeaTunnelException if the value, the {@code source} struct or the connector name is
 *     missing or blank
 */
public static String getConnectorName(SourceRecord sourceRecord) {
    Struct value = (Struct) sourceRecord.value();
    // Fail with a descriptive error instead of a bare NullPointerException/DataException.
    if (value == null || value.schema().field(Envelope.FieldName.SOURCE) == null) {
        throw new SeaTunnelException("Source info is missing in SourceRecord.");
    }
    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
    if (source == null || source.schema().field(DEBEZIUM_CONNECTOR_KEY) == null) {
        throw new SeaTunnelException("Source info is missing in SourceRecord.");
    }
    String connectorName = source.getString(DEBEZIUM_CONNECTOR_KEY);
    if (StringUtils.isBlank(connectorName)) {
        throw new SeaTunnelException("Connector name is empty in SourceRecord.");
    }
    return connectorName;
}

/**
 * Builds the schema-change key schema name for the connector that produced the record by
 * filling the connector name into {@link #SCHEMA_CHANGE_EVENT_KEY_NAME_TEMPLATE}.
 *
 * @param sourceRecord record from which the connector name is read
 * @return the connector-specific schema-change key name
 */
public static String getSchemaChangeKey(SourceRecord sourceRecord) {
    return String.format(
            SCHEMA_CHANGE_EVENT_KEY_NAME_TEMPLATE, getConnectorName(sourceRecord));
}

public static String getSchemaName(Struct source) {
if (source.schema().fields().stream().anyMatch(r -> SCHEMA_NAME_KEY.equals(r.name()))) {
return source.getString(SCHEMA_NAME_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector)

private void deserializeSchemaChangeRecord(
SourceRecord record, Collector<SeaTunnelRow> collector) {
SchemaChangeEvent schemaChangeEvent = schemaChangeResolver.resolve(record, resultTypeInfo);
SchemaChangeEvent schemaChangeEvent;
try {
schemaChangeEvent = schemaChangeResolver.resolve(record, resultTypeInfo);
} catch (Exception e) {
log.warn("Failed to resolve schemaChangeEvent, just skip.", e);
return;
}
if (schemaChangeEvent == null) {
log.warn("Unsupported resolve schemaChangeEvent {}, just skip.", record);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static io.debezium.config.CommonConnectorConfig.TRANSACTION_TOPIC;
import static io.debezium.connector.AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -336,16 +337,36 @@ static SourceRecord createSchemaChangeUnknownEvent() {

static SourceRecord createSchemaChangeEvent(String topic) {
Schema keySchema =
SchemaBuilder.struct().name(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME).build();
SchemaBuilder.struct()
.name(
String.format(
SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME_TEMPLATE,
"mysql"))
.build();
Schema valueKeySchema =
SchemaBuilder.struct()
.name("io.debezium.connector.mysql.Source")
.field(DEBEZIUM_CONNECTOR_KEY, Schema.STRING_SCHEMA)
.build();
Struct valueValues = new Struct(valueKeySchema);
valueValues.put(DEBEZIUM_CONNECTOR_KEY, "mysql");

Schema valueSchema =
SchemaBuilder.struct()
.field(Envelope.FieldName.SOURCE, valueKeySchema)
.name("")
.build();
Struct value = new Struct(valueSchema);
value.put(valueSchema.field(Envelope.FieldName.SOURCE), valueValues);
SourceRecord record =
new SourceRecord(
Collections.emptyMap(),
Collections.emptyMap(),
topic,
keySchema,
null,
null,
null);
valueSchema,
value);
Assertions.assertTrue(SourceRecordUtils.isSchemaChangeEvent(record));
return record;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,64 +17,37 @@

package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser.CustomMySqlAntlrDdlParser;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;

import java.util.List;
import java.util.Objects;

public class MySqlSchemaChangeResolver extends AbstractSchemaChangeResolver {
private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customMySqlAntlrDdlParser;

public MySqlSchemaChangeResolver(SourceConfig.Factory<JdbcSourceConfig> sourceConfigFactory) {
super(sourceConfigFactory.create(0));
}

@Override
public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) {
TablePath tablePath = SourceRecordUtils.getTablePath(record);
String ddl = SourceRecordUtils.getDdl(record);
if (Objects.isNull(customMySqlAntlrDdlParser)) {
this.customMySqlAntlrDdlParser =
new CustomMySqlAntlrDdlParser(
tablePath, this.jdbcSourceConfig.getDbzConnectorConfig());
}
if (Objects.isNull(tables)) {
this.tables = new Tables();
}
customMySqlAntlrDdlParser.setCurrentDatabase(tablePath.getDatabaseName());
customMySqlAntlrDdlParser.setCurrentSchema(tablePath.getSchemaName());
// Parse DDL statement using Debezium's Antlr parser
customMySqlAntlrDdlParser.parse(ddl, tables);
List<AlterTableColumnEvent> parsedEvents =
customMySqlAntlrDdlParser.getAndClearParsedEvents();
parsedEvents.forEach(e -> e.setSourceDialectName(DatabaseIdentifier.MYSQL));
AlterTableColumnsEvent alterTableColumnsEvent =
new AlterTableColumnsEvent(
TableIdentifier.of(
StringUtils.EMPTY,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName()),
parsedEvents);
alterTableColumnsEvent.setStatement(ddl);
alterTableColumnsEvent.setSourceDialectName(DatabaseIdentifier.MYSQL);
return parsedEvents.isEmpty() ? null : alterTableColumnsEvent;
protected DdlParser createDdlParser(TablePath tablePath) {
return new CustomMySqlAntlrDdlParser(
tablePath, this.jdbcSourceConfig.getDbzConnectorConfig());
}

@Override
protected List<AlterTableColumnEvent> getAndClearParsedEvents() {
return ((CustomMySqlAntlrDdlParser) ddlParser).getAndClearParsedEvents();
}

@Override
protected String getSourceDialectName() {
return DatabaseIdentifier.MYSQL;
}
}
Loading

0 comments on commit 10547d9

Please sign in to comment.