From 4c86e0e7fd07ee42da759d3af5b178d9bc7417dc Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Tue, 6 Aug 2024 14:03:06 +0800 Subject: [PATCH] 1 --- .../serialize/SeaTunnelRowSerializer.java | 13 +- .../doris/sink/writer/DorisStreamLoad.java | 32 ++-- .../connector-doris-e2e/pom.xml | 14 ++ .../e2e/connector/doris/DorisCDCSinkIT.java | 141 ++++++++++++++++-- .../src/test/resources/ddl/mysql_cdc.sql | 38 +++++ .../test/resources/docker/server-gtids/my.cnf | 65 ++++++++ .../src/test/resources/docker/setup.sql | 28 ++++ .../write-cdc-changelog-to-doris.conf | 17 +-- 8 files changed, 295 insertions(+), 53 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java index 0c5b9c0c420..cc12d88ea39 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.format.text.TextSerializationSchema; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -93,13 +94,15 @@ public void open() throws IOException {} @Override public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException { - List fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames()); - List> fieldTypes = Arrays.asList(seaTunnelRowType.getFieldTypes()); + List fieldNames = new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldNames())); + List> fieldTypes = + new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldTypes())); if (enableDelete) { - SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy(); - seaTunnelRowEnableDelete.setField( - seaTunnelRow.getFields().length, parseDeleteSign(seaTunnelRow.getRowKind())); + + List newFields = new ArrayList<>(Arrays.asList(seaTunnelRow.getFields())); + newFields.add(parseDeleteSign(seaTunnelRow.getRowKind())); + seaTunnelRow = new SeaTunnelRow(newFields.toArray()); fieldNames.add(LoadConstants.DORIS_DELETE_SIGN); fieldTypes.add(STRING_TYPE); } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java index eadcf94cd56..40b75aedc61 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java @@ -17,7 +17,10 @@ package org.apache.seatunnel.connectors.doris.sink.writer; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.doris.config.DorisConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; @@ -31,9 +34,9 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -64,23 +67,23 @@ public class DorisStreamLoad implements Serializable { private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc"; private static final String JOB_EXIST_FINISHED = "FINISHED"; private final String loadUrlStr; - private final String hostPort; + @Getter private final String hostPort; private final String abortUrlStr; private final String user; private final String passwd; - private final String db; + @Getter private final String db; private final String table; private final boolean enable2PC; private final boolean enableDelete; private final Properties streamLoadProp; private final RecordStream recordStream; - private Future pendingLoadFuture; + @Getter private Future pendingLoadFuture; private final CloseableHttpClient httpClient; private final ExecutorService executorService; private volatile boolean loadBatchFirstRecord; private volatile boolean loading = false; private String label; - private long recordCount = 0; + @Getter private long recordCount = 0; public DorisStreamLoad( String hostPort, @@ -115,18 +118,6 @@ public DorisStreamLoad( loadBatchFirstRecord = true; } - public String getDb() { - return db; - } - - public String getHostPort() { - return hostPort; - } - - public Future getPendingLoadFuture() { - return pendingLoadFuture; - } - public void abortPreCommit(String labelSuffix, long chkID) throws Exception { long startChkID = chkID; log.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID); @@ -196,10 +187,6 @@ public void writeRecord(byte[] record) throws IOException { recordCount++; } - public long getRecordCount() { - return recordCount; - } - public String getLoadFailedMsg() { if (!loading) { return null; @@ -300,10 +287,9 @@ public void abortTransaction(long txnID) throws Exception { "Fail to abort transaction " + txnID + " with url " + abortUrlStr); } - ObjectMapper mapper = new ObjectMapper(); String loadResult = EntityUtils.toString(response.getEntity()); Map res = - mapper.readValue(loadResult, new TypeReference>() {}); + JsonUtils.parseObject(loadResult, new TypeReference>() {}); if (!LoadStatus.SUCCESS.equals(res.get("status"))) { if (ResponseUtil.isCommitted(res.get("msg"))) { throw new DorisConnectorException( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml index af85d92acef..7a3008adb3a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml @@ -49,5 +49,19 @@ ${mysql.version} test + + org.apache.seatunnel + connector-cdc-mysql + ${project.version} + test-jar + test + + + + org.testcontainers + mysql + ${testcontainer.version} + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java index 9afa91d4e81..23a30984c5a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java @@ -17,16 +17,27 @@ package org.apache.seatunnel.e2e.connector.doris; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; import lombok.extern.slf4j.Slf4j; +import java.sql.Connection; +import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -34,11 +45,17 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.awaitility.Awaitility.await; + @Slf4j -@Disabled("we need resolve the issue of network between containers") +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK}, + disabledReason = "Currently SPARK do not support cdc") public class DorisCDCSinkIT extends AbstractDorisIT { private static final String DATABASE = "test"; @@ -60,8 +77,54 @@ public class DorisCDCSinkIT extends AbstractDorisIT { + "\"replication_allocation\" = \"tag.location.default: 1\"" + ")"; + // mysql + private static final String MYSQL_HOST = "mysql_cdc_e2e"; + private static final String MYSQL_USER_NAME = "mysqluser"; + private static final String MYSQL_USER_PASSWORD = "mysqlpw"; + private static final String MYSQL_DATABASE = "mysql_cdc"; + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + private static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table"; + + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Doris-CDC/lib && cd /tmp/seatunnel/plugins/Doris-CDC/lib && wget " + + driverUrl()); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase( + MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE); + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + return new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName(MYSQL_DATABASE) + .withUsername(MYSQL_USER_NAME) + .withPassword(MYSQL_USER_PASSWORD) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image"))); + } + + private String driverUrl() { + return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + } + @BeforeAll public void init() { + log.info("The second stage: Starting Mysql containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + log.info("Mysql Containers are started"); + inventoryDatabase.createAndInitialize(); + log.info("Mysql ddl execution is complete"); initializeJdbcTable(); } @@ -72,22 +135,52 @@ public void testDorisCDCSink(TestContainer container) throws Exception { Assertions.assertEquals(0, execResult.getExitCode()); String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE); - Set> actual = new HashSet<>(); - try (Statement sinkStatement = jdbcConnection.createStatement()) { - ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); - while (sinkResultSet.next()) { - List row = - Arrays.asList( - sinkResultSet.getLong("uuid"), - sinkResultSet.getString("name"), - sinkResultSet.getInt("score")); - actual.add(row); - } - } + Set> expected = - Stream.>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100)) + Stream.>of( + Arrays.asList(1L, "Alice", 95), Arrays.asList(2L, "Bob", 88)) .collect(Collectors.toSet()); - Assertions.assertIterableEquals(expected, actual); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Set> actual = new HashSet<>(); + try (Statement sinkStatement = jdbcConnection.createStatement()) { + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + while (sinkResultSet.next()) { + List row = + Arrays.asList( + sinkResultSet.getLong("uuid"), + sinkResultSet.getString("name"), + sinkResultSet.getInt("score")); + actual.add(row); + } + } + Assertions.assertIterableEquals(expected, actual); + }); + + executeSql("DELETE FROM " + MYSQL_DATABASE + "." + SOURCE_TABLE + " WHERE uuid = 1"); + + Set> expectedAfterDelete = + Stream.>of(Arrays.asList(2L, "Bob", 88)).collect(Collectors.toSet()); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Set> actual = new HashSet<>(); + try (Statement sinkStatement = jdbcConnection.createStatement()) { + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + while (sinkResultSet.next()) { + List row = + Arrays.asList( + sinkResultSet.getLong("uuid"), + sinkResultSet.getString("name"), + sinkResultSet.getInt("score")); + actual.add(row); + } + } + Assertions.assertIterableEquals(expectedAfterDelete, actual); + }); } private void initializeJdbcTable() { @@ -100,4 +193,20 @@ private void initializeJdbcTable() { throw new RuntimeException("Initializing table failed!", e); } } + + private Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + } + + // Execute SQL + private void executeSql(String sql) { + try (Connection connection = getJdbcConnection()) { + connection.createStatement().execute(sql); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql new file mode 100644 index 00000000000..638da2981b3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql @@ -0,0 +1,38 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: inventory +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE IF NOT EXISTS `mysql_cdc`; + +use mysql_cdc; +-- Create a mysql data source table +CREATE TABLE IF NOT EXISTS `mysql_cdc`.`mysql_cdc_e2e_source_table` ( + `uuid` BIGINT, + `name` VARCHAR(128), + `score` INT, + PRIMARY KEY (`uuid`) +) ENGINE=InnoDB; + + + +truncate table `mysql_cdc`.`mysql_cdc_e2e_source_table`; + +INSERT INTO `mysql_cdc`.`mysql_cdc_e2e_source_table` (uuid, name, score) VALUES +(1, 'Alice', 95), +(2, 'Bob', 88); \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf new file mode 100644 index 00000000000..a390897885d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql new file mode 100644 index 00000000000..429061558ba --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql @@ -0,0 +1,28 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- In production you would almost certainly limit the replication user must be on the follower (slave) machine, +-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'. +-- However, in this database we'll grant 2 users different privileges: +-- +-- 1) 'mysqluser' - all privileges +-- 2) 'st_user_source' - all privileges required by the snapshot reader AND binlog reader (used for testing) +-- +GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%'; + +CREATE USER 'st_user_source' IDENTIFIED BY 'mysqlpw'; +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, DROP, LOCK TABLES ON *.* TO 'st_user_source'@'%'; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf index d4d4e69f9d6..70f4331f291 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf @@ -17,23 +17,23 @@ env { parallelism = 1 - job.mode = "BATCH" + job.mode = "STREAMING" } source { MySQL-CDC { parallelism = 1 - server-id = 5656 - username = "root" - password = "Bigdata2023@" - table-names = ["test.e2e_table_sink"] - base-url = "jdbc:mysql://119.3.230.145:56725/test" + server-id = 5652 + username = "st_user_source" + password = "mysqlpw" + table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" } } sink { Doris { - fenodes = "10.16.10.14:8234" + fenodes = "doris_e2e:8030" username = root password = "" database = "test" @@ -43,8 +43,7 @@ sink { sink.enable-delete = "true" doris.config { format = "csv" - "column_separator" = "\\x01" - "line_delimiter" = "\\x01" + "column_separator" = "," } } } \ No newline at end of file