Skip to content

Commit

Permalink
[Improve][Connector-V2] Support schema evolution for jdbc sink
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Jun 12, 2024
1 parent 3a69527 commit e7930b0
Show file tree
Hide file tree
Showing 16 changed files with 584 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

import org.apache.seatunnel.api.table.catalog.TableIdentifier;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@ToString(callSuper = true)
public abstract class AlterTableColumnEvent extends AlterTableEvent {

@Getter @Setter protected String sourceColumnType;

public AlterTableColumnEvent(TableIdentifier tableIdentifier) {
super(tableIdentifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public abstract class TableEvent implements SchemaChangeEvent {
protected final TableIdentifier tableIdentifier;
@Getter @Setter private String jobId;
@Getter @Setter private String statement;
@Getter @Setter protected String sourceDialectName;

@Override
public TableIdentifier tableIdentifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser.CustomMySqlAntlrDdlParser;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -64,14 +65,16 @@ public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType
customMySqlAntlrDdlParser.parse(ddl, tables);
List<AlterTableColumnEvent> parsedEvents =
customMySqlAntlrDdlParser.getAndClearParsedEvents();
return parsedEvents.isEmpty()
? null
: new AlterTableColumnsEvent(
AlterTableColumnsEvent alterTableColumnsEvent =
new AlterTableColumnsEvent(
TableIdentifier.of(
StringUtils.EMPTY,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName()),
parsedEvents);
alterTableColumnsEvent.setStatement(ddl);
alterTableColumnsEvent.setSourceDialectName(DatabaseIdentifier.MYSQL);
return parsedEvents.isEmpty() ? null : alterTableColumnsEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,24 @@ public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
() -> {
Column column = columnDefinitionListener.getColumn();
if (ctx.FIRST() != null) {
changes.add(
AlterTableAddColumnEvent alterTableAddColumnEvent =
AlterTableAddColumnEvent.addFirst(
tableIdentifier, toSeatunnelColumn(column)));
tableIdentifier, toSeatunnelColumn(column));
alterTableAddColumnEvent.setSourceColumnType(getSourceColumnType(column));
changes.add(alterTableAddColumnEvent);
} else if (ctx.AFTER() != null) {
String afterColumn = parser.parseName(ctx.uid(1));
changes.add(
AlterTableAddColumnEvent alterTableAddColumnEvent =
AlterTableAddColumnEvent.addAfter(
tableIdentifier, toSeatunnelColumn(column), afterColumn));
tableIdentifier, toSeatunnelColumn(column), afterColumn);
alterTableAddColumnEvent.setSourceColumnType(getSourceColumnType(column));
changes.add(alterTableAddColumnEvent);
} else {
changes.add(
AlterTableAddColumnEvent alterTableAddColumnEvent =
AlterTableAddColumnEvent.add(
tableIdentifier, toSeatunnelColumn(column)));
tableIdentifier, toSeatunnelColumn(column));
alterTableAddColumnEvent.setSourceColumnType(getSourceColumnType(column));
changes.add(alterTableAddColumnEvent);
}
listeners.remove(columnDefinitionListener);
},
Expand Down Expand Up @@ -148,18 +154,24 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
() -> {
Column column = columnDefinitionListener.getColumn();
if (ctx.FIRST() != null) {
changes.add(
AlterTableAddColumnEvent alterTableAddColumnEvent =
AlterTableModifyColumnEvent.addFirst(
tableIdentifier, toSeatunnelColumn(column)));
tableIdentifier, toSeatunnelColumn(column));
alterTableAddColumnEvent.setSourceColumnType(getSourceColumnType(column));
changes.add(alterTableAddColumnEvent);
} else if (ctx.AFTER() != null) {
String afterColumn = parser.parseName(ctx.uid(1));
changes.add(
AlterTableAddColumnEvent alterTableAddColumnEvent =
AlterTableModifyColumnEvent.addAfter(
tableIdentifier, toSeatunnelColumn(column), afterColumn));
tableIdentifier, toSeatunnelColumn(column), afterColumn);
alterTableAddColumnEvent.setSourceColumnType(getSourceColumnType(column));
changes.add(alterTableAddColumnEvent);
} else {
changes.add(
AlterTableAddColumnEvent alterTableAddColumnEvent =
AlterTableModifyColumnEvent.add(
tableIdentifier, toSeatunnelColumn(column)));
tableIdentifier, toSeatunnelColumn(column));
alterTableAddColumnEvent.setSourceColumnType(getSourceColumnType(column));
changes.add(alterTableAddColumnEvent);
}
listeners.remove(columnDefinitionListener);
},
Expand Down Expand Up @@ -187,13 +199,14 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
String oldColumnName = column.name();
String newColumnName = parser.parseName(ctx.newColumn);
Column newColumn = column.edit().name(newColumnName).create();
AlterTableChangeColumnEvent alterTableChangeColumnEvent =
AlterTableChangeColumnEvent.change(
tableIdentifier, oldColumnName, toSeatunnelColumn(newColumn));
if (StringUtils.isNotBlank(newColumnName)
&& !StringUtils.equals(oldColumnName, newColumnName)) {
changes.add(
AlterTableChangeColumnEvent.change(
tableIdentifier,
oldColumnName,
toSeatunnelColumn(newColumn)));
alterTableChangeColumnEvent.setSourceColumnType(
getSourceColumnType(newColumn));
changes.add(alterTableChangeColumnEvent);
}
listeners.remove(columnDefinitionListener);
},
Expand All @@ -215,4 +228,17 @@ private org.apache.seatunnel.api.table.catalog.Column toSeatunnelColumn(Column c
private TableIdentifier toTableIdentifier(TableId tableId) {
return new TableIdentifier("", tableId.catalog(), tableId.schema(), tableId.table());
}

private String getSourceColumnType(Column column) {
StringBuilder sb = new StringBuilder(column.typeName());
if (column.length() >= 0) {
sb.append('(').append(column.length());
if (column.scale().isPresent()) {
sb.append(", ").append(column.scale().get());
}

sb.append(')');
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void enterColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) {
uniqueColumn = false;
optionalColumn = new AtomicReference<>();
resolveColumnDataType(ctx.dataType());
defaultValueListener = new DefaultValueParserListener(columnEditor, optionalColumn);
defaultValueListener = new CustomDefaultValueParserListener(columnEditor, optionalColumn);
listeners.add(defaultValueListener);
super.enterColumnDefinition(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser;

import io.debezium.connector.mysql.antlr.listener.DefaultValueParserListener;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.relational.ColumnEditor;

import java.util.concurrent.atomic.AtomicReference;

public class CustomDefaultValueParserListener extends DefaultValueParserListener {

private final ColumnEditor columnEditor;

public CustomDefaultValueParserListener(
ColumnEditor columnEditor, AtomicReference<Boolean> optionalColumn) {
super(columnEditor, optionalColumn);
this.columnEditor = columnEditor;
}

@Override
public void enterDefaultValue(MySqlParser.DefaultValueContext ctx) {
if (ctx.currentTimestamp() != null && !ctx.currentTimestamp().isEmpty()) {
if (ctx.currentTimestamp().size() > 1 || (ctx.ON() == null && ctx.UPDATE() == null)) {
final MySqlParser.CurrentTimestampContext currentTimestamp =
ctx.currentTimestamp(0);
columnEditor.defaultValueExpression(currentTimestamp.getText());
}
} else {
super.enterDefaultValue(ctx);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public TableId parseQualifiedTableId(MySqlParser.FullIdContext fullIdContext) {
// Remove it when debezium fixed this issue.
@Override
protected DataTypeResolver initializeDataTypeResolver() {
DataTypeResolver dataTypeResolver1 = super.initializeDataTypeResolver();
DataTypeResolver.Builder dataTypeResolverBuilder = new DataTypeResolver.Builder();

dataTypeResolverBuilder.registerDataTypes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.MysqlDefaultValueUtils;

import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDefaultValueConverter;
Expand All @@ -29,6 +30,9 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;
import java.util.Optional;

/** Utilities for converting from MySQL types to SeaTunnel types. */
@Slf4j
public class MySqlTypeUtils {
Expand Down Expand Up @@ -62,10 +66,17 @@ public static org.apache.seatunnel.api.table.catalog.Column convertToSeaTunnelCo
MySqlValueConverters::defaultParsingErrorHandler);
MySqlDefaultValueConverter mySqlDefaultValueConverter =
new MySqlDefaultValueConverter(mySqlValueConverters);
Object defaultValue =
mySqlDefaultValueConverter
.parseDefaultValue(column, column.defaultValueExpression().orElse(null))
.orElse(null);

Optional<String> defaultValueExpression = column.defaultValueExpression();
Object defaultValue = defaultValueExpression.orElse(null);
if (defaultValueExpression.isPresent()
&& Objects.nonNull(defaultValue)
&& !MysqlDefaultValueUtils.isSpecialDefaultValue(defaultValue)) {
defaultValue =
mySqlDefaultValueConverter
.parseDefaultValue(column, defaultValueExpression.get())
.orElse(null);
}
BasicTypeDefine.BasicTypeDefineBuilder builder =
BasicTypeDefine.builder()
.name(column.name())
Expand All @@ -74,6 +85,7 @@ public static org.apache.seatunnel.api.table.catalog.Column convertToSeaTunnelCo
.length((long) column.length())
.precision((long) column.length())
.scale(column.scale().orElse(0))
.nullable(column.isOptional())
.defaultValue(defaultValue);
switch (column.typeName().toUpperCase()) {
case MySqlTypeConverter.MYSQL_CHAR:
Expand Down
Loading

0 comments on commit e7930b0

Please sign in to comment.