From 3a6952792fbbc8447bf1a0b6cffb924ea8f3ccc8 Mon Sep 17 00:00:00 2001 From: dailai Date: Fri, 31 May 2024 16:32:25 +0800 Subject: [PATCH] [Improve][Connector-V2] Support schema evolution for mysql-cdc --- .../schema/AbstractSchemaChangeResolver.java | 42 +++ .../cdc/base/utils/SourceRecordUtils.java | 6 + ...SeaTunnelRowDebeziumDeserializeSchema.java | 3 +- .../config/MySqlSourceConfigFactory.java | 2 +- .../mysql/source/MySqlIncrementalSource.java | 2 + .../source/MySqlSchemaChangeResolver.java | 77 +++++ .../CustomAlterTableParserListener.java | 218 ++++++++++++ .../CustomColumnDefinitionParserListener.java | 325 ++++++++++++++++++ .../parser/CustomMySqlAntlrDdlParser.java | 305 ++++++++++++++++ .../CustomMySqlAntlrDdlParserListener.java | 121 +++++++ .../cdc/mysql/testutils/UniqueDatabase.java | 7 +- .../exception/JdbcConnectorErrorCode.java | 5 +- .../internal/connection/DataSourceUtils.java | 28 +- .../jdbc/internal/dialect/JdbcDialect.java | 13 + .../internal/dialect/mysql/MysqlDialect.java | 24 ++ .../seatunnel/jdbc/sink/AbstractJdbcSink.java | 135 ++++++++ .../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 36 +- .../seatunnel/jdbc/sink/JdbcSink.java | 17 +- .../seatunnel/jdbc/sink/JdbcSinkWriter.java | 28 +- .../seatunnel/jdbc/utils/JdbcUtils.java | 120 +++++++ .../cdc/mysql/MysqlCDCWithSchemaChangeIT.java | 213 ++++++++++++ .../src/test/resources/ddl/add_columns.sql | 46 +++ .../src/test/resources/ddl/inventory.sql | 3 +- .../src/test/resources/ddl/shop.sql | 48 +++ .../mysqlcdc_to_mysql_with_schema_change.conf | 54 +++ 25 files changed, 1837 insertions(+), 41 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomAlterTableParserListener.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParser.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSink.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java new file mode 100644 index 000000000000..258d1a9d8023 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.cdc.base.schema; + +import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.source.SourceRecord; + +import com.google.common.collect.Lists; + +import java.util.List; + +public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolver { + + protected static final List SUPPORT_DDL = Lists.newArrayList("ALTER TABLE"); + + @Override + public boolean support(SourceRecord record) { + String ddl = SourceRecordUtils.getDdl(record); + // Currently, only ddl statements with modified table structures are supported + return StringUtils.isNotBlank(ddl) + && SUPPORT_DDL.stream() + .map(String::toUpperCase) + .anyMatch(prefix -> ddl.toUpperCase().contains(prefix)); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java index e06213b06d5e..3245273ace2b 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java @@ -28,6 +28,7 @@ import io.debezium.data.Envelope; import io.debezium.document.DocumentReader; import io.debezium.relational.TableId; +import io.debezium.relational.history.HistoryRecord; import io.debezium.util.SchemaNameAdjuster; import java.math.BigDecimal; @@ -214,4 +215,9 @@ public static TablePath getTablePath(SourceRecord record) { } return TablePath.of(databaseName, schemaName, tableName); } + + public static String getDdl(SourceRecord record) { + Struct schemaChangeStruct = (Struct) record.value(); + return schemaChangeStruct.getString(HistoryRecord.Fields.DDL_STATEMENTS); + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java index d249b0d92763..1fd706290185 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java @@ -119,10 +119,9 @@ private void deserializeSchemaChangeRecord( SourceRecord record, Collector collector) { SchemaChangeEvent schemaChangeEvent = schemaChangeResolver.resolve(record, resultTypeInfo); if (schemaChangeEvent == null) { - log.info("Unsupported resolve schemaChangeEvent {}, just skip.", record); + log.warn("Unsupported resolve schemaChangeEvent {}, just skip.", record); return; } - if (resultTypeInfo instanceof MultipleRowType) { Map newRowTypeMap = new HashMap<>(); for (Map.Entry entry : (MultipleRowType) resultTypeInfo) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java index 7168da8a8465..d34dfe0e0172 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java @@ -76,7 +76,7 @@ public MySqlSourceConfig create(int subtaskId) { // Note: the includeSchemaChanges parameter is used to control emitting the schema record, // only DataStream API program need to emit the schema record, the Table API need not - // TODO Not yet supported + // already supported props.setProperty("include.schema.changes", String.valueOf(false)); // disable the offset flush totally props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java index 434ce75cdd16..1d2d9f9a033e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java @@ -102,6 +102,8 @@ public DebeziumDeserializationSchema createDebeziumDeserializationSchema( .setPhysicalRowType(physicalRowType) .setResultTypeInfo(physicalRowType) .setServerTimeZone(ZoneId.of(zoneId)) + .setSchemaChangeResolver( + new MySqlSchemaChangeResolver(createSourceConfigFactory(config))) .build(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java new file mode 100644 index 000000000000..9f7c65a49251 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source; + +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; +import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; +import org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver; +import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser.CustomMySqlAntlrDdlParser; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.source.SourceRecord; + +import io.debezium.relational.Tables; + +import java.util.List; +import java.util.Objects; + +public class MySqlSchemaChangeResolver extends AbstractSchemaChangeResolver { + private SourceConfig.Factory sourceConfigFactory; + private transient Tables tables; + private transient CustomMySqlAntlrDdlParser customMySqlAntlrDdlParser; + + public MySqlSchemaChangeResolver(SourceConfig.Factory sourceConfigFactory) { + this.sourceConfigFactory = sourceConfigFactory; + } + + @Override + public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) { + TablePath tablePath = SourceRecordUtils.getTablePath(record); + String ddl = SourceRecordUtils.getDdl(record); + if (Objects.isNull(customMySqlAntlrDdlParser)) { + this.customMySqlAntlrDdlParser = + new CustomMySqlAntlrDdlParser( + tablePath, this.sourceConfigFactory.create(0).getDbzConnectorConfig()); + } + if (Objects.isNull(tables)) { + this.tables = new Tables(); + } + customMySqlAntlrDdlParser.setCurrentDatabase(tablePath.getDatabaseName()); + customMySqlAntlrDdlParser.setCurrentSchema(tablePath.getSchemaName()); + // Parse DDL statement using Debezium's Antlr parser + customMySqlAntlrDdlParser.parse(ddl, tables); + List parsedEvents = + customMySqlAntlrDdlParser.getAndClearParsedEvents(); + return parsedEvents.isEmpty() + ? null + : new AlterTableColumnsEvent( + TableIdentifier.of( + StringUtils.EMPTY, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()), + parsedEvents); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomAlterTableParserListener.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomAlterTableParserListener.java new file mode 100644 index 000000000000..ef75c223efb8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomAlterTableParserListener.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser; + +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlTypeUtils; + +import org.apache.commons.lang3.StringUtils; + +import org.antlr.v4.runtime.tree.ParseTreeListener; + +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.TableId; + +import java.util.LinkedList; +import java.util.List; + +public class CustomAlterTableParserListener extends MySqlParserBaseListener { + private static final int STARTING_INDEX = 1; + private final MySqlAntlrDdlParser parser; + private final List listeners; + private final LinkedList changes; + private List columnEditors; + private TableIdentifier tableIdentifier; + + private CustomColumnDefinitionParserListener columnDefinitionListener; + + private int parsingColumnIndex = STARTING_INDEX; + + private RelationalDatabaseConnectorConfig dbzConnectorConfig; + + public CustomAlterTableParserListener( + RelationalDatabaseConnectorConfig dbzConnectorConfig, + MySqlAntlrDdlParser parser, + List listeners, + LinkedList changes) { + this.dbzConnectorConfig = dbzConnectorConfig; + this.parser = parser; + this.listeners = listeners; + this.changes = changes; + } + + @Override + public void enterAlterTable(MySqlParser.AlterTableContext ctx) { + TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId()); + this.tableIdentifier = toTableIdentifier(tableId); + super.enterAlterTable(ctx); + } + + @Override + public void exitAlterTable(MySqlParser.AlterTableContext ctx) { + listeners.remove(columnDefinitionListener); + super.exitAlterTable(ctx); + this.tableIdentifier = null; + } + + @Override + public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { + String columnName = parser.parseName(ctx.uid(0)); + ColumnEditor columnEditor = Column.editor().name(columnName); + columnDefinitionListener = + new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.exitAlterByAddColumn(ctx); + } + + @Override + public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + if (ctx.FIRST() != null) { + changes.add( + AlterTableAddColumnEvent.addFirst( + tableIdentifier, toSeatunnelColumn(column))); + } else if (ctx.AFTER() != null) { + String afterColumn = parser.parseName(ctx.uid(1)); + changes.add( + AlterTableAddColumnEvent.addAfter( + tableIdentifier, toSeatunnelColumn(column), afterColumn)); + } else { + changes.add( + AlterTableAddColumnEvent.add( + tableIdentifier, toSeatunnelColumn(column))); + } + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByAddColumn(ctx); + } + + @Override + public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { + parser.runIfNotNull( + () -> { + if (columnEditors != null) { + // column editor list is not null when a multiple columns are parsed in one + // statement + if (columnEditors.size() > parsingColumnIndex) { + // assign next column editor to parse another column definition + columnDefinitionListener.setColumnEditor( + columnEditors.get(parsingColumnIndex++)); + } + } + }, + columnEditors); + super.exitColumnDefinition(ctx); + } + + @Override + public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + String columnName = parser.parseName(ctx.uid(0)); + ColumnEditor columnEditor = Column.editor().name(columnName); + columnDefinitionListener = + new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByModifyColumn(ctx); + } + + @Override + public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + if (ctx.FIRST() != null) { + changes.add( + AlterTableModifyColumnEvent.addFirst( + tableIdentifier, toSeatunnelColumn(column))); + } else if (ctx.AFTER() != null) { + String afterColumn = parser.parseName(ctx.uid(1)); + changes.add( + AlterTableModifyColumnEvent.addAfter( + tableIdentifier, toSeatunnelColumn(column), afterColumn)); + } else { + changes.add( + AlterTableModifyColumnEvent.add( + tableIdentifier, toSeatunnelColumn(column))); + } + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByModifyColumn(ctx); + } + + @Override + public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) { + String oldColumnName = parser.parseName(ctx.oldColumn); + ColumnEditor columnEditor = Column.editor().name(oldColumnName); + columnEditor.unsetDefaultValueExpression(); + + columnDefinitionListener = + new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByChangeColumn(ctx); + } + + @Override + public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + String oldColumnName = column.name(); + String newColumnName = parser.parseName(ctx.newColumn); + Column newColumn = column.edit().name(newColumnName).create(); + if (StringUtils.isNotBlank(newColumnName) + && !StringUtils.equals(oldColumnName, newColumnName)) { + changes.add( + AlterTableChangeColumnEvent.change( + tableIdentifier, + oldColumnName, + toSeatunnelColumn(newColumn))); + } + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByChangeColumn(ctx); + } + + @Override + public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) { + String removedColName = parser.parseName(ctx.uid()); + changes.add(new AlterTableDropColumnEvent(tableIdentifier, removedColName)); + super.enterAlterByDropColumn(ctx); + } + + private org.apache.seatunnel.api.table.catalog.Column toSeatunnelColumn(Column column) { + return MySqlTypeUtils.convertToSeaTunnelColumn(column, dbzConnectorConfig); + } + + private TableIdentifier toTableIdentifier(TableId tableId) { + return new TableIdentifier("", tableId.catalog(), tableId.schema(), tableId.table()); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java new file mode 100644 index 000000000000..0bfc4721aa44 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser; + +import org.antlr.v4.runtime.tree.ParseTreeListener; + +import io.debezium.antlr.AntlrDdlParser; +import io.debezium.antlr.DataTypeResolver; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.connector.mysql.antlr.listener.DefaultValueParserListener; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.ddl.DataType; +import io.debezium.util.Strings; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Types; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** Parser listener that is parsing column definition part of MySQL statements. */ +@Slf4j +public class CustomColumnDefinitionParserListener extends MySqlParserBaseListener { + + private static final Pattern DOT = Pattern.compile("\\."); + private final MySqlAntlrDdlParser parser; + private final DataTypeResolver dataTypeResolver; + private ColumnEditor columnEditor; + private boolean uniqueColumn; + private AtomicReference optionalColumn = new AtomicReference<>(); + private DefaultValueParserListener defaultValueListener; + + private final List listeners; + + public CustomColumnDefinitionParserListener( + ColumnEditor columnEditor, + MySqlAntlrDdlParser parser, + List listeners) { + this.columnEditor = columnEditor; + this.parser = parser; + this.dataTypeResolver = parser.dataTypeResolver(); + this.listeners = listeners; + } + + public void setColumnEditor(ColumnEditor columnEditor) { + this.columnEditor = columnEditor; + } + + public ColumnEditor getColumnEditor() { + return columnEditor; + } + + public Column getColumn() { + return columnEditor.create(); + } + + @Override + public void enterColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { + uniqueColumn = false; + optionalColumn = new AtomicReference<>(); + resolveColumnDataType(ctx.dataType()); + defaultValueListener = new DefaultValueParserListener(columnEditor, optionalColumn); + listeners.add(defaultValueListener); + super.enterColumnDefinition(ctx); + } + + @Override + public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { + if (optionalColumn.get() != null) { + columnEditor.optional(optionalColumn.get().booleanValue()); + } + defaultValueListener.exitDefaultValue(false); + listeners.remove(defaultValueListener); + super.exitColumnDefinition(ctx); + } + + @Override + public void enterUniqueKeyColumnConstraint(MySqlParser.UniqueKeyColumnConstraintContext ctx) { + uniqueColumn = true; + super.enterUniqueKeyColumnConstraint(ctx); + } + + @Override + public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstraintContext ctx) { + // this rule will be parsed only if no primary key is set in a table + // otherwise the statement can't be executed due to multiple primary key error + optionalColumn.set(Boolean.FALSE); + super.enterPrimaryKeyColumnConstraint(ctx); + } + + @Override + public void enterCommentColumnConstraint(MySqlParser.CommentColumnConstraintContext ctx) { + if (!parser.skipComments()) { + if (ctx.STRING_LITERAL() != null) { + columnEditor.comment(parser.withoutQuotes(ctx.STRING_LITERAL().getText())); + } + } + super.enterCommentColumnConstraint(ctx); + } + + @Override + public void enterNullNotnull(MySqlParser.NullNotnullContext ctx) { + optionalColumn.set(Boolean.valueOf(ctx.NOT() == null)); + super.enterNullNotnull(ctx); + } + + @Override + public void enterAutoIncrementColumnConstraint( + MySqlParser.AutoIncrementColumnConstraintContext ctx) { + columnEditor.autoIncremented(true); + columnEditor.generated(true); + super.enterAutoIncrementColumnConstraint(ctx); + } + + @Override + public void enterSerialDefaultColumnConstraint( + MySqlParser.SerialDefaultColumnConstraintContext ctx) { + serialColumn(); + super.enterSerialDefaultColumnConstraint(ctx); + } + + private void resolveColumnDataType(MySqlParser.DataTypeContext dataTypeContext) { + String charsetName = null; + DataType dataType = dataTypeResolver.resolveDataType(dataTypeContext); + + if (dataTypeContext instanceof MySqlParser.StringDataTypeContext) { + // Same as LongVarcharDataTypeContext but with dimension handling + MySqlParser.StringDataTypeContext stringDataTypeContext = + (MySqlParser.StringDataTypeContext) dataTypeContext; + + if (stringDataTypeContext.lengthOneDimension() != null) { + Integer length = + parseLength( + stringDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + columnEditor.length(length); + } + + charsetName = + parser.extractCharset( + stringDataTypeContext.charsetName(), + stringDataTypeContext.collationName()); + } else if (dataTypeContext instanceof MySqlParser.LongVarcharDataTypeContext) { + // Same as StringDataTypeContext but without dimension handling + MySqlParser.LongVarcharDataTypeContext longVarcharTypeContext = + (MySqlParser.LongVarcharDataTypeContext) dataTypeContext; + + charsetName = + parser.extractCharset( + longVarcharTypeContext.charsetName(), + longVarcharTypeContext.collationName()); + } else if (dataTypeContext instanceof MySqlParser.NationalStringDataTypeContext) { + MySqlParser.NationalStringDataTypeContext nationalStringDataTypeContext = + (MySqlParser.NationalStringDataTypeContext) dataTypeContext; + + if (nationalStringDataTypeContext.lengthOneDimension() != null) { + Integer length = + parseLength( + nationalStringDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + columnEditor.length(length); + } + } else if (dataTypeContext instanceof MySqlParser.NationalVaryingStringDataTypeContext) { + MySqlParser.NationalVaryingStringDataTypeContext nationalVaryingStringDataTypeContext = + (MySqlParser.NationalVaryingStringDataTypeContext) dataTypeContext; + + if (nationalVaryingStringDataTypeContext.lengthOneDimension() != null) { + Integer length = + parseLength( + nationalVaryingStringDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + columnEditor.length(length); + } + } else if (dataTypeContext instanceof MySqlParser.DimensionDataTypeContext) { + MySqlParser.DimensionDataTypeContext dimensionDataTypeContext = + (MySqlParser.DimensionDataTypeContext) dataTypeContext; + + Integer length = null; + Integer scale = null; + if (dimensionDataTypeContext.lengthOneDimension() != null) { + length = + parseLength( + dimensionDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + } + + if (dimensionDataTypeContext.lengthTwoDimension() != null) { + List decimalLiterals = + dimensionDataTypeContext.lengthTwoDimension().decimalLiteral(); + length = parseLength(decimalLiterals.get(0).getText()); + scale = Integer.valueOf(decimalLiterals.get(1).getText()); + } + + if (dimensionDataTypeContext.lengthTwoOptionalDimension() != null) { + List decimalLiterals = + dimensionDataTypeContext.lengthTwoOptionalDimension().decimalLiteral(); + if (decimalLiterals.get(0).REAL_LITERAL() != null) { + String[] digits = DOT.split(decimalLiterals.get(0).getText()); + if (Strings.isNullOrEmpty(digits[0]) || Integer.valueOf(digits[0]) == 0) { + // Set default value 10 according mysql engine + length = 10; + } else { + length = parseLength(digits[0]); + } + } else { + length = parseLength(decimalLiterals.get(0).getText()); + } + + if (decimalLiterals.size() > 1) { + scale = Integer.valueOf(decimalLiterals.get(1).getText()); + } + } + if (length != null) { + columnEditor.length(length); + } + if (scale != null) { + columnEditor.scale(scale); + } + } else if (dataTypeContext instanceof MySqlParser.CollectionDataTypeContext) { + MySqlParser.CollectionDataTypeContext collectionDataTypeContext = + (MySqlParser.CollectionDataTypeContext) dataTypeContext; + if (collectionDataTypeContext.charsetName() != null) { + charsetName = collectionDataTypeContext.charsetName().getText(); + } + + if (dataType.name().equalsIgnoreCase("SET")) { + // After DBZ-132, it will always be comma separated + int optionsSize = + collectionDataTypeContext.collectionOptions().collectionOption().size(); + columnEditor.length( + Math.max(0, optionsSize * 2 - 1)); // number of options + number of commas + } else { + columnEditor.length(1); + } + } + + String dataTypeName = dataType.name().toUpperCase(); + + if (dataTypeName.equals("ENUM") || dataTypeName.equals("SET")) { + // type expression has to be set, because the value converter needs to know the enum or + // set options + MySqlParser.CollectionDataTypeContext collectionDataTypeContext = + (MySqlParser.CollectionDataTypeContext) dataTypeContext; + + List collectionOptions = + collectionDataTypeContext.collectionOptions().collectionOption().stream() + .map(AntlrDdlParser::getText) + .collect(Collectors.toList()); + + columnEditor.type(dataTypeName); + columnEditor.enumValues(collectionOptions); + } else if (dataTypeName.equals("SERIAL")) { + // SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE + columnEditor.type("BIGINT UNSIGNED"); + serialColumn(); + } else { + columnEditor.type(dataTypeName); + } + + int jdbcDataType = dataType.jdbcType(); + columnEditor.jdbcType(jdbcDataType); + + if (columnEditor.length() == -1) { + columnEditor.length((int) dataType.length()); + } + if (!columnEditor.scale().isPresent() && dataType.scale() != Column.UNSET_INT_VALUE) { + columnEditor.scale(dataType.scale()); + } + if (Types.NCHAR == jdbcDataType || Types.NVARCHAR == jdbcDataType) { + // NCHAR and NVARCHAR columns always uses utf8 as charset + columnEditor.charsetName("utf8"); + } else { + columnEditor.charsetName(charsetName); + } + } + + private Integer parseLength(String lengthStr) { + Long length = Long.parseLong(lengthStr); + if (length > Integer.MAX_VALUE) { + log.warn( + "The length '{}' of the column `{}` is too large to be supported, truncating it to '{}'", + length, + columnEditor.name(), + Integer.MAX_VALUE); + length = (long) Integer.MAX_VALUE; + } + return length.intValue(); + } + + private void serialColumn() { + if (optionalColumn.get() == null) { + optionalColumn.set(Boolean.FALSE); + } + uniqueColumn = true; + columnEditor.autoIncremented(true); + columnEditor.generated(true); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParser.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParser.java new file mode 100644 index 000000000000..a12b20f88977 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParser.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; + +import com.google.common.collect.Lists; +import io.debezium.antlr.AntlrDdlParserListener; +import io.debezium.antlr.DataTypeResolver; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.TableId; + +import java.sql.Types; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +/** A ddl parser that will use custom listener. */ +public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser { + + private final LinkedList parsedEvents; + + private RelationalDatabaseConnectorConfig dbzConnectorConfig; + + private final TablePath tablePath; + + public CustomMySqlAntlrDdlParser( + TablePath tablePath, RelationalDatabaseConnectorConfig dbzConnectorConfig) { + super(); + this.tablePath = tablePath; + this.parsedEvents = new LinkedList<>(); + this.dbzConnectorConfig = dbzConnectorConfig; + } + + @Override + public TableId parseQualifiedTableId(MySqlParser.FullIdContext fullIdContext) { + return new TableId( + tablePath.getDatabaseName(), tablePath.getSchemaName(), tablePath.getTableName()); + } + + // Overriding this method because the BIT type requires default length dimension of 1. + // Remove it when debezium fixed this issue. + @Override + protected DataTypeResolver initializeDataTypeResolver() { + DataTypeResolver dataTypeResolver1 = super.initializeDataTypeResolver(); + DataTypeResolver.Builder dataTypeResolverBuilder = new DataTypeResolver.Builder(); + + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.StringDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHAR), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.CHAR, MySqlParser.VARYING), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.VARCHAR), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TINYTEXT), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TEXT), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.MEDIUMTEXT), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONGTEXT), + new DataTypeResolver.DataTypeEntry(Types.NCHAR, MySqlParser.NCHAR), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARYING), + new DataTypeResolver.DataTypeEntry(Types.NVARCHAR, MySqlParser.NVARCHAR), + new DataTypeResolver.DataTypeEntry( + Types.CHAR, MySqlParser.CHAR, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.VARCHAR, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.TINYTEXT, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.TEXT, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.MEDIUMTEXT, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.LONGTEXT, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NCHAR, MySqlParser.NCHAR, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NVARCHAR, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHARACTER), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.CHARACTER, MySqlParser.VARYING))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.NationalStringDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NATIONAL, MySqlParser.VARCHAR) + .setSuffixTokens(MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NCHAR, MySqlParser.NATIONAL, MySqlParser.CHARACTER) + .setSuffixTokens(MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARCHAR) + .setSuffixTokens(MySqlParser.BINARY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.NationalVaryingStringDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, + MySqlParser.NATIONAL, + MySqlParser.CHAR, + MySqlParser.VARYING), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, + MySqlParser.NATIONAL, + MySqlParser.CHARACTER, + MySqlParser.VARYING))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.DimensionDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.TINYINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT1) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.SMALLINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT2) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MEDIUMINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT3) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MIDDLEINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INTEGER) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT4) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.BIGINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.INT8) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.REAL, MySqlParser.REAL) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.DOUBLE) + .setSuffixTokens( + MySqlParser.PRECISION, + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.FLOAT8) + .setSuffixTokens( + MySqlParser.PRECISION, + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT4) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DECIMAL) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DEC) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.FIXED) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.NUMERIC, MySqlParser.NUMERIC) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.BIT, MySqlParser.BIT) + .setDefaultLengthDimension(1), + new DataTypeResolver.DataTypeEntry(Types.TIME, MySqlParser.TIME), + new DataTypeResolver.DataTypeEntry( + Types.TIMESTAMP_WITH_TIMEZONE, MySqlParser.TIMESTAMP), + new DataTypeResolver.DataTypeEntry(Types.TIMESTAMP, MySqlParser.DATETIME), + new DataTypeResolver.DataTypeEntry(Types.BINARY, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry(Types.VARBINARY, MySqlParser.VARBINARY), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.BLOB), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.YEAR))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.SimpleDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.DATE, MySqlParser.DATE), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.TINYBLOB), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.MEDIUMBLOB), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONGBLOB), + new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOL), + new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOLEAN), + new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.SERIAL))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.CollectionDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.ENUM) + .setSuffixTokens(MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.SET) + .setSuffixTokens(MySqlParser.BINARY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.SpatialDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry( + Types.OTHER, MySqlParser.GEOMETRYCOLLECTION), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMCOLLECTION), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.LINESTRING), + new DataTypeResolver.DataTypeEntry( + Types.OTHER, MySqlParser.MULTILINESTRING), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOINT), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOLYGON), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POINT), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POLYGON), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.JSON), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMETRY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.LongVarbinaryDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONG) + .setSuffixTokens(MySqlParser.VARBINARY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.LongVarcharDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONG) + .setSuffixTokens(MySqlParser.VARCHAR))); + + return dataTypeResolverBuilder.build(); + } + + @Override + protected AntlrDdlParserListener createParseTreeWalkerListener() { + return new CustomMySqlAntlrDdlParserListener(dbzConnectorConfig, this, parsedEvents); + } + + public List getAndClearParsedEvents() { + List result = Lists.newArrayList(parsedEvents); + parsedEvents.clear(); + return result; + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java new file mode 100644 index 000000000000..4ea5ec705604 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser; + +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; + +import org.antlr.v4.runtime.ParserRuleContext; +import org.antlr.v4.runtime.tree.ErrorNode; +import org.antlr.v4.runtime.tree.ParseTreeListener; +import org.antlr.v4.runtime.tree.TerminalNode; + +import io.debezium.antlr.AntlrDdlParserListener; +import io.debezium.antlr.ProxyParseTreeListenerUtil; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.text.ParsingException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** This listener's constructor will use some modified listener. */ +public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener + implements AntlrDdlParserListener { + + /** Collection of listeners for delegation of events. */ + private final List listeners = new CopyOnWriteArrayList<>(); + + /** Flag for skipping phase. */ + private boolean skipNodes; + + /** + * Count of skipped nodes. Each enter event during skipping phase will increase the counter and + * each exit event will decrease it. When counter will be decreased to 0, the skipping phase + * will end. + */ + private int skippedNodesCount = 0; + + /** Collection of catched exceptions. */ + private final Collection errors = new ArrayList<>(); + + public CustomMySqlAntlrDdlParserListener( + RelationalDatabaseConnectorConfig dbzConnectorConfig, + MySqlAntlrDdlParser parser, + LinkedList parsedEvents) { + // Currently only DDL statements that modify the table structure are supported, so add + // custom listeners to handle these events. + listeners.add( + new CustomAlterTableParserListener( + dbzConnectorConfig, parser, listeners, parsedEvents)); + } + + /** + * Returns all caught errors during tree walk. + * + * @return list of Parsing exceptions + */ + @Override + public Collection getErrors() { + return errors; + } + + @Override + public void enterEveryRule(ParserRuleContext ctx) { + if (skipNodes) { + skippedNodesCount++; + } else { + ProxyParseTreeListenerUtil.delegateEnterRule(ctx, listeners, errors); + } + } + + @Override + public void exitEveryRule(ParserRuleContext ctx) { + if (skipNodes) { + if (skippedNodesCount == 0) { + // back in the node where skipping started + skipNodes = false; + } else { + // going up in a tree, means decreasing a number of skipped nodes + skippedNodesCount--; + } + } else { + ProxyParseTreeListenerUtil.delegateExitRule(ctx, listeners, errors); + } + } + + @Override + public void visitErrorNode(ErrorNode node) { + ProxyParseTreeListenerUtil.visitErrorNode(node, listeners, errors); + } + + @Override + public void visitTerminal(TerminalNode node) { + ProxyParseTreeListenerUtil.visitTerminal(node, listeners, errors); + } + + @Override + public void enterRoutineBody(MySqlParser.RoutineBodyContext ctx) { + // this is a grammar rule for BEGIN ... END part of statements. Skip it. + skipNodes = true; + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java index 231501ea5cdf..1f7953a9fc16 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java @@ -55,7 +55,7 @@ public class UniqueDatabase { private final MySqlContainer container; private final String databaseName; - private final String templateName; + private String templateName; private final String username; private final String password; @@ -123,6 +123,11 @@ public String getPassword() { return password; } + public UniqueDatabase setTemplateName(String templateName) { + this.templateName = templateName; + return this; + } + /** @return Fully qualified table name <databaseName>.<tableName> */ public String qualifiedTableName(final String tableName) { return String.format("%s.%s", databaseName, tableName); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java index 06b4d77184d2..2c2cac27b4dc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java @@ -31,7 +31,10 @@ public enum JdbcConnectorErrorCode implements SeaTunnelErrorCode { DONT_SUPPORT_SINK("JDBC-07", "The jdbc type don't support sink"), KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication failed"), NO_SUPPORT_OPERATION_FAILED("JDBC-09", "The jdbc driver not support operation."), - DATA_TYPE_CAST_FAILED("JDBC-10", "Data type cast failed"); + DATA_TYPE_CAST_FAILED("JDBC-10", "Data type cast failed"), + REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT( + "JDBC-11", "Refresh the table with schema change failed"); + private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java index 3a6747aa89fb..1f35b3d2b9ed 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java @@ -70,11 +70,37 @@ private static void setProperties( Optional method = findSetterMethod(commonDataSource.getClass().getMethods(), entry.getKey()); if (method.isPresent()) { - method.get().invoke(commonDataSource, entry.getValue()); + Method setterMethod = method.get(); + Class parameterType = setterMethod.getParameterTypes()[0]; + Object value = entry.getValue(); + if (!parameterType.isInstance(value)) { + value = convertType(value, parameterType); + } + method.get().invoke(commonDataSource, value); } } } + private static Object convertType(Object value, Class targetType) { + if (targetType.isInstance(value)) { + return value; + } + if (targetType == Integer.class || targetType == int.class) { + return Integer.parseInt(value.toString()); + } else if (targetType == Long.class || targetType == long.class) { + return Long.parseLong(value.toString()); + } else if (targetType == Boolean.class || targetType == boolean.class) { + return Boolean.parseBoolean(value.toString()); + } else if (targetType == Double.class || targetType == double.class) { + return Double.parseDouble(value.toString()); + } else if (targetType == Float.class || targetType == float.class) { + return Float.parseFloat(value.toString()); + } else if (targetType == String.class) { + return value.toString(); + } + throw new IllegalArgumentException("Unsupported parameter type: " + targetType); + } + private static Method findGetterMethod(final DataSource dataSource, final String propertyName) throws NoSuchMethodException { String getterMethodName = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index da92f8210923..1166987f1c07 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider; @@ -423,4 +424,16 @@ default JdbcConnectionProvider getJdbcConnectionProvider( default String convertType(String columnName, String columnType) { return columnName; } + + /** + * Refresh physical table schema by schema change event + * + * @param event + * @param jdbcConnectionProvider + * @param sinkTablePath + */ + default void refreshPhysicalTableSchemaBySchemaChangeEvent( + SchemaChangeEvent event, + JdbcConnectionProvider jdbcConnectionProvider, + TablePath sinkTablePath) {}; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 03067f6d5e3e..0cb9cdc88dba 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -18,6 +18,10 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; @@ -25,6 +29,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.SQLUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils; import org.apache.commons.lang3.StringUtils; @@ -214,4 +219,23 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta return SQLUtils.countForSubquery(connection, table.getQuery()); } + + @Override + public void refreshPhysicalTableSchemaBySchemaChangeEvent( + SchemaChangeEvent event, + JdbcConnectionProvider jdbcConnectionProvider, + TablePath sinkTablePath) { + try { + Connection connection = jdbcConnectionProvider.getOrEstablishConnection(); + Statement stmt = connection.createStatement(); + String alterTableSql = JdbcUtils.generateAlterTableSql(event, sinkTablePath); + log.info("Apply schema change with sql: {}", alterTableSql); + stmt.execute(alterTableSql); + } catch (Exception e) { + throw new JdbcConnectorException( + JdbcConnectorErrorCode.CONNECT_DATABASE_FAILED, + "unable to open JDBC writer", + e); + } + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSink.java new file mode 100644 index 000000000000..2e46391fbf66 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSink.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; + +import java.io.IOException; +import java.util.List; + +public abstract class AbstractJdbcSink implements SinkWriter { + + protected JdbcDialect dialect; + protected TablePath sinkTablePath; + protected TableSchema tableSchema; + protected transient boolean isOpen; + protected JdbcConnectionProvider connectionProvider; + protected JdbcSinkConfig jdbcSinkConfig; + protected JdbcOutputFormat> outputFormat; + + @Override + public void applySchemaChange(SchemaChangeEvent event) throws IOException { + if (event instanceof AlterTableColumnsEvent) { + List events = ((AlterTableColumnsEvent) event).getEvents(); + for (AlterTableColumnEvent alterTableColumnEvent : events) { + processSchemaChangeEvent(alterTableColumnEvent); + } + } else { + this.processSchemaChangeEvent(event); + } + } + + protected void processSchemaChangeEvent(SchemaChangeEvent event) throws IOException { + // apply columns change + TableSchema newTableSchema = this.tableSchema.copy(); + List columns = newTableSchema.getColumns(); + switch (event.getEventType()) { + case SCHEMA_CHANGE_ADD_COLUMN: + Column addColumn = ((AlterTableAddColumnEvent) event).getColumn(); + columns.add(addColumn); + break; + case SCHEMA_CHANGE_DROP_COLUMN: + String dropColumn = ((AlterTableDropColumnEvent) event).getColumn(); + columns.removeIf(column -> column.getName().equalsIgnoreCase(dropColumn)); + break; + case SCHEMA_CHANGE_MODIFY_COLUMN: + Column modifyColumn = ((AlterTableModifyColumnEvent) event).getColumn(); + replaceColumnByIndex(columns, modifyColumn.getName(), modifyColumn); + break; + case SCHEMA_CHANGE_CHANGE_COLUMN: + AlterTableChangeColumnEvent alterTableChangeColumnEvent = + (AlterTableChangeColumnEvent) event; + Column changeColumn = alterTableChangeColumnEvent.getColumn(); + String oldColumnName = alterTableChangeColumnEvent.getOldColumn(); + replaceColumnByIndex(columns, oldColumnName, changeColumn); + break; + default: + throw new SeaTunnelException( + "Unsupported schemaChangeEvent for event type: " + event.getEventType()); + } + // refresh table schema in seatunnel + this.tableSchema = newTableSchema; + reOpenOutputFormat(event); + } + + protected void reOpenOutputFormat(SchemaChangeEvent event) throws IOException { + JdbcOutputFormat> oldOutputFormat = + this.outputFormat; + try { + flushCommit(); + try { + dialect.refreshPhysicalTableSchemaBySchemaChangeEvent( + event, connectionProvider, sinkTablePath); + } catch (Throwable e) { + throw new JdbcConnectorException( + JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, + e); + } + isOpen = false; + this.outputFormat = + new JdbcOutputFormatBuilder( + dialect, connectionProvider, jdbcSinkConfig, tableSchema) + .build(); + } finally { + oldOutputFormat.close(); + } + } + + protected abstract void flushCommit() throws IOException; + + protected void replaceColumnByIndex( + List columns, String oldColumnName, Column newColumn) { + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getName().equalsIgnoreCase(oldColumnName)) { + columns.set(i, newColumn); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java index 77e673ae6b28..2490638656a5 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java @@ -20,16 +20,15 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl; @@ -54,9 +53,8 @@ import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkState; -public class JdbcExactlyOnceSinkWriter - implements SinkWriter, - SupportMultiTableSinkWriter { +public class JdbcExactlyOnceSinkWriter extends AbstractJdbcSink + implements SupportMultiTableSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkWriter.class); private final SinkWriter.Context sinkcontext; @@ -71,15 +69,11 @@ public class JdbcExactlyOnceSinkWriter private final XidGenerator xidGenerator; - private final JdbcOutputFormat> - outputFormat; - - private transient boolean isOpen; - private transient Xid currentXid; private transient Xid prepareXid; public JdbcExactlyOnceSinkWriter( + TablePath sinkTablePath, SinkWriter.Context sinkcontext, JobContext context, JdbcDialect dialect, @@ -90,14 +84,18 @@ public JdbcExactlyOnceSinkWriter( jdbcSinkConfig.getJdbcConnectionConfig().getMaxRetries() == 0, "JDBC XA sink requires maxRetries equal to 0, otherwise it could " + "cause duplicates."); - + this.sinkTablePath = sinkTablePath; + this.dialect = dialect; + this.tableSchema = tableSchema; + this.jdbcSinkConfig = jdbcSinkConfig; this.context = context; this.sinkcontext = sinkcontext; this.recoverStates = states; this.xidGenerator = XidGenerator.semanticXidGenerator(); checkState(jdbcSinkConfig.isExactlyOnce(), "is_exactly_once config error"); - this.xaFacade = + this.connectionProvider = XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig()); + this.xaFacade = (XaFacade) this.connectionProvider; this.outputFormat = new JdbcOutputFormatBuilder(dialect, xaFacade, jdbcSinkConfig, tableSchema).build(); this.xaGroupOps = new XaGroupOpsImpl(xaFacade); @@ -160,6 +158,12 @@ public Optional prepareCommit() throws IOException { return emptyXaTransaction ? Optional.empty() : Optional.of(new XidInfo(prepareXid, 0)); } + @Override + protected void flushCommit() throws IOException { + this.prepareCurrentTx(); + this.currentXid = null; + } + @Override public void abortPrepare() {} @@ -180,10 +184,12 @@ public void close() throws IOException { CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "unable to close JDBC exactly one writer", e); + } finally { + outputFormat.close(); + xidGenerator.close(); + currentXid = null; + prepareXid = null; } - xidGenerator.close(); - currentXid = null; - prepareXid = null; } private void beginTx() throws IOException { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 384319d0223c..8ed7c669acb8 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -102,10 +102,12 @@ public String getPluginName() { @Override public SinkWriter createWriter( SinkWriter.Context context) { + TablePath sinkTablePath = catalogTable.getTablePath(); SinkWriter sinkWriter; if (jdbcSinkConfig.isExactlyOnce()) { sinkWriter = new JdbcExactlyOnceSinkWriter( + sinkTablePath, context, jobContext, dialect, @@ -117,10 +119,12 @@ public SinkWriter createWriter( String keyName = tableSchema.getPrimaryKey().getColumnNames().get(0); int index = tableSchema.toPhysicalRowDataType().indexOf(keyName); if (index > -1) { - return new JdbcSinkWriter(dialect, jdbcSinkConfig, tableSchema, index); + return new JdbcSinkWriter( + sinkTablePath, dialect, jdbcSinkConfig, tableSchema, index); } } - sinkWriter = new JdbcSinkWriter(dialect, jdbcSinkConfig, tableSchema, null); + sinkWriter = + new JdbcSinkWriter(sinkTablePath, dialect, jdbcSinkConfig, tableSchema, null); } return sinkWriter; } @@ -128,9 +132,16 @@ public SinkWriter createWriter( @Override public SinkWriter restoreWriter( SinkWriter.Context context, List states) throws IOException { + TablePath sinkTablePath = catalogTable.getTablePath(); if (jdbcSinkConfig.isExactlyOnce()) { return new JdbcExactlyOnceSinkWriter( - context, jobContext, dialect, jdbcSinkConfig, tableSchema, states); + sinkTablePath, + context, + jobContext, + dialect, + jdbcSinkConfig, + tableSchema, + states); } return SeaTunnelSink.super.restoreWriter(context, states); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java index 1cf08c4c4323..c48b82ac7327 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java @@ -18,20 +18,17 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; import org.apache.seatunnel.api.sink.MultiTableResourceManager; -import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionPoolProviderProxy; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; @@ -45,25 +42,20 @@ import java.util.Optional; @Slf4j -public class JdbcSinkWriter - implements SinkWriter, - SupportMultiTableSinkWriter { - private JdbcOutputFormat> outputFormat; - private final JdbcDialect dialect; - private final TableSchema tableSchema; - private JdbcConnectionProvider connectionProvider; - private transient boolean isOpen; +public class JdbcSinkWriter extends AbstractJdbcSink + implements SupportMultiTableSinkWriter { private final Integer primaryKeyIndex; - private final JdbcSinkConfig jdbcSinkConfig; public JdbcSinkWriter( + TablePath sinkTablePath, JdbcDialect dialect, JdbcSinkConfig jdbcSinkConfig, TableSchema tableSchema, Integer primaryKeyIndex) { - this.jdbcSinkConfig = jdbcSinkConfig; + this.sinkTablePath = sinkTablePath; this.dialect = dialect; this.tableSchema = tableSchema; + this.jdbcSinkConfig = jdbcSinkConfig; this.primaryKeyIndex = primaryKeyIndex; this.connectionProvider = dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig()); @@ -129,6 +121,11 @@ public void write(SeaTunnelRow element) throws IOException { outputFormat.writeRecord(element); } + @Override + protected void flushCommit() throws IOException { + this.prepareCommit(); + } + @Override public Optional prepareCommit() throws IOException { tryOpen(); @@ -163,7 +160,8 @@ public void close() throws IOException { CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "unable to close JDBC sink write", e); + } finally { + outputFormat.close(); } - outputFormat.close(); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java new file mode 100644 index 000000000000..3315d68bcdba --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.utils; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.converter.ConverterLoader; +import org.apache.seatunnel.api.table.converter.TypeConverter; +import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +import org.apache.commons.lang3.StringUtils; + +import com.mysql.cj.MysqlType; + +public class JdbcUtils { + + /** + * generate alter table sql + * + * @param event schema change event + * @return + */ + public static String generateAlterTableSql(SchemaChangeEvent event, TablePath sinkTablePath) { + switch (event.getEventType()) { + case SCHEMA_CHANGE_ADD_COLUMN: + Column addColumn = ((AlterTableAddColumnEvent) event).getColumn(); + return generateAlterTableSql( + AlterType.ADD.getTypeQuoteBySpace(), + addColumn, + sinkTablePath.getFullNameWithQuoted(), + StringUtils.EMPTY); + case SCHEMA_CHANGE_DROP_COLUMN: + String dropColumn = ((AlterTableDropColumnEvent) event).getColumn(); + return String.format( + "ALTER TABLE %s drop column %s", + sinkTablePath.getFullNameWithQuoted(), dropColumn); + case SCHEMA_CHANGE_MODIFY_COLUMN: + Column modifyColumn = ((AlterTableModifyColumnEvent) event).getColumn(); + return generateAlterTableSql( + AlterType.MODIFY.getTypeQuoteBySpace(), + modifyColumn, + sinkTablePath.getFullNameWithQuoted(), + StringUtils.EMPTY); + case SCHEMA_CHANGE_CHANGE_COLUMN: + AlterTableChangeColumnEvent alterTableChangeColumnEvent = + (AlterTableChangeColumnEvent) event; + Column changeColumn = alterTableChangeColumnEvent.getColumn(); + String oldColumnName = alterTableChangeColumnEvent.getOldColumn(); + return generateAlterTableSql( + AlterType.CHANGE.getTypeQuoteBySpace(), + changeColumn, + sinkTablePath.getFullNameWithQuoted(), + oldColumnName); + default: + throw new SeaTunnelException( + "Unsupported schemaChangeEvent for event type: " + event.getEventType()); + } + } + + public static String generateAlterTableSql( + String alterOperation, Column newColumn, String tableName, String oldColumnName) { + StringBuilder sql = new StringBuilder("ALTER TABLE " + tableName + alterOperation); + TypeConverter typeConverter = + ConverterLoader.loadTypeConverter(DatabaseIdentifier.MYSQL); + BasicTypeDefine mysqlTypeBasicTypeDefine = + (BasicTypeDefine) typeConverter.reconvert(newColumn); + if (alterOperation.trim().equals(AlterType.CHANGE)) { + sql.append(oldColumnName) + .append(StringUtils.SPACE) + .append(newColumn.getName()) + .append(StringUtils.SPACE); + } else { + sql.append(newColumn.getName()).append(StringUtils.SPACE); + } + sql.append(mysqlTypeBasicTypeDefine.getColumnType()).append(StringUtils.SPACE); + if (mysqlTypeBasicTypeDefine.isNullable()) { + sql.append("NULL "); + } else { + sql.append("NOT NULL "); + } + String comment = mysqlTypeBasicTypeDefine.getComment(); + if (StringUtils.isNotBlank(comment)) { + sql.append("COMMENT '").append(comment).append("'"); + } + return sql + ";"; + } + + public enum AlterType { + ADD, + DROP, + MODIFY, + CHANGE; + + public String getTypeQuoteBySpace() { + return StringUtils.SPACE + this.name() + StringUtils.SPACE; + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java new file mode 100644 index 000000000000..4231a5a89c55 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import lombok.extern.slf4j.Slf4j; + +import java.net.URL; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + +@Slf4j +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.") +public class MysqlCDCWithSchemaChangeIT extends TestSuiteBase implements TestResource { + private static final String MYSQL_DATABASE = "shop"; + private static final String SOURCE_TABLE = "products"; + private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table_with_schema_change"; + private static final String MYSQL_HOST = "mysql_cdc_e2e"; + private static final String MYSQL_USER_NAME = "mysqluser"; + private static final String MYSQL_USER_PASSWORD = "mysqlpw"; + + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private final UniqueDatabase shopDatabase = + new UniqueDatabase( + MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE); + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + return new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName(MYSQL_DATABASE) + .withUsername(MYSQL_USER_NAME) + .withPassword(MYSQL_USER_PASSWORD) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image"))); + } + + private String driverUrl() { + return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + } + + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib && cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget " + + driverUrl()); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + @TestTemplate + public void testMysqlCdcWithSchemaEvolutionCase(TestContainer container) { + CompletableFuture.runAsync( + () -> { + try { + container.executeJob("/mysqlcdc_to_mysql_with_schema_change.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> Assertions.assertIterableEquals( + query(getQuerySql(MYSQL_DATABASE,SOURCE_TABLE)), + query(getQuerySql(MYSQL_DATABASE,SINK_TABLE)))); + + // case1 add columns with cdc data at same time + shopDatabase.setTemplateName("add_columns").createAndInitialize(); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> Assertions.assertIterableEquals( + query(getQuerySql(MYSQL_DATABASE,SOURCE_TABLE)), + query(getQuerySql(MYSQL_DATABASE,SINK_TABLE)))); + + } + + private String getQuerySql(String db, String tb) { + return "select * from " + db + "." + tb; + } + + private void alterSourceTableDropColumns(String database, String tableName) { + executeSql("ALTER TABLE " + database + "." + tableName + " drop COLUMN add_column4"); + executeSql("update " + database + "." + tableName + " set add_column2=10 where id=112"); + } + + private void alterSourceTableModifyDataTypeOfColumn(String database, String tableName) { + executeSql("ALTER TABLE " + database + "." + tableName + " modify weight double not null"); + executeSql( + "INSERT INTO " + + database + + "." + + tableName + + " VALUES (default,'productNameOne','descriptions',3.1415,'test',12,'test')"); + } + + private void alterSourceTableRenameColumnName(String database, String tableName) { + executeSql( + "ALTER TABLE " + + database + + "." + + tableName + + " change name name1 VARCHAR(255) DEFAULT 'SeaTunnel' NOT NULL"); + executeSql("delete from " + database + "." + tableName + " WHERE id = 111"); + } + + private void executeSql(String sql) { + try (Connection connection = getJdbcConnection()) { + connection.createStatement().execute(sql); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + } + + @BeforeAll + @Override + public void startUp() { + log.info("The second stage: Starting Mysql containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + log.info("Mysql Containers are started"); + shopDatabase.createAndInitialize(); + log.info("Mysql ddl execution is complete"); + } + + @AfterAll + @Override + public void tearDown() { + if (MYSQL_CONTAINER != null) { + MYSQL_CONTAINER.close(); + } + } + + private List> query(String sql) { + try (Connection connection = getJdbcConnection()) { + ResultSet resultSet = connection.createStatement().executeQuery(sql); + List> result = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + ArrayList objects = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + objects.add(resultSet.getObject(i)); + } + log.debug(String.format("Print MySQL-CDC query, sql: %s, data: %s", sql, objects)); + result.add(objects); + } + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql new file mode 100644 index 000000000000..871749f7851d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql @@ -0,0 +1,46 @@ +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; +INSERT INTO products +VALUES (default,"scooter","Small 2-wheel scooter",3.14), + (default,"car battery","12V car battery",8.1), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (default,"hammer","12oz carpenter's hammer",0.75), + (default,"hammer","14oz carpenter's hammer",0.875), + (default,"hammer","16oz carpenter's hammer",1.0), + (default,"rocks","box of assorted rocks",5.3), + (default,"jacket","water resistent black wind breaker",0.1), + (default,"spare tire","24 inch spare tire",22.2); +update products set name = 'dailai' where id = 101; +delete from products where id = 102; + +alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1; + +update products set name = 'dailai' where id = 110; +insert into products +values (default,"scooter","Small 2-wheel scooter",3.14,'xx',1), + (default,"car battery","12V car battery",8.1,'xx',2), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3), + (default,"hammer","12oz carpenter's hammer",0.75,'xx',4), + (default,"hammer","14oz carpenter's hammer",0.875,'xx',5), + (default,"hammer","16oz carpenter's hammer",1.0,'xx',6), + (default,"rocks","box of assorted rocks",5.3,'xx',7), + (default,"jacket","water resistent black wind breaker",0.1,'xx',8), + (default,"spare tire","24 inch spare tire",22.2,'xx',9); +delete from products where id = 118; + +alter table products ADD COLUMN add_column3 float not null default 1.1; +alter table products ADD COLUMN add_column4 timestamp not null default current_timestamp(); + +delete from products where id = 113; +insert into products +values (default,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:09'), + (default,"car battery","12V car battery",8.1,'xx',2,1.2,'2023-02-02 09:09:09'), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3,'2023-02-02 09:09:09'), + (default,"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4,'2023-02-02 09:09:09'), + (default,"hammer","14oz carpenter's hammer",0.875,'xx',5,1.5,'2023-02-02 09:09:09'), + (default,"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6,'2023-02-02 09:09:09'), + (default,"rocks","box of assorted rocks",5.3,'xx',7,1.7,'2023-02-02 09:09:09'), + (default,"jacket","water resistent black wind breaker",0.1,'xx',8,1.8,'2023-02-02 09:09:09'), + (default,"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 09:09:09'); +update products set name = 'test' where id = 121; + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql index 9e9fff3f8f4c..a0a225981f33 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql @@ -18,7 +18,6 @@ -- ---------------------------------------------------------------------------------------------------------------- -- DATABASE: inventory -- ---------------------------------------------------------------------------------------------------------------- - -- Create and populate our products using a single insert with many rows CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, @@ -92,4 +91,4 @@ VALUES (default, '2016-01-16', 1001, 1, 102), CREATE TABLE category ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, category_name VARCHAR(255) -); \ No newline at end of file +); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql new file mode 100644 index 000000000000..3d73d442c79b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql @@ -0,0 +1,48 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; +-- Create and populate our products using a single insert with many rows +CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', + description VARCHAR(512), + weight FLOAT +); +ALTER TABLE products AUTO_INCREMENT = 101; + +CREATE TABLE mysql_cdc_e2e_sink_table_with_schema_change ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', + description VARCHAR(512), + weight FLOAT +); + +INSERT INTO products +VALUES (default,"scooter","Small 2-wheel scooter",3.14), + (default,"car battery","12V car battery",8.1), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (default,"hammer","12oz carpenter's hammer",0.75), + (default,"hammer","14oz carpenter's hammer",0.875), + (default,"hammer","16oz carpenter's hammer",1.0), + (default,"rocks","box of assorted rocks",5.3), + (default,"jacket","water resistent black wind breaker",0.1), + (default,"spare tire","24 inch spare tire",22.2); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf new file mode 100644 index 000000000000..359ca1b9ef01 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + MySQL-CDC { + server-id = 5652 + username = "st_user_source" + password = "mysqlpw" + table-names = ["shop.products"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + debezium = { + include.schema.changes = true + } + } +} + +sink { + jdbc { + url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user_sink" + password = "mysqlpw" + generate_sink_sql = true + database = shop + table = mysql_cdc_e2e_sink_table_with_schema_change + primary_keys = ["id"] + } +}