From 32d19be6a8ad50b9dad27c8c70e18b5800bdd20e Mon Sep 17 00:00:00 2001 From: dailai Date: Fri, 5 Jul 2024 09:53:09 +0800 Subject: [PATCH] [Improve][Connector-V2] Support schema evolution for mysql-cdc and mysql-jdbc (#6929) try to fix milvus e2e --- .../seatunnel/api/table/catalog/Column.java | 3 + .../api/table/catalog/MetadataColumn.java | 5 + .../api/table/catalog/PhysicalColumn.java | 18 + .../table/event/AlterTableColumnEvent.java | 1 + .../seatunnel/api/table/event/TableEvent.java | 1 + .../schema/AbstractSchemaChangeResolver.java | 58 ++++ .../cdc/base/utils/SourceRecordUtils.java | 6 + ...SeaTunnelRowDebeziumDeserializeSchema.java | 3 +- .../config/MySqlSourceConfigFactory.java | 3 +- .../mysql/source/MySqlIncrementalSource.java | 2 + .../source/MySqlSchemaChangeResolver.java | 79 +++++ .../CustomAlterTableParserListener.java | 246 +++++++++++++ .../CustomColumnDefinitionParserListener.java | 325 ++++++++++++++++++ .../CustomDefaultValueParserListener.java | 48 +++ .../parser/CustomMySqlAntlrDdlParser.java | 304 ++++++++++++++++ .../CustomMySqlAntlrDdlParserListener.java | 121 +++++++ .../cdc/mysql/utils/MySqlTypeUtils.java | 20 +- .../cdc/mysql/testutils/UniqueDatabase.java | 7 +- .../exception/JdbcConnectorErrorCode.java | 5 +- .../internal/connection/DataSourceUtils.java | 28 +- .../jdbc/internal/dialect/JdbcDialect.java | 279 +++++++++++++++ .../internal/dialect/mysql/MysqlDialect.java | 71 ++++ .../jdbc/sink/AbstractJdbcSinkWriter.java | 140 ++++++++ .../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 30 +- .../seatunnel/jdbc/sink/JdbcSink.java | 17 +- .../seatunnel/jdbc/sink/JdbcSinkWriter.java | 23 +- .../jdbc/utils/MysqlDefaultValueUtils.java | 33 ++ .../cdc/mysql/MysqlCDCWithSchemaChangeIT.java | 282 +++++++++++++++ .../src/test/resources/ddl/add_columns.sql | 69 ++++ .../src/test/resources/ddl/change_columns.sql | 36 ++ .../src/test/resources/ddl/drop_columns.sql | 50 +++ .../src/test/resources/ddl/inventory.sql | 3 +- .../src/test/resources/ddl/modify_columns.sql | 36 ++ .../src/test/resources/ddl/shop.sql | 78 +++++ .../mysqlcdc_to_mysql_with_schema_change.conf | 54 +++ ...mysql_with_schema_change_exactly_once.conf | 56 +++ .../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java | 22 ++ .../e2e/connector/v2/milvus/MilvusIT.java | 3 +- 38 files changed, 2519 insertions(+), 46 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomAlterTableParserListener.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomDefaultValueParserListener.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParser.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/MysqlDefaultValueUtils.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/change_columns.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/modify_columns.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java index 461259f2fd68..d7e236d30936 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java @@ -191,4 +191,7 @@ protected Column( /** Returns a copy of the column with a replaced name. */ public abstract Column rename(String newColumnName); + + /** Returns a copy of the column with a replaced sourceType. */ + public abstract Column reSourceType(String sourceType); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java index 47c17d0fcb18..e0e7e9e99107 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java @@ -83,4 +83,9 @@ public Column rename(String newColumnName) { defaultValue, comment); } + + @Override + public Column reSourceType(String sourceType) { + throw new UnsupportedOperationException("Not implemented"); + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java index 16cae45ae410..db9da1b2b756 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java @@ -325,4 +325,22 @@ public Column rename(String newColumnName) { bitLen, longColumnLength); } + + @Override + public Column reSourceType(String newSourceType) { + return new PhysicalColumn( + name, + dataType, + columnLength, + scale, + nullable, + defaultValue, + comment, + newSourceType, + options, + isUnsigned, + isZeroFill, + bitLen, + longColumnLength); + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnEvent.java index 54e27094f198..97076560f02c 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnEvent.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnEvent.java @@ -23,6 +23,7 @@ @ToString(callSuper = true) public abstract class AlterTableColumnEvent extends AlterTableEvent { + public AlterTableColumnEvent(TableIdentifier tableIdentifier) { super(tableIdentifier); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/TableEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/TableEvent.java index 4a1235bb6716..af08377a9cb2 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/TableEvent.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/TableEvent.java @@ -33,6 +33,7 @@ public abstract class TableEvent implements SchemaChangeEvent { protected final TableIdentifier tableIdentifier; @Getter @Setter private String jobId; @Getter @Setter private String statement; + @Getter @Setter protected String sourceDialectName; @Override public TableIdentifier tableIdentifier() { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java new file mode 100644 index 000000000000..ac86dd0d2bc4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.cdc.base.schema; + +import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; +import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import com.google.common.collect.Lists; +import io.debezium.relational.history.HistoryRecord; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolver { + + protected static final List SUPPORT_DDL = Lists.newArrayList("ALTER TABLE"); + + protected JdbcSourceConfig jdbcSourceConfig; + + public AbstractSchemaChangeResolver(JdbcSourceConfig jdbcSourceConfig) { + this.jdbcSourceConfig = jdbcSourceConfig; + } + + @Override + public boolean support(SourceRecord record) { + String ddl = SourceRecordUtils.getDdl(record); + Struct value = (Struct) record.value(); + List tableChanges = value.getArray(HistoryRecord.Fields.TABLE_CHANGES); + if (tableChanges == null || tableChanges.isEmpty()) { + log.warn("Ignoring statement for non-captured table {}", ddl); + return false; + } + return StringUtils.isNotBlank(ddl) + && SUPPORT_DDL.stream() + .map(String::toUpperCase) + .anyMatch(prefix -> ddl.toUpperCase().contains(prefix)); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java index e06213b06d5e..3245273ace2b 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java @@ -28,6 +28,7 @@ import io.debezium.data.Envelope; import io.debezium.document.DocumentReader; import io.debezium.relational.TableId; +import io.debezium.relational.history.HistoryRecord; import io.debezium.util.SchemaNameAdjuster; import java.math.BigDecimal; @@ -214,4 +215,9 @@ public static TablePath getTablePath(SourceRecord record) { } return TablePath.of(databaseName, schemaName, tableName); } + + public static String getDdl(SourceRecord record) { + Struct schemaChangeStruct = (Struct) record.value(); + return schemaChangeStruct.getString(HistoryRecord.Fields.DDL_STATEMENTS); + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java index d249b0d92763..1fd706290185 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java @@ -119,10 +119,9 @@ private void deserializeSchemaChangeRecord( SourceRecord record, Collector collector) { SchemaChangeEvent schemaChangeEvent = schemaChangeResolver.resolve(record, resultTypeInfo); if (schemaChangeEvent == null) { - log.info("Unsupported resolve schemaChangeEvent {}, just skip.", record); + log.warn("Unsupported resolve schemaChangeEvent {}, just skip.", record); return; } - if (resultTypeInfo instanceof MultipleRowType) { Map newRowTypeMap = new HashMap<>(); for (Map.Entry entry : (MultipleRowType) resultTypeInfo) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java index 7168da8a8465..b39655293e46 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java @@ -76,7 +76,8 @@ public MySqlSourceConfig create(int subtaskId) { // Note: the includeSchemaChanges parameter is used to control emitting the schema record, // only DataStream API program need to emit the schema record, the Table API need not - // TODO Not yet supported + // Some scenarios do not require automatic capture of table structure changes, so the + // default setting is false. props.setProperty("include.schema.changes", String.valueOf(false)); // disable the offset flush totally props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java index 434ce75cdd16..1d2d9f9a033e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java @@ -102,6 +102,8 @@ public DebeziumDeserializationSchema createDebeziumDeserializationSchema( .setPhysicalRowType(physicalRowType) .setResultTypeInfo(physicalRowType) .setServerTimeZone(ZoneId.of(zoneId)) + .setSchemaChangeResolver( + new MySqlSchemaChangeResolver(createSourceConfigFactory(config))) .build(); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java new file mode 100644 index 000000000000..bd386e9bb597 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source; + +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; +import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; +import org.apache.seatunnel.connectors.cdc.base.schema.AbstractSchemaChangeResolver; +import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser.CustomMySqlAntlrDdlParser; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.source.SourceRecord; + +import io.debezium.relational.Tables; + +import java.util.List; +import java.util.Objects; + +public class MySqlSchemaChangeResolver extends AbstractSchemaChangeResolver { + private transient Tables tables; + private transient CustomMySqlAntlrDdlParser customMySqlAntlrDdlParser; + + public MySqlSchemaChangeResolver(SourceConfig.Factory sourceConfigFactory) { + super(sourceConfigFactory.create(0)); + } + + @Override + public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) { + TablePath tablePath = SourceRecordUtils.getTablePath(record); + String ddl = SourceRecordUtils.getDdl(record); + if (Objects.isNull(customMySqlAntlrDdlParser)) { + this.customMySqlAntlrDdlParser = + new CustomMySqlAntlrDdlParser( + tablePath, this.jdbcSourceConfig.getDbzConnectorConfig()); + } + if (Objects.isNull(tables)) { + this.tables = new Tables(); + } + customMySqlAntlrDdlParser.setCurrentDatabase(tablePath.getDatabaseName()); + customMySqlAntlrDdlParser.setCurrentSchema(tablePath.getSchemaName()); + // Parse DDL statement using Debezium's Antlr parser + customMySqlAntlrDdlParser.parse(ddl, tables); + List parsedEvents = + customMySqlAntlrDdlParser.getAndClearParsedEvents(); + AlterTableColumnsEvent alterTableColumnsEvent = + new AlterTableColumnsEvent( + TableIdentifier.of( + StringUtils.EMPTY, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()), + parsedEvents); + alterTableColumnsEvent.setStatement(ddl); + alterTableColumnsEvent.setSourceDialectName(DatabaseIdentifier.MYSQL); + return parsedEvents.isEmpty() ? null : alterTableColumnsEvent; + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomAlterTableParserListener.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomAlterTableParserListener.java new file mode 100644 index 000000000000..bf36d7831ee2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomAlterTableParserListener.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser; + +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlTypeUtils; + +import org.apache.commons.lang3.StringUtils; + +import org.antlr.v4.runtime.tree.ParseTreeListener; + +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.TableId; + +import java.util.LinkedList; +import java.util.List; + +public class CustomAlterTableParserListener extends MySqlParserBaseListener { + private static final int STARTING_INDEX = 1; + private final MySqlAntlrDdlParser parser; + private final List listeners; + private final LinkedList changes; + private List columnEditors; + private TableIdentifier tableIdentifier; + + private CustomColumnDefinitionParserListener columnDefinitionListener; + + private int parsingColumnIndex = STARTING_INDEX; + + private RelationalDatabaseConnectorConfig dbzConnectorConfig; + + public CustomAlterTableParserListener( + RelationalDatabaseConnectorConfig dbzConnectorConfig, + MySqlAntlrDdlParser parser, + List listeners, + LinkedList changes) { + this.dbzConnectorConfig = dbzConnectorConfig; + this.parser = parser; + this.listeners = listeners; + this.changes = changes; + } + + @Override + public void enterAlterTable(MySqlParser.AlterTableContext ctx) { + TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId()); + this.tableIdentifier = toTableIdentifier(tableId); + super.enterAlterTable(ctx); + } + + @Override + public void exitAlterTable(MySqlParser.AlterTableContext ctx) { + listeners.remove(columnDefinitionListener); + super.exitAlterTable(ctx); + this.tableIdentifier = null; + } + + @Override + public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { + String columnName = parser.parseName(ctx.uid(0)); + ColumnEditor columnEditor = Column.editor().name(columnName); + columnDefinitionListener = + new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.exitAlterByAddColumn(ctx); + } + + @Override + public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + org.apache.seatunnel.api.table.catalog.Column seatunnelColumn = + toSeatunnelColumn(column); + String sourceColumnType = getSourceColumnType(column); + seatunnelColumn = seatunnelColumn.reSourceType(sourceColumnType); + if (ctx.FIRST() != null) { + AlterTableAddColumnEvent alterTableAddColumnEvent = + AlterTableAddColumnEvent.addFirst(tableIdentifier, seatunnelColumn); + changes.add(alterTableAddColumnEvent); + } else if (ctx.AFTER() != null) { + String afterColumn = parser.parseName(ctx.uid(1)); + AlterTableAddColumnEvent alterTableAddColumnEvent = + AlterTableAddColumnEvent.addAfter( + tableIdentifier, seatunnelColumn, afterColumn); + changes.add(alterTableAddColumnEvent); + } else { + AlterTableAddColumnEvent alterTableAddColumnEvent = + AlterTableAddColumnEvent.add(tableIdentifier, seatunnelColumn); + changes.add(alterTableAddColumnEvent); + } + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByAddColumn(ctx); + } + + @Override + public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { + parser.runIfNotNull( + () -> { + if (columnEditors != null) { + // column editor list is not null when a multiple columns are parsed in one + // statement + if (columnEditors.size() > parsingColumnIndex) { + // assign next column editor to parse another column definition + columnDefinitionListener.setColumnEditor( + columnEditors.get(parsingColumnIndex++)); + } + } + }, + columnEditors); + super.exitColumnDefinition(ctx); + } + + @Override + public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + String columnName = parser.parseName(ctx.uid(0)); + ColumnEditor columnEditor = Column.editor().name(columnName); + columnDefinitionListener = + new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByModifyColumn(ctx); + } + + @Override + public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + org.apache.seatunnel.api.table.catalog.Column seatunnelColumn = + toSeatunnelColumn(column); + String sourceColumnType = getSourceColumnType(column); + seatunnelColumn = seatunnelColumn.reSourceType(sourceColumnType); + if (ctx.FIRST() != null) { + AlterTableModifyColumnEvent alterTableModifyColumnEvent = + AlterTableModifyColumnEvent.modifyFirst( + tableIdentifier, seatunnelColumn); + changes.add(alterTableModifyColumnEvent); + } else if (ctx.AFTER() != null) { + String afterColumn = parser.parseName(ctx.uid(1)); + AlterTableModifyColumnEvent alterTableModifyColumnEvent = + AlterTableModifyColumnEvent.modifyAfter( + tableIdentifier, seatunnelColumn, afterColumn); + changes.add(alterTableModifyColumnEvent); + } else { + AlterTableModifyColumnEvent alterTableModifyColumnEvent = + AlterTableModifyColumnEvent.modify( + tableIdentifier, seatunnelColumn); + changes.add(alterTableModifyColumnEvent); + } + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByModifyColumn(ctx); + } + + @Override + public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) { + String oldColumnName = parser.parseName(ctx.oldColumn); + ColumnEditor columnEditor = Column.editor().name(oldColumnName); + columnEditor.unsetDefaultValueExpression(); + + columnDefinitionListener = + new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByChangeColumn(ctx); + } + + @Override + public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + org.apache.seatunnel.api.table.catalog.Column seatunnelColumn = + toSeatunnelColumn(column); + String sourceColumnType = getSourceColumnType(column); + seatunnelColumn = seatunnelColumn.reSourceType(sourceColumnType); + String oldColumnName = column.name(); + String newColumnName = parser.parseName(ctx.newColumn); + seatunnelColumn = seatunnelColumn.rename(newColumnName); + AlterTableChangeColumnEvent alterTableChangeColumnEvent = + AlterTableChangeColumnEvent.change( + tableIdentifier, oldColumnName, seatunnelColumn); + if (StringUtils.isNotBlank(newColumnName) + && !StringUtils.equals(oldColumnName, newColumnName)) { + changes.add(alterTableChangeColumnEvent); + } + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByChangeColumn(ctx); + } + + @Override + public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) { + String removedColName = parser.parseName(ctx.uid()); + changes.add(new AlterTableDropColumnEvent(tableIdentifier, removedColName)); + super.enterAlterByDropColumn(ctx); + } + + private org.apache.seatunnel.api.table.catalog.Column toSeatunnelColumn(Column column) { + return MySqlTypeUtils.convertToSeaTunnelColumn(column, dbzConnectorConfig); + } + + private TableIdentifier toTableIdentifier(TableId tableId) { + return new TableIdentifier("", tableId.catalog(), tableId.schema(), tableId.table()); + } + + private String getSourceColumnType(Column column) { + StringBuilder sb = new StringBuilder(column.typeName()); + if (column.length() >= 0) { + sb.append('(').append(column.length()); + if (column.scale().isPresent()) { + sb.append(", ").append(column.scale().get()); + } + + sb.append(')'); + } + return sb.toString(); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java new file mode 100644 index 000000000000..a04e042f5954 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser; + +import org.antlr.v4.runtime.tree.ParseTreeListener; + +import io.debezium.antlr.AntlrDdlParser; +import io.debezium.antlr.DataTypeResolver; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.connector.mysql.antlr.listener.DefaultValueParserListener; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.ddl.DataType; +import io.debezium.util.Strings; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Types; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** Parser listener that is parsing column definition part of MySQL statements. */ +@Slf4j +public class CustomColumnDefinitionParserListener extends MySqlParserBaseListener { + + private static final Pattern DOT = Pattern.compile("\\."); + private final MySqlAntlrDdlParser parser; + private final DataTypeResolver dataTypeResolver; + private ColumnEditor columnEditor; + private boolean uniqueColumn; + private AtomicReference optionalColumn = new AtomicReference<>(); + private DefaultValueParserListener defaultValueListener; + + private final List listeners; + + public CustomColumnDefinitionParserListener( + ColumnEditor columnEditor, + MySqlAntlrDdlParser parser, + List listeners) { + this.columnEditor = columnEditor; + this.parser = parser; + this.dataTypeResolver = parser.dataTypeResolver(); + this.listeners = listeners; + } + + public void setColumnEditor(ColumnEditor columnEditor) { + this.columnEditor = columnEditor; + } + + public ColumnEditor getColumnEditor() { + return columnEditor; + } + + public Column getColumn() { + return columnEditor.create(); + } + + @Override + public void enterColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { + uniqueColumn = false; + optionalColumn = new AtomicReference<>(); + resolveColumnDataType(ctx.dataType()); + defaultValueListener = new CustomDefaultValueParserListener(columnEditor, optionalColumn); + listeners.add(defaultValueListener); + super.enterColumnDefinition(ctx); + } + + @Override + public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { + if (optionalColumn.get() != null) { + columnEditor.optional(optionalColumn.get().booleanValue()); + } + defaultValueListener.exitDefaultValue(false); + listeners.remove(defaultValueListener); + super.exitColumnDefinition(ctx); + } + + @Override + public void enterUniqueKeyColumnConstraint(MySqlParser.UniqueKeyColumnConstraintContext ctx) { + uniqueColumn = true; + super.enterUniqueKeyColumnConstraint(ctx); + } + + @Override + public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstraintContext ctx) { + // this rule will be parsed only if no primary key is set in a table + // otherwise the statement can't be executed due to multiple primary key error + optionalColumn.set(Boolean.FALSE); + super.enterPrimaryKeyColumnConstraint(ctx); + } + + @Override + public void enterCommentColumnConstraint(MySqlParser.CommentColumnConstraintContext ctx) { + if (!parser.skipComments()) { + if (ctx.STRING_LITERAL() != null) { + columnEditor.comment(parser.withoutQuotes(ctx.STRING_LITERAL().getText())); + } + } + super.enterCommentColumnConstraint(ctx); + } + + @Override + public void enterNullNotnull(MySqlParser.NullNotnullContext ctx) { + optionalColumn.set(Boolean.valueOf(ctx.NOT() == null)); + super.enterNullNotnull(ctx); + } + + @Override + public void enterAutoIncrementColumnConstraint( + MySqlParser.AutoIncrementColumnConstraintContext ctx) { + columnEditor.autoIncremented(true); + columnEditor.generated(true); + super.enterAutoIncrementColumnConstraint(ctx); + } + + @Override + public void enterSerialDefaultColumnConstraint( + MySqlParser.SerialDefaultColumnConstraintContext ctx) { + serialColumn(); + super.enterSerialDefaultColumnConstraint(ctx); + } + + private void resolveColumnDataType(MySqlParser.DataTypeContext dataTypeContext) { + String charsetName = null; + DataType dataType = dataTypeResolver.resolveDataType(dataTypeContext); + + if (dataTypeContext instanceof MySqlParser.StringDataTypeContext) { + // Same as LongVarcharDataTypeContext but with dimension handling + MySqlParser.StringDataTypeContext stringDataTypeContext = + (MySqlParser.StringDataTypeContext) dataTypeContext; + + if (stringDataTypeContext.lengthOneDimension() != null) { + Integer length = + parseLength( + stringDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + columnEditor.length(length); + } + + charsetName = + parser.extractCharset( + stringDataTypeContext.charsetName(), + stringDataTypeContext.collationName()); + } else if (dataTypeContext instanceof MySqlParser.LongVarcharDataTypeContext) { + // Same as StringDataTypeContext but without dimension handling + MySqlParser.LongVarcharDataTypeContext longVarcharTypeContext = + (MySqlParser.LongVarcharDataTypeContext) dataTypeContext; + + charsetName = + parser.extractCharset( + longVarcharTypeContext.charsetName(), + longVarcharTypeContext.collationName()); + } else if (dataTypeContext instanceof MySqlParser.NationalStringDataTypeContext) { + MySqlParser.NationalStringDataTypeContext nationalStringDataTypeContext = + (MySqlParser.NationalStringDataTypeContext) dataTypeContext; + + if (nationalStringDataTypeContext.lengthOneDimension() != null) { + Integer length = + parseLength( + nationalStringDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + columnEditor.length(length); + } + } else if (dataTypeContext instanceof MySqlParser.NationalVaryingStringDataTypeContext) { + MySqlParser.NationalVaryingStringDataTypeContext nationalVaryingStringDataTypeContext = + (MySqlParser.NationalVaryingStringDataTypeContext) dataTypeContext; + + if (nationalVaryingStringDataTypeContext.lengthOneDimension() != null) { + Integer length = + parseLength( + nationalVaryingStringDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + columnEditor.length(length); + } + } else if (dataTypeContext instanceof MySqlParser.DimensionDataTypeContext) { + MySqlParser.DimensionDataTypeContext dimensionDataTypeContext = + (MySqlParser.DimensionDataTypeContext) dataTypeContext; + + Integer length = null; + Integer scale = null; + if (dimensionDataTypeContext.lengthOneDimension() != null) { + length = + parseLength( + dimensionDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + } + + if (dimensionDataTypeContext.lengthTwoDimension() != null) { + List decimalLiterals = + dimensionDataTypeContext.lengthTwoDimension().decimalLiteral(); + length = parseLength(decimalLiterals.get(0).getText()); + scale = Integer.valueOf(decimalLiterals.get(1).getText()); + } + + if (dimensionDataTypeContext.lengthTwoOptionalDimension() != null) { + List decimalLiterals = + dimensionDataTypeContext.lengthTwoOptionalDimension().decimalLiteral(); + if (decimalLiterals.get(0).REAL_LITERAL() != null) { + String[] digits = DOT.split(decimalLiterals.get(0).getText()); + if (Strings.isNullOrEmpty(digits[0]) || Integer.valueOf(digits[0]) == 0) { + // Set default value 10 according mysql engine + length = 10; + } else { + length = parseLength(digits[0]); + } + } else { + length = parseLength(decimalLiterals.get(0).getText()); + } + + if (decimalLiterals.size() > 1) { + scale = Integer.valueOf(decimalLiterals.get(1).getText()); + } + } + if (length != null) { + columnEditor.length(length); + } + if (scale != null) { + columnEditor.scale(scale); + } + } else if (dataTypeContext instanceof MySqlParser.CollectionDataTypeContext) { + MySqlParser.CollectionDataTypeContext collectionDataTypeContext = + (MySqlParser.CollectionDataTypeContext) dataTypeContext; + if (collectionDataTypeContext.charsetName() != null) { + charsetName = collectionDataTypeContext.charsetName().getText(); + } + + if (dataType.name().equalsIgnoreCase("SET")) { + // After DBZ-132, it will always be comma separated + int optionsSize = + collectionDataTypeContext.collectionOptions().collectionOption().size(); + columnEditor.length( + Math.max(0, optionsSize * 2 - 1)); // number of options + number of commas + } else { + columnEditor.length(1); + } + } + + String dataTypeName = dataType.name().toUpperCase(); + + if (dataTypeName.equals("ENUM") || dataTypeName.equals("SET")) { + // type expression has to be set, because the value converter needs to know the enum or + // set options + MySqlParser.CollectionDataTypeContext collectionDataTypeContext = + (MySqlParser.CollectionDataTypeContext) dataTypeContext; + + List collectionOptions = + collectionDataTypeContext.collectionOptions().collectionOption().stream() + .map(AntlrDdlParser::getText) + .collect(Collectors.toList()); + + columnEditor.type(dataTypeName); + columnEditor.enumValues(collectionOptions); + } else if (dataTypeName.equals("SERIAL")) { + // SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE + columnEditor.type("BIGINT UNSIGNED"); + serialColumn(); + } else { + columnEditor.type(dataTypeName); + } + + int jdbcDataType = dataType.jdbcType(); + columnEditor.jdbcType(jdbcDataType); + + if (columnEditor.length() == -1) { + columnEditor.length((int) dataType.length()); + } + if (!columnEditor.scale().isPresent() && dataType.scale() != Column.UNSET_INT_VALUE) { + columnEditor.scale(dataType.scale()); + } + if (Types.NCHAR == jdbcDataType || Types.NVARCHAR == jdbcDataType) { + // NCHAR and NVARCHAR columns always uses utf8 as charset + columnEditor.charsetName("utf8"); + } else { + columnEditor.charsetName(charsetName); + } + } + + private Integer parseLength(String lengthStr) { + Long length = Long.parseLong(lengthStr); + if (length > Integer.MAX_VALUE) { + log.warn( + "The length '{}' of the column `{}` is too large to be supported, truncating it to '{}'", + length, + columnEditor.name(), + Integer.MAX_VALUE); + length = (long) Integer.MAX_VALUE; + } + return length.intValue(); + } + + private void serialColumn() { + if (optionalColumn.get() == null) { + optionalColumn.set(Boolean.FALSE); + } + uniqueColumn = true; + columnEditor.autoIncremented(true); + columnEditor.generated(true); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomDefaultValueParserListener.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomDefaultValueParserListener.java new file mode 100644 index 000000000000..869d85e1ea6a --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomDefaultValueParserListener.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser; + +import io.debezium.connector.mysql.antlr.listener.DefaultValueParserListener; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.relational.ColumnEditor; + +import java.util.concurrent.atomic.AtomicReference; + +public class CustomDefaultValueParserListener extends DefaultValueParserListener { + + private final ColumnEditor columnEditor; + + public CustomDefaultValueParserListener( + ColumnEditor columnEditor, AtomicReference optionalColumn) { + super(columnEditor, optionalColumn); + this.columnEditor = columnEditor; + } + + @Override + public void enterDefaultValue(MySqlParser.DefaultValueContext ctx) { + if (ctx.currentTimestamp() != null && !ctx.currentTimestamp().isEmpty()) { + if (ctx.currentTimestamp().size() > 1 || (ctx.ON() == null && ctx.UPDATE() == null)) { + final MySqlParser.CurrentTimestampContext currentTimestamp = + ctx.currentTimestamp(0); + columnEditor.defaultValueExpression(currentTimestamp.getText()); + } + } else { + super.enterDefaultValue(ctx); + } + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParser.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParser.java new file mode 100644 index 000000000000..00d75c986a3b --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParser.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; + +import com.google.common.collect.Lists; +import io.debezium.antlr.AntlrDdlParserListener; +import io.debezium.antlr.DataTypeResolver; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.TableId; + +import java.sql.Types; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +/** A ddl parser that will use custom listener. */ +public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser { + + private final LinkedList parsedEvents; + + private RelationalDatabaseConnectorConfig dbzConnectorConfig; + + private final TablePath tablePath; + + public CustomMySqlAntlrDdlParser( + TablePath tablePath, RelationalDatabaseConnectorConfig dbzConnectorConfig) { + super(); + this.tablePath = tablePath; + this.parsedEvents = new LinkedList<>(); + this.dbzConnectorConfig = dbzConnectorConfig; + } + + @Override + public TableId parseQualifiedTableId(MySqlParser.FullIdContext fullIdContext) { + return new TableId( + tablePath.getDatabaseName(), tablePath.getSchemaName(), tablePath.getTableName()); + } + + // Overriding this method because the BIT type requires default length dimension of 1. + // Remove it when debezium fixed this issue. + @Override + protected DataTypeResolver initializeDataTypeResolver() { + DataTypeResolver.Builder dataTypeResolverBuilder = new DataTypeResolver.Builder(); + + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.StringDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHAR), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.CHAR, MySqlParser.VARYING), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.VARCHAR), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TINYTEXT), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TEXT), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.MEDIUMTEXT), + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONGTEXT), + new DataTypeResolver.DataTypeEntry(Types.NCHAR, MySqlParser.NCHAR), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARYING), + new DataTypeResolver.DataTypeEntry(Types.NVARCHAR, MySqlParser.NVARCHAR), + new DataTypeResolver.DataTypeEntry( + Types.CHAR, MySqlParser.CHAR, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.VARCHAR, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.TINYTEXT, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.TEXT, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.MEDIUMTEXT, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.LONGTEXT, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NCHAR, MySqlParser.NCHAR, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NVARCHAR, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHARACTER), + new DataTypeResolver.DataTypeEntry( + Types.VARCHAR, MySqlParser.CHARACTER, MySqlParser.VARYING))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.NationalStringDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NATIONAL, MySqlParser.VARCHAR) + .setSuffixTokens(MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NCHAR, MySqlParser.NATIONAL, MySqlParser.CHARACTER) + .setSuffixTokens(MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARCHAR) + .setSuffixTokens(MySqlParser.BINARY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.NationalVaryingStringDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, + MySqlParser.NATIONAL, + MySqlParser.CHAR, + MySqlParser.VARYING), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, + MySqlParser.NATIONAL, + MySqlParser.CHARACTER, + MySqlParser.VARYING))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.DimensionDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.TINYINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT1) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.SMALLINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT2) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MEDIUMINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT3) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MIDDLEINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INTEGER) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT4) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.BIGINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.INT8) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.REAL, MySqlParser.REAL) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.DOUBLE) + .setSuffixTokens( + MySqlParser.PRECISION, + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.FLOAT8) + .setSuffixTokens( + MySqlParser.PRECISION, + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT4) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DECIMAL) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DEC) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.FIXED) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.NUMERIC, MySqlParser.NUMERIC) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.BIT, MySqlParser.BIT) + .setDefaultLengthDimension(1), + new DataTypeResolver.DataTypeEntry(Types.TIME, MySqlParser.TIME), + new DataTypeResolver.DataTypeEntry( + Types.TIMESTAMP_WITH_TIMEZONE, MySqlParser.TIMESTAMP), + new DataTypeResolver.DataTypeEntry(Types.TIMESTAMP, MySqlParser.DATETIME), + new DataTypeResolver.DataTypeEntry(Types.BINARY, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry(Types.VARBINARY, MySqlParser.VARBINARY), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.BLOB), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.YEAR))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.SimpleDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.DATE, MySqlParser.DATE), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.TINYBLOB), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.MEDIUMBLOB), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONGBLOB), + new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOL), + new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOLEAN), + new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.SERIAL))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.CollectionDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.ENUM) + .setSuffixTokens(MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.SET) + .setSuffixTokens(MySqlParser.BINARY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.SpatialDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry( + Types.OTHER, MySqlParser.GEOMETRYCOLLECTION), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMCOLLECTION), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.LINESTRING), + new DataTypeResolver.DataTypeEntry( + Types.OTHER, MySqlParser.MULTILINESTRING), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOINT), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOLYGON), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POINT), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POLYGON), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.JSON), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMETRY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.LongVarbinaryDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONG) + .setSuffixTokens(MySqlParser.VARBINARY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.LongVarcharDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONG) + .setSuffixTokens(MySqlParser.VARCHAR))); + + return dataTypeResolverBuilder.build(); + } + + @Override + protected AntlrDdlParserListener createParseTreeWalkerListener() { + return new CustomMySqlAntlrDdlParserListener(dbzConnectorConfig, this, parsedEvents); + } + + public List getAndClearParsedEvents() { + List result = Lists.newArrayList(parsedEvents); + parsedEvents.clear(); + return result; + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java new file mode 100644 index 000000000000..4ea5ec705604 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.parser; + +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; + +import org.antlr.v4.runtime.ParserRuleContext; +import org.antlr.v4.runtime.tree.ErrorNode; +import org.antlr.v4.runtime.tree.ParseTreeListener; +import org.antlr.v4.runtime.tree.TerminalNode; + +import io.debezium.antlr.AntlrDdlParserListener; +import io.debezium.antlr.ProxyParseTreeListenerUtil; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.text.ParsingException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** This listener's constructor will use some modified listener. */ +public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener + implements AntlrDdlParserListener { + + /** Collection of listeners for delegation of events. */ + private final List listeners = new CopyOnWriteArrayList<>(); + + /** Flag for skipping phase. */ + private boolean skipNodes; + + /** + * Count of skipped nodes. Each enter event during skipping phase will increase the counter and + * each exit event will decrease it. When counter will be decreased to 0, the skipping phase + * will end. + */ + private int skippedNodesCount = 0; + + /** Collection of catched exceptions. */ + private final Collection errors = new ArrayList<>(); + + public CustomMySqlAntlrDdlParserListener( + RelationalDatabaseConnectorConfig dbzConnectorConfig, + MySqlAntlrDdlParser parser, + LinkedList parsedEvents) { + // Currently only DDL statements that modify the table structure are supported, so add + // custom listeners to handle these events. + listeners.add( + new CustomAlterTableParserListener( + dbzConnectorConfig, parser, listeners, parsedEvents)); + } + + /** + * Returns all caught errors during tree walk. + * + * @return list of Parsing exceptions + */ + @Override + public Collection getErrors() { + return errors; + } + + @Override + public void enterEveryRule(ParserRuleContext ctx) { + if (skipNodes) { + skippedNodesCount++; + } else { + ProxyParseTreeListenerUtil.delegateEnterRule(ctx, listeners, errors); + } + } + + @Override + public void exitEveryRule(ParserRuleContext ctx) { + if (skipNodes) { + if (skippedNodesCount == 0) { + // back in the node where skipping started + skipNodes = false; + } else { + // going up in a tree, means decreasing a number of skipped nodes + skippedNodesCount--; + } + } else { + ProxyParseTreeListenerUtil.delegateExitRule(ctx, listeners, errors); + } + } + + @Override + public void visitErrorNode(ErrorNode node) { + ProxyParseTreeListenerUtil.visitErrorNode(node, listeners, errors); + } + + @Override + public void visitTerminal(TerminalNode node) { + ProxyParseTreeListenerUtil.visitTerminal(node, listeners, errors); + } + + @Override + public void enterRoutineBody(MySqlParser.RoutineBodyContext ctx) { + // this is a grammar rule for BEGIN ... END part of statements. Skip it. + skipNodes = true; + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java index ce2ef61b2acf..22f9514c6f34 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.MysqlDefaultValueUtils; import io.debezium.connector.mysql.MySqlConnectorConfig; import io.debezium.connector.mysql.MySqlDefaultValueConverter; @@ -29,6 +30,9 @@ import io.debezium.relational.RelationalDatabaseConnectorConfig; import lombok.extern.slf4j.Slf4j; +import java.util.Objects; +import java.util.Optional; + /** Utilities for converting from MySQL types to SeaTunnel types. */ @Slf4j public class MySqlTypeUtils { @@ -62,10 +66,17 @@ public static org.apache.seatunnel.api.table.catalog.Column convertToSeaTunnelCo MySqlValueConverters::defaultParsingErrorHandler); MySqlDefaultValueConverter mySqlDefaultValueConverter = new MySqlDefaultValueConverter(mySqlValueConverters); - Object defaultValue = - mySqlDefaultValueConverter - .parseDefaultValue(column, column.defaultValueExpression().orElse(null)) - .orElse(null); + + Optional defaultValueExpression = column.defaultValueExpression(); + Object defaultValue = defaultValueExpression.orElse(null); + if (defaultValueExpression.isPresent() + && Objects.nonNull(defaultValue) + && !MysqlDefaultValueUtils.isSpecialDefaultValue(defaultValue)) { + defaultValue = + mySqlDefaultValueConverter + .parseDefaultValue(column, defaultValueExpression.get()) + .orElse(null); + } BasicTypeDefine.BasicTypeDefineBuilder builder = BasicTypeDefine.builder() .name(column.name()) @@ -74,6 +85,7 @@ public static org.apache.seatunnel.api.table.catalog.Column convertToSeaTunnelCo .length((long) column.length()) .precision((long) column.length()) .scale(column.scale().orElse(0)) + .nullable(column.isOptional()) .defaultValue(defaultValue); switch (column.typeName().toUpperCase()) { case MySqlTypeConverter.MYSQL_CHAR: diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java index 231501ea5cdf..1f7953a9fc16 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java @@ -55,7 +55,7 @@ public class UniqueDatabase { private final MySqlContainer container; private final String databaseName; - private final String templateName; + private String templateName; private final String username; private final String password; @@ -123,6 +123,11 @@ public String getPassword() { return password; } + public UniqueDatabase setTemplateName(String templateName) { + this.templateName = templateName; + return this; + } + /** @return Fully qualified table name <databaseName>.<tableName> */ public String qualifiedTableName(final String tableName) { return String.format("%s.%s", databaseName, tableName); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java index 06b4d77184d2..2c2cac27b4dc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java @@ -31,7 +31,10 @@ public enum JdbcConnectorErrorCode implements SeaTunnelErrorCode { DONT_SUPPORT_SINK("JDBC-07", "The jdbc type don't support sink"), KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication failed"), NO_SUPPORT_OPERATION_FAILED("JDBC-09", "The jdbc driver not support operation."), - DATA_TYPE_CAST_FAILED("JDBC-10", "Data type cast failed"); + DATA_TYPE_CAST_FAILED("JDBC-10", "Data type cast failed"), + REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT( + "JDBC-11", "Refresh the table with schema change failed"); + private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java index 3a6747aa89fb..1f35b3d2b9ed 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java @@ -70,11 +70,37 @@ private static void setProperties( Optional method = findSetterMethod(commonDataSource.getClass().getMethods(), entry.getKey()); if (method.isPresent()) { - method.get().invoke(commonDataSource, entry.getValue()); + Method setterMethod = method.get(); + Class parameterType = setterMethod.getParameterTypes()[0]; + Object value = entry.getValue(); + if (!parameterType.isInstance(value)) { + value = convertType(value, parameterType); + } + method.get().invoke(commonDataSource, value); } } } + private static Object convertType(Object value, Class targetType) { + if (targetType.isInstance(value)) { + return value; + } + if (targetType == Integer.class || targetType == int.class) { + return Integer.parseInt(value.toString()); + } else if (targetType == Long.class || targetType == long.class) { + return Long.parseLong(value.toString()); + } else if (targetType == Boolean.class || targetType == boolean.class) { + return Boolean.parseBoolean(value.toString()); + } else if (targetType == Double.class || targetType == double.class) { + return Double.parseDouble(value.toString()); + } else if (targetType == Float.class || targetType == float.class) { + return Float.parseFloat(value.toString()); + } else if (targetType == String.class) { + return value.toString(); + } + throw new IllegalArgumentException("Unsupported parameter type: " + targetType); + } + private static Method findGetterMethod(final DataSource dataSource, final String propertyName) throws NoSuchMethodException { String getterMethodName = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index da92f8210923..e59776b6f957 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -17,7 +17,17 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.converter.ConverterLoader; +import org.apache.seatunnel.api.table.converter.TypeConverter; +import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider; @@ -30,6 +40,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.mysql.cj.MysqlType; + import java.io.Serializable; import java.sql.Connection; import java.sql.PreparedStatement; @@ -41,6 +53,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -423,4 +436,270 @@ default JdbcConnectionProvider getJdbcConnectionProvider( default String convertType(String columnName, String columnType) { return columnName; } + + /** + * Refresh physical table schema by schema change event + * + * @param sourceDialectName source dialect name + * @param event schema change event + * @param jdbcConnectionProvider jdbc connection provider + * @param sinkTablePath sink table path + */ + default void refreshTableSchemaBySchemaChangeEvent( + String sourceDialectName, + AlterTableColumnEvent event, + JdbcConnectionProvider jdbcConnectionProvider, + TablePath sinkTablePath) {} + + /** + * generate alter table sql + * + * @param sourceDialectName source dialect name + * @param event schema change event + * @param sinkTablePath sink table path + * @return alter table sql for sink table + */ + default String generateAlterTableSql( + String sourceDialectName, AlterTableColumnEvent event, TablePath sinkTablePath) { + String tableIdentifierWithQuoted = + tableIdentifier(sinkTablePath.getDatabaseName(), sinkTablePath.getTableName()); + switch (event.getEventType()) { + case SCHEMA_CHANGE_ADD_COLUMN: + Column addColumn = ((AlterTableAddColumnEvent) event).getColumn(); + return buildAlterTableSql( + sourceDialectName, + addColumn.getSourceType(), + AlterType.ADD.name(), + addColumn, + tableIdentifierWithQuoted, + StringUtils.EMPTY); + case SCHEMA_CHANGE_DROP_COLUMN: + String dropColumn = ((AlterTableDropColumnEvent) event).getColumn(); + return buildAlterTableSql( + sourceDialectName, + null, + AlterType.DROP.name(), + null, + tableIdentifierWithQuoted, + dropColumn); + case SCHEMA_CHANGE_MODIFY_COLUMN: + Column modifyColumn = ((AlterTableModifyColumnEvent) event).getColumn(); + return buildAlterTableSql( + sourceDialectName, + modifyColumn.getSourceType(), + AlterType.MODIFY.name(), + modifyColumn, + tableIdentifierWithQuoted, + StringUtils.EMPTY); + case SCHEMA_CHANGE_CHANGE_COLUMN: + AlterTableChangeColumnEvent alterTableChangeColumnEvent = + (AlterTableChangeColumnEvent) event; + Column changeColumn = alterTableChangeColumnEvent.getColumn(); + String oldColumnName = alterTableChangeColumnEvent.getOldColumn(); + return buildAlterTableSql( + sourceDialectName, + changeColumn.getSourceType(), + AlterType.CHANGE.name(), + changeColumn, + tableIdentifierWithQuoted, + oldColumnName); + default: + throw new SeaTunnelException( + "Unsupported schemaChangeEvent for event type: " + event.getEventType()); + } + } + + /** + * build alter table sql + * + * @param sourceDialectName source dialect name + * @param sourceColumnType source column type + * @param alterOperation alter operation of ddl + * @param newColumn new column after ddl + * @param tableName table name of sink table + * @param oldColumnName old column name before ddl + * @return alter table sql for sink table after schema change + */ + default String buildAlterTableSql( + String sourceDialectName, + String sourceColumnType, + String alterOperation, + Column newColumn, + String tableName, + String oldColumnName) { + if (StringUtils.equals(alterOperation, AlterType.DROP.name())) { + return String.format( + "ALTER TABLE %s drop column %s", tableName, quoteIdentifier(oldColumnName)); + } + TypeConverter typeConverter = ConverterLoader.loadTypeConverter(dialectName()); + BasicTypeDefine typeBasicTypeDefine = + (BasicTypeDefine) typeConverter.reconvert(newColumn); + + String basicSql = buildAlterTableBasicSql(alterOperation, tableName); + basicSql = + decorateWithColumnNameAndType( + sourceDialectName, + sourceColumnType, + basicSql, + alterOperation, + newColumn, + oldColumnName, + typeBasicTypeDefine.getColumnType()); + basicSql = decorateWithNullable(basicSql, typeBasicTypeDefine); + basicSql = decorateWithDefaultValue(basicSql, typeBasicTypeDefine); + basicSql = decorateWithComment(basicSql, typeBasicTypeDefine); + return basicSql + ";"; + } + + /** + * build the body of alter table sql + * + * @param alterOperation alter operation of ddl + * @param tableName table name of sink table + * @return basic sql of alter table for sink table + */ + default String buildAlterTableBasicSql(String alterOperation, String tableName) { + StringBuilder sql = + new StringBuilder( + "ALTER TABLE " + + tableName + + StringUtils.SPACE + + alterOperation + + StringUtils.SPACE); + return sql.toString(); + } + + /** + * decorate the sql with column name and type + * + * @param sourceDialectName source dialect name + * @param sourceColumnType source column type + * @param basicSql basic sql of alter table for sink table + * @param alterOperation alter operation of ddl + * @param newColumn new column after ddl + * @param oldColumnName old column name before ddl + * @param columnType column type of new column + * @return basic sql with column name and type of alter table for sink table + */ + default String decorateWithColumnNameAndType( + String sourceDialectName, + String sourceColumnType, + String basicSql, + String alterOperation, + Column newColumn, + String oldColumnName, + String columnType) { + StringBuilder sql = new StringBuilder(basicSql); + String oldColumnNameWithQuoted = quoteIdentifier(oldColumnName); + String newColumnNameWithQuoted = quoteIdentifier(newColumn.getName()); + if (alterOperation.equals(AlterType.CHANGE.name())) { + sql.append(oldColumnNameWithQuoted) + .append(StringUtils.SPACE) + .append(newColumnNameWithQuoted) + .append(StringUtils.SPACE); + } else { + sql.append(newColumnNameWithQuoted).append(StringUtils.SPACE); + } + if (sourceDialectName.equals(dialectName())) { + sql.append(sourceColumnType); + } else { + sql.append(columnType); + } + sql.append(StringUtils.SPACE); + return sql.toString(); + } + + /** + * decorate with nullable + * + * @param basicSql alter table sql for sink table + * @param typeBasicTypeDefine type basic type define of new column + * @return alter table sql with nullable for sink table + */ + default String decorateWithNullable( + String basicSql, BasicTypeDefine typeBasicTypeDefine) { + StringBuilder sql = new StringBuilder(basicSql); + if (typeBasicTypeDefine.isNullable()) { + sql.append("NULL "); + } else { + sql.append("NOT NULL "); + } + return sql.toString(); + } + + /** + * decorate with default value + * + * @param basicSql alter table sql for sink table + * @param typeBasicTypeDefine type basic type define of new column + * @return alter table sql with default value for sink table + */ + default String decorateWithDefaultValue( + String basicSql, BasicTypeDefine typeBasicTypeDefine) { + Object defaultValue = typeBasicTypeDefine.getDefaultValue(); + if (Objects.nonNull(defaultValue) + && needsQuotesWithDefaultValue(typeBasicTypeDefine.getColumnType()) + && !isSpecialDefaultValue(defaultValue)) { + defaultValue = quotesDefaultValue(defaultValue); + } + StringBuilder sql = new StringBuilder(basicSql); + if (Objects.nonNull(defaultValue)) { + sql.append("DEFAULT ").append(defaultValue).append(StringUtils.SPACE); + } + return sql.toString(); + } + + /** + * decorate with comment + * + * @param basicSql alter table sql for sink table + * @param typeBasicTypeDefine type basic type define of new column + * @return alter table sql with comment for sink table + */ + default String decorateWithComment( + String basicSql, BasicTypeDefine typeBasicTypeDefine) { + String comment = typeBasicTypeDefine.getComment(); + StringBuilder sql = new StringBuilder(basicSql); + if (StringUtils.isNotBlank(comment)) { + sql.append("COMMENT '").append(comment).append("'"); + } + return sql.toString(); + } + + /** + * whether quotes with default value + * + * @param sqlType sql type of column + * @return whether needs quotes with the type + */ + default boolean needsQuotesWithDefaultValue(String sqlType) { + return false; + } + + /** + * whether is special default value e.g. current_timestamp + * + * @param defaultValue default value of column + * @return whether is special default value e.g current_timestamp + */ + default boolean isSpecialDefaultValue(Object defaultValue) { + return false; + } + + /** + * quotes default value + * + * @param defaultValue default value of column + * @return quoted default value + */ + default String quotesDefaultValue(Object defaultValue) { + return "'" + defaultValue + "'"; + } + + enum AlterType { + ADD, + DROP, + MODIFY, + CHANGE + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 03067f6d5e3e..73ef12bc47b6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -18,6 +18,11 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; @@ -25,9 +30,11 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.SQLUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.MysqlDefaultValueUtils; import org.apache.commons.lang3.StringUtils; +import com.mysql.cj.MysqlType; import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -45,6 +52,10 @@ @Slf4j public class MysqlDialect implements JdbcDialect { + + private static final List NOT_SUPPORTED_DEFAULT_VALUES = + Arrays.asList(MysqlType.BLOB, MysqlType.TEXT, MysqlType.JSON, MysqlType.GEOMETRY); + public String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); public MysqlDialect() {} @@ -214,4 +225,64 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta return SQLUtils.countForSubquery(connection, table.getQuery()); } + + @Override + public void refreshTableSchemaBySchemaChangeEvent( + String sourceDialectName, + AlterTableColumnEvent event, + JdbcConnectionProvider refreshTableSchemaConnectionProvider, + TablePath sinkTablePath) { + try (Connection connection = + refreshTableSchemaConnectionProvider.getOrEstablishConnection(); + Statement stmt = connection.createStatement()) { + String alterTableSql = generateAlterTableSql(sourceDialectName, event, sinkTablePath); + log.info("Apply schema change with sql: {}", alterTableSql); + stmt.execute(alterTableSql); + } catch (Exception e) { + throw new JdbcConnectorException( + JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e); + } + } + + @Override + public String decorateWithComment( + String basicSql, BasicTypeDefine mysqlTypeBasicTypeDefine) { + MysqlType nativeType = mysqlTypeBasicTypeDefine.getNativeType(); + if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) { + return basicSql; + } + return JdbcDialect.super.decorateWithComment(basicSql, mysqlTypeBasicTypeDefine); + } + + @Override + public boolean needsQuotesWithDefaultValue(String sqlType) { + MysqlType mysqlType = MysqlType.getByName(sqlType); + switch (mysqlType) { + case CHAR: + case VARCHAR: + case TEXT: + case TINYTEXT: + case MEDIUMTEXT: + case LONGTEXT: + case ENUM: + case SET: + case BLOB: + case TINYBLOB: + case MEDIUMBLOB: + case LONGBLOB: + case DATE: + case DATETIME: + case TIMESTAMP: + case TIME: + case YEAR: + return true; + default: + return false; + } + } + + @Override + public boolean isSpecialDefaultValue(Object defaultValue) { + return MysqlDefaultValueUtils.isSpecialDefaultValue(defaultValue); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java new file mode 100644 index 000000000000..af651beb7c24 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; + +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.List; + +@Slf4j +public abstract class AbstractJdbcSinkWriter + implements SinkWriter { + + protected JdbcDialect dialect; + protected TablePath sinkTablePath; + protected TableSchema tableSchema; + protected transient boolean isOpen; + protected JdbcConnectionProvider connectionProvider; + protected JdbcSinkConfig jdbcSinkConfig; + protected JdbcOutputFormat> outputFormat; + + @Override + public void applySchemaChange(SchemaChangeEvent event) throws IOException { + if (event instanceof AlterTableColumnsEvent) { + AlterTableColumnsEvent alterTableColumnsEvent = (AlterTableColumnsEvent) event; + String sourceDialectName = alterTableColumnsEvent.getSourceDialectName(); + if (StringUtils.isBlank(sourceDialectName)) { + throw new SeaTunnelException( + "The sourceDialectName in AlterTableColumnEvent can not be empty"); + } + List events = alterTableColumnsEvent.getEvents(); + for (AlterTableColumnEvent alterTableColumnEvent : events) { + processSchemaChangeEvent(alterTableColumnEvent, sourceDialectName); + } + } else { + log.warn("We only support AlterTableColumnsEvent, but actual event is " + event); + } + } + + protected void processSchemaChangeEvent(AlterTableColumnEvent event, String sourceDialectName) + throws IOException { + TableSchema newTableSchema = this.tableSchema.copy(); + List columns = newTableSchema.getColumns(); + switch (event.getEventType()) { + case SCHEMA_CHANGE_ADD_COLUMN: + Column addColumn = ((AlterTableAddColumnEvent) event).getColumn(); + columns.add(addColumn); + break; + case SCHEMA_CHANGE_DROP_COLUMN: + String dropColumn = ((AlterTableDropColumnEvent) event).getColumn(); + columns.removeIf(column -> column.getName().equalsIgnoreCase(dropColumn)); + break; + case SCHEMA_CHANGE_MODIFY_COLUMN: + Column modifyColumn = ((AlterTableModifyColumnEvent) event).getColumn(); + replaceColumnByIndex(columns, modifyColumn.getName(), modifyColumn); + break; + case SCHEMA_CHANGE_CHANGE_COLUMN: + AlterTableChangeColumnEvent alterTableChangeColumnEvent = + (AlterTableChangeColumnEvent) event; + Column changeColumn = alterTableChangeColumnEvent.getColumn(); + String oldColumnName = alterTableChangeColumnEvent.getOldColumn(); + replaceColumnByIndex(columns, oldColumnName, changeColumn); + break; + default: + throw new SeaTunnelException( + "Unsupported schemaChangeEvent for event type: " + event.getEventType()); + } + this.tableSchema = newTableSchema; + reOpenOutputFormat(event, sourceDialectName); + } + + protected void reOpenOutputFormat(AlterTableColumnEvent event, String sourceDialectName) + throws IOException { + this.prepareCommit(); + try { + JdbcConnectionProvider refreshTableSchemaConnectionProvider = + dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig()); + dialect.refreshTableSchemaBySchemaChangeEvent( + sourceDialectName, event, refreshTableSchemaConnectionProvider, sinkTablePath); + } catch (Throwable e) { + throw new JdbcConnectorException( + JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e); + } + this.outputFormat = + new JdbcOutputFormatBuilder( + dialect, connectionProvider, jdbcSinkConfig, tableSchema) + .build(); + this.outputFormat.open(); + } + + protected void replaceColumnByIndex( + List columns, String oldColumnName, Column newColumn) { + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getName().equalsIgnoreCase(oldColumnName)) { + columns.set(i, newColumn); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java index 77e673ae6b28..31c89dc21bfd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java @@ -20,16 +20,15 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl; @@ -54,9 +53,8 @@ import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkState; -public class JdbcExactlyOnceSinkWriter - implements SinkWriter, - SupportMultiTableSinkWriter { +public class JdbcExactlyOnceSinkWriter extends AbstractJdbcSinkWriter + implements SupportMultiTableSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkWriter.class); private final SinkWriter.Context sinkcontext; @@ -71,15 +69,11 @@ public class JdbcExactlyOnceSinkWriter private final XidGenerator xidGenerator; - private final JdbcOutputFormat> - outputFormat; - - private transient boolean isOpen; - private transient Xid currentXid; private transient Xid prepareXid; public JdbcExactlyOnceSinkWriter( + TablePath sinkTablePath, SinkWriter.Context sinkcontext, JobContext context, JdbcDialect dialect, @@ -90,14 +84,18 @@ public JdbcExactlyOnceSinkWriter( jdbcSinkConfig.getJdbcConnectionConfig().getMaxRetries() == 0, "JDBC XA sink requires maxRetries equal to 0, otherwise it could " + "cause duplicates."); - + this.sinkTablePath = sinkTablePath; + this.dialect = dialect; + this.tableSchema = tableSchema; + this.jdbcSinkConfig = jdbcSinkConfig; this.context = context; this.sinkcontext = sinkcontext; this.recoverStates = states; this.xidGenerator = XidGenerator.semanticXidGenerator(); checkState(jdbcSinkConfig.isExactlyOnce(), "is_exactly_once config error"); - this.xaFacade = + this.connectionProvider = XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig()); + this.xaFacade = (XaFacade) this.connectionProvider; this.outputFormat = new JdbcOutputFormatBuilder(dialect, xaFacade, jdbcSinkConfig, tableSchema).build(); this.xaGroupOps = new XaGroupOpsImpl(xaFacade); @@ -180,10 +178,12 @@ public void close() throws IOException { CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "unable to close JDBC exactly one writer", e); + } finally { + outputFormat.close(); + xidGenerator.close(); + currentXid = null; + prepareXid = null; } - xidGenerator.close(); - currentXid = null; - prepareXid = null; } private void beginTx() throws IOException { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 69b01f10d264..946956a428a6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -102,10 +102,12 @@ public String getPluginName() { @Override public SinkWriter createWriter( SinkWriter.Context context) { + TablePath sinkTablePath = catalogTable.getTablePath(); SinkWriter sinkWriter; if (jdbcSinkConfig.isExactlyOnce()) { sinkWriter = new JdbcExactlyOnceSinkWriter( + sinkTablePath, context, jobContext, dialect, @@ -117,10 +119,12 @@ public SinkWriter createWriter( String keyName = tableSchema.getPrimaryKey().getColumnNames().get(0); int index = tableSchema.toPhysicalRowDataType().indexOf(keyName); if (index > -1) { - return new JdbcSinkWriter(dialect, jdbcSinkConfig, tableSchema, index); + return new JdbcSinkWriter( + sinkTablePath, dialect, jdbcSinkConfig, tableSchema, index); } } - sinkWriter = new JdbcSinkWriter(dialect, jdbcSinkConfig, tableSchema, null); + sinkWriter = + new JdbcSinkWriter(sinkTablePath, dialect, jdbcSinkConfig, tableSchema, null); } return sinkWriter; } @@ -128,9 +132,16 @@ public SinkWriter createWriter( @Override public SinkWriter restoreWriter( SinkWriter.Context context, List states) throws IOException { + TablePath sinkTablePath = catalogTable.getTablePath(); if (jdbcSinkConfig.isExactlyOnce()) { return new JdbcExactlyOnceSinkWriter( - context, jobContext, dialect, jdbcSinkConfig, tableSchema, states); + sinkTablePath, + context, + jobContext, + dialect, + jdbcSinkConfig, + tableSchema, + states); } return SeaTunnelSink.super.restoreWriter(context, states); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java index d8c7b661c597..4331b53d0a02 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java @@ -18,20 +18,17 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; import org.apache.seatunnel.api.sink.MultiTableResourceManager; -import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionPoolProviderProxy; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; @@ -45,25 +42,20 @@ import java.util.Optional; @Slf4j -public class JdbcSinkWriter - implements SinkWriter, - SupportMultiTableSinkWriter { - private JdbcOutputFormat> outputFormat; - private final JdbcDialect dialect; - private final TableSchema tableSchema; - private JdbcConnectionProvider connectionProvider; - private transient boolean isOpen; +public class JdbcSinkWriter extends AbstractJdbcSinkWriter + implements SupportMultiTableSinkWriter { private final Integer primaryKeyIndex; - private final JdbcSinkConfig jdbcSinkConfig; public JdbcSinkWriter( + TablePath sinkTablePath, JdbcDialect dialect, JdbcSinkConfig jdbcSinkConfig, TableSchema tableSchema, Integer primaryKeyIndex) { - this.jdbcSinkConfig = jdbcSinkConfig; + this.sinkTablePath = sinkTablePath; this.dialect = dialect; this.tableSchema = tableSchema; + this.jdbcSinkConfig = jdbcSinkConfig; this.primaryKeyIndex = primaryKeyIndex; this.connectionProvider = dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig()); @@ -168,7 +160,8 @@ public void close() throws IOException { CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "unable to close JDBC sink write", e); + } finally { + outputFormat.close(); } - outputFormat.close(); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/MysqlDefaultValueUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/MysqlDefaultValueUtils.java new file mode 100644 index 000000000000..2b16fd6438be --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/MysqlDefaultValueUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.utils; + +import java.util.Objects; + +public class MysqlDefaultValueUtils { + public static boolean isSpecialDefaultValue(Object defaultValue) { + if (Objects.isNull(defaultValue)) { + return false; + } + String defaultValueStr = defaultValue.toString(); + return defaultValueStr.matches( + "(?i)^(CURRENT_TIMESTAMP|CURRENT_TIME|CURRENT_DATE)\\(?\\d*\\)?$") + || defaultValueStr.equalsIgnoreCase("TRUE") + || defaultValueStr.equalsIgnoreCase("FALSE"); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java new file mode 100644 index 000000000000..531f6f03f96c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + +@Slf4j +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Currently SPARK do not support cdc. In addition, currently only the zeta engine supports schema evolution for pr https://github.com/apache/seatunnel/pull/5125.") +public class MysqlCDCWithSchemaChangeIT extends TestSuiteBase implements TestResource { + private static final String MYSQL_DATABASE = "shop"; + private static final String SOURCE_TABLE = "products"; + private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table_with_schema_change"; + private static final String SINK_TABLE2 = + "mysql_cdc_e2e_sink_table_with_schema_change_exactly_once"; + private static final String MYSQL_HOST = "mysql_cdc_e2e"; + private static final String MYSQL_USER_NAME = "mysqluser"; + private static final String MYSQL_USER_PASSWORD = "mysqlpw"; + + private static final String QUERY = "select * from %s.%s"; + private static final String DESC = "desc %s.%s"; + private static final String PROJECTION_QUERY = + "select id,name,description,weight,add_column1,add_column2,add_column3 from %s.%s;"; + + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private final UniqueDatabase shopDatabase = + new UniqueDatabase( + MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE); + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + return new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName(MYSQL_DATABASE) + .withUsername(MYSQL_USER_NAME) + .withPassword(MYSQL_USER_PASSWORD) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image"))); + } + + private String driverUrl() { + return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + } + + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib && cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget " + + driverUrl()); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + @Order(1) + @TestTemplate + public void testMysqlCdcWithSchemaEvolutionCase(TestContainer container) { + + CompletableFuture.runAsync( + () -> { + try { + container.executeJob("/mysqlcdc_to_mysql_with_schema_change.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + + assertSchemaEvolution(MYSQL_DATABASE, SOURCE_TABLE, SINK_TABLE); + } + + @Order(2) + @TestTemplate + public void testMysqlCdcWithSchemaEvolutionCaseExactlyOnce(TestContainer container) { + + shopDatabase.setTemplateName("shop").createAndInitialize(); + CompletableFuture.runAsync( + () -> { + try { + container.executeJob( + "/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + + assertSchemaEvolution(MYSQL_DATABASE, SOURCE_TABLE, SINK_TABLE2); + } + + private void assertSchemaEvolution(String database, String sourceTable, String sinkTable) { + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query(String.format(QUERY, database, sourceTable)), + query(String.format(QUERY, database, sinkTable)))); + + // case1 add columns with cdc data at same time + shopDatabase.setTemplateName("add_columns").createAndInitialize(); + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query(String.format(DESC, database, sourceTable)), + query(String.format(DESC, database, sinkTable)))); + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + String.format(QUERY, database, sourceTable) + + " where id >= 128"), + query( + String.format(QUERY, database, sinkTable) + + " where id >= 128")); + + Assertions.assertIterableEquals( + query(String.format(PROJECTION_QUERY, database, sourceTable)), + query(String.format(PROJECTION_QUERY, database, sinkTable))); + + // The default value of add_column4 is current_timestamp(),so the + // history data of sink table with this column may be different from the + // source table because delay of apply schema change. + String query = + String.format( + "SELECT t1.id AS table1_id, t1.add_column4 AS table1_timestamp, " + + "t2.id AS table2_id, t2.add_column4 AS table2_timestamp, " + + "ABS(TIMESTAMPDIFF(SECOND, t1.add_column4, t2.add_column4)) AS time_diff " + + "FROM %s.%s t1 " + + "INNER JOIN %s.%s t2 ON t1.id = t2.id", + database, sourceTable, database, sinkTable); + try (Connection jdbcConnection = getJdbcConnection(); + Statement statement = jdbcConnection.createStatement(); + ResultSet resultSet = statement.executeQuery(query); ) { + while (resultSet.next()) { + int timeDiff = resultSet.getInt("time_diff"); + Assertions.assertTrue( + timeDiff <= 3, + "Time difference exceeds 3 seconds: " + + timeDiff + + " seconds"); + } + } + }); + + // case2 drop columns with cdc data at same time + assertCaseByDdlName("drop_columns", database, sourceTable, sinkTable); + + // case3 change column name with cdc data at same time + assertCaseByDdlName("change_columns", database, sourceTable, sinkTable); + + // case4 modify column data type with cdc data at same time + assertCaseByDdlName("modify_columns", database, sourceTable, sinkTable); + } + + private void assertCaseByDdlName( + String drop_columns, String database, String sourceTable, String sinkTable) { + shopDatabase.setTemplateName(drop_columns).createAndInitialize(); + assertTableStructureAndData(database, sourceTable, sinkTable); + } + + private void assertTableStructureAndData( + String database, String sourceTable, String sinkTable) { + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query(String.format(DESC, database, sourceTable)), + query(String.format(DESC, database, sinkTable)))); + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query(String.format(QUERY, database, sourceTable)), + query(String.format(QUERY, database, sinkTable)))); + } + + private Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + } + + @BeforeAll + @Override + public void startUp() { + log.info("The second stage: Starting Mysql containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + log.info("Mysql Containers are started"); + shopDatabase.createAndInitialize(); + log.info("Mysql ddl execution is complete"); + } + + @AfterAll + @Override + public void tearDown() { + if (MYSQL_CONTAINER != null) { + MYSQL_CONTAINER.close(); + } + } + + private List> query(String sql) { + try (Connection connection = getJdbcConnection()) { + ResultSet resultSet = connection.createStatement().executeQuery(sql); + List> result = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + ArrayList objects = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + objects.add(resultSet.getObject(i)); + } + log.debug(String.format("Print MySQL-CDC query, sql: %s, data: %s", sql, objects)); + result.add(objects); + } + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql new file mode 100644 index 000000000000..7ec4e23e6210 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql @@ -0,0 +1,69 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; +INSERT INTO products +VALUES (110,"scooter","Small 2-wheel scooter",3.14), + (111,"car battery","12V car battery",8.1), + (112,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (113,"hammer","12oz carpenter's hammer",0.75), + (114,"hammer","14oz carpenter's hammer",0.875), + (115,"hammer","16oz carpenter's hammer",1.0), + (116,"rocks","box of assorted rocks",5.3), + (117,"jacket","water resistent black wind breaker",0.1), + (118,"spare tire","24 inch spare tire",22.2); +update products set name = 'dailai' where id = 101; +delete from products where id = 102; + +alter table products ADD COLUMN add_column1 varchar(64) not null default 'yy',ADD COLUMN add_column2 int not null default 1; + +update products set name = 'dailai' where id = 110; +insert into products +values (119,"scooter","Small 2-wheel scooter",3.14,'xx',1), + (120,"car battery","12V car battery",8.1,'xx',2), + (121,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3), + (122,"hammer","12oz carpenter's hammer",0.75,'xx',4), + (123,"hammer","14oz carpenter's hammer",0.875,'xx',5), + (124,"hammer","16oz carpenter's hammer",1.0,'xx',6), + (125,"rocks","box of assorted rocks",5.3,'xx',7), + (126,"jacket","water resistent black wind breaker",0.1,'xx',8), + (127,"spare tire","24 inch spare tire",22.2,'xx',9); +delete from products where id = 118; + +alter table products ADD COLUMN add_column3 float not null default 1.1; +alter table products ADD COLUMN add_column4 timestamp not null default current_timestamp(); + +delete from products where id = 113; +insert into products +values (128,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:09'), + (129,"car battery","12V car battery",8.1,'xx',2,1.2,'2023-02-02 09:09:09'), + (130,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3,'2023-02-02 09:09:09'), + (131,"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4,'2023-02-02 09:09:09'), + (132,"hammer","14oz carpenter's hammer",0.875,'xx',5,1.5,'2023-02-02 09:09:09'), + (133,"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6,'2023-02-02 09:09:09'), + (134,"rocks","box of assorted rocks",5.3,'xx',7,1.7,'2023-02-02 09:09:09'), + (135,"jacket","water resistent black wind breaker",0.1,'xx',8,1.8,'2023-02-02 09:09:09'), + (136,"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 09:09:09'); +update products set name = 'dailai' where id = 135; + + +-- add column for irrelevant table +ALTER TABLE products_on_hand ADD COLUMN add_column5 varchar(64) not null default 'yy'; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/change_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/change_columns.sql new file mode 100644 index 000000000000..a17f9a0a9368 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/change_columns.sql @@ -0,0 +1,36 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +alter table products change add_column2 add_column int default 1 not null; +delete from products where id < 155; +insert into products +values (155,"scooter","Small 2-wheel scooter",3.14,1), + (156,"car battery","12V car battery",8.1,2), + (157,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,3), + (158,"hammer","12oz carpenter's hammer",0.75,4), + (159,"hammer","14oz carpenter's hammer",0.875,5), + (160,"hammer","16oz carpenter's hammer",1.0,6), + (161,"rocks","box of assorted rocks",5.3,7), + (162,"jacket","water resistent black wind breaker",0.1,8), + (163,"spare tire","24 inch spare tire",22.2,9); + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql new file mode 100644 index 000000000000..d6502bbfdf49 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql @@ -0,0 +1,50 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +alter table products drop column add_column4; +insert into products +values (137,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1), + (138,"car battery","12V car battery",8.1,'xx',2,1.2), + (139,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,'xx',3,1.3), + (140,"hammer","12oz carpenter's hammer",0.75,'xx',4,1.4), + (141,"hammer","14oz carpenter's hammer",0.875,'xx',5,1.5), + (142,"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6), + (143,"rocks","box of assorted rocks",5.3,'xx',7,1.7), + (144,"jacket","water resistent black wind breaker",0.1,'xx',8,1.8), + (145,"spare tire","24 inch spare tire",22.2,'xx',9,1.9); +update products set name = 'dailai' where id in (140,141,142); +delete from products where id < 137; + + +alter table products drop column add_column1,drop column add_column3; +insert into products +values (146,"scooter","Small 2-wheel scooter",3.14,1), + (147,"car battery","12V car battery",8.1,2), + (148,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,3), + (149,"hammer","12oz carpenter's hammer",0.75,4), + (150,"hammer","14oz carpenter's hammer",0.875,5), + (151,"hammer","16oz carpenter's hammer",1.0,6), + (152,"rocks","box of assorted rocks",5.3,7), + (153,"jacket","water resistent black wind breaker",0.1,8), + (154,"spare tire","24 inch spare tire",22.2,9); +update products set name = 'dailai' where id > 143; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql index 9e9fff3f8f4c..a0a225981f33 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql @@ -18,7 +18,6 @@ -- ---------------------------------------------------------------------------------------------------------------- -- DATABASE: inventory -- ---------------------------------------------------------------------------------------------------------------- - -- Create and populate our products using a single insert with many rows CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, @@ -92,4 +91,4 @@ VALUES (default, '2016-01-16', 1001, 1, 102), CREATE TABLE category ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, category_name VARCHAR(255) -); \ No newline at end of file +); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/modify_columns.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/modify_columns.sql new file mode 100644 index 000000000000..ab64c47567bd --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/modify_columns.sql @@ -0,0 +1,36 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +alter table products modify name longtext null; +delete from products where id < 155; +insert into products +values (164,"scooter","Small 2-wheel scooter",3.14,1), + (165,"car battery","12V car battery",8.1,2), + (166,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8,3), + (167,"hammer","12oz carpenter's hammer",0.75,4), + (168,"hammer","14oz carpenter's hammer",0.875,5), + (169,"hammer","16oz carpenter's hammer",1.0,6), + (170,"rocks","box of assorted rocks",5.3,7), + (171,"jacket","water resistent black wind breaker",0.1,8), + (172,"spare tire","24 inch spare tire",22.2,9); + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql new file mode 100644 index 000000000000..f97d5852f3a4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/shop.sql @@ -0,0 +1,78 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: shop +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `shop`; +use shop; + +drop table if exists products; +-- Create and populate our products using a single insert with many rows +CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', + description VARCHAR(512), + weight FLOAT +); + +drop table if exists mysql_cdc_e2e_sink_table_with_schema_change; +CREATE TABLE if not exists mysql_cdc_e2e_sink_table_with_schema_change ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', + description VARCHAR(512), + weight FLOAT +); + +drop table if exists mysql_cdc_e2e_sink_table_with_schema_change_exactly_once; +CREATE TABLE if not exists mysql_cdc_e2e_sink_table_with_schema_change_exactly_once ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel', + description VARCHAR(512), + weight FLOAT +); + +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products +VALUES (101,"scooter","Small 2-wheel scooter",3.14), + (102,"car battery","12V car battery",8.1), + (103,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (104,"hammer","12oz carpenter's hammer",0.75), + (105,"hammer","14oz carpenter's hammer",0.875), + (106,"hammer","16oz carpenter's hammer",1.0), + (107,"rocks","box of assorted rocks",5.3), + (108,"jacket","water resistent black wind breaker",0.1), + (109,"spare tire","24 inch spare tire",22.2); + + +drop table if exists products_on_hand; +CREATE TABLE products_on_hand ( + product_id INTEGER NOT NULL PRIMARY KEY, + quantity INTEGER NOT NULL +); + + +INSERT INTO products_on_hand VALUES (101,3); +INSERT INTO products_on_hand VALUES (102,8); +INSERT INTO products_on_hand VALUES (103,18); +INSERT INTO products_on_hand VALUES (104,4); +INSERT INTO products_on_hand VALUES (105,5); +INSERT INTO products_on_hand VALUES (106,0); +INSERT INTO products_on_hand VALUES (107,44); +INSERT INTO products_on_hand VALUES (108,2); +INSERT INTO products_on_hand VALUES (109,5); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf new file mode 100644 index 000000000000..0594a42d9565 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 5 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + MySQL-CDC { + server-id = 5652-5657 + username = "st_user_source" + password = "mysqlpw" + table-names = ["shop.products"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + debezium = { + include.schema.changes = true + } + } +} + +sink { + jdbc { + url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user_sink" + password = "mysqlpw" + generate_sink_sql = true + database = shop + table = mysql_cdc_e2e_sink_table_with_schema_change + primary_keys = ["id"] + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf new file mode 100644 index 000000000000..8aa06c85bd92 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 5 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + MySQL-CDC { + server-id = 5652-5657 + username = "st_user_source" + password = "mysqlpw" + table-names = ["shop.products"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + debezium = { + include.schema.changes = true + } + } +} + +sink { + jdbc { + url = "jdbc:mysql://mysql_cdc_e2e:3306/shop" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user_sink" + password = "mysqlpw" + generate_sink_sql = true + database = shop + table = mysql_cdc_e2e_sink_table_with_schema_change_exactly_once + primary_keys = ["id"] + is_exactly_once = true + xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java index 7ec617516169..cb0958c705c3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java @@ -21,9 +21,13 @@ import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase.OceanBaseMySqlCatalog; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Assertions; +import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; @@ -54,6 +58,24 @@ public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase { private static final String OCEANBASE_DATABASE = "seatunnel"; private static final String OCEANBASE_CATALOG_DATABASE = "seatunnel_catalog"; + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && wget " + + driverUrl() + + " && wget " + + mysqlDriverUrl()); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + String mysqlDriverUrl() { + return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + } + @Override List configFile() { return Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java index ac51a6df41bf..6b200078f47c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java @@ -77,7 +77,8 @@ public class MilvusIT extends TestSuiteBase implements TestResource { @Override public void startUp() throws Exception { this.container = new MilvusContainer(MILVUS_IMAGE); - log.info("Milvus host is {}", container.getHost()); + this.container.setNetwork(NETWORK); + log.info("Milvus host is {}", container.getEndpoint()); Startables.deepStart(Stream.of(this.container)).join(); log.info("Milvus container started"); Awaitility.given().ignoreExceptions().await().atMost(720L, TimeUnit.SECONDS);