Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
gdliu3 committed Aug 6, 2024
1 parent 34a6b8e commit 4c86e0e
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.seatunnel.format.text.TextSerializationSchema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -93,13 +94,15 @@ public void open() throws IOException {}
@Override
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {

List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
List<SeaTunnelDataType<?>> fieldTypes = Arrays.asList(seaTunnelRowType.getFieldTypes());
List<Object> fieldNames = new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
List<SeaTunnelDataType<?>> fieldTypes =
new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldTypes()));

if (enableDelete) {
SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
seaTunnelRowEnableDelete.setField(
seaTunnelRow.getFields().length, parseDeleteSign(seaTunnelRow.getRowKind()));

List<Object> newFields = new ArrayList<>(Arrays.asList(seaTunnelRow.getFields()));
newFields.add(parseDeleteSign(seaTunnelRow.getRowKind()));
seaTunnelRow = new SeaTunnelRow(newFields.toArray());
fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
fieldTypes.add(STRING_TYPE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.seatunnel.connectors.doris.sink.writer;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
Expand All @@ -31,9 +34,9 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
Expand Down Expand Up @@ -64,23 +67,23 @@ public class DorisStreamLoad implements Serializable {
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
private static final String JOB_EXIST_FINISHED = "FINISHED";
private final String loadUrlStr;
private final String hostPort;
@Getter private final String hostPort;
private final String abortUrlStr;
private final String user;
private final String passwd;
private final String db;
@Getter private final String db;
private final String table;
private final boolean enable2PC;
private final boolean enableDelete;
private final Properties streamLoadProp;
private final RecordStream recordStream;
private Future<CloseableHttpResponse> pendingLoadFuture;
@Getter private Future<CloseableHttpResponse> pendingLoadFuture;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private volatile boolean loadBatchFirstRecord;
private volatile boolean loading = false;
private String label;
private long recordCount = 0;
@Getter private long recordCount = 0;

public DorisStreamLoad(
String hostPort,
Expand Down Expand Up @@ -115,18 +118,6 @@ public DorisStreamLoad(
loadBatchFirstRecord = true;
}

public String getDb() {
return db;
}

public String getHostPort() {
return hostPort;
}

public Future<CloseableHttpResponse> getPendingLoadFuture() {
return pendingLoadFuture;
}

public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
long startChkID = chkID;
log.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
Expand Down Expand Up @@ -196,10 +187,6 @@ public void writeRecord(byte[] record) throws IOException {
recordCount++;
}

public long getRecordCount() {
return recordCount;
}

public String getLoadFailedMsg() {
if (!loading) {
return null;
Expand Down Expand Up @@ -300,10 +287,9 @@ public void abortTransaction(long txnID) throws Exception {
"Fail to abort transaction " + txnID + " with url " + abortUrlStr);
}

ObjectMapper mapper = new ObjectMapper();
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res =
mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {});
JsonUtils.parseObject(loadResult, new TypeReference<HashMap<String, String>>() {});
if (!LoadStatus.SUCCESS.equals(res.get("status"))) {
if (ResponseUtil.isCommitted(res.get("msg"))) {
throw new DorisConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,19 @@
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-mysql</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- test dependencies on TestContainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,45 @@

package org.apache.seatunnel.e2e.connector.doris;

import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;

@Slf4j
@Disabled("we need resolve the issue of network between containers")
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not support cdc")
public class DorisCDCSinkIT extends AbstractDorisIT {

private static final String DATABASE = "test";
Expand All @@ -60,8 +77,54 @@ public class DorisCDCSinkIT extends AbstractDorisIT {
+ "\"replication_allocation\" = \"tag.location.default: 1\""
+ ")";

// mysql
private static final String MYSQL_HOST = "mysql_cdc_e2e";
private static final String MYSQL_USER_NAME = "mysqluser";
private static final String MYSQL_USER_PASSWORD = "mysqlpw";
private static final String MYSQL_DATABASE = "mysql_cdc";
private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
private static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table";

@TestContainerExtension
protected final ContainerExtendedFactory extendedFactory =
container -> {
Container.ExecResult extraCommands =
container.execInContainer(
"bash",
"-c",
"mkdir -p /tmp/seatunnel/plugins/Doris-CDC/lib && cd /tmp/seatunnel/plugins/Doris-CDC/lib && wget "
+ driverUrl());
Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
};

private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(
MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE);

private static MySqlContainer createMySqlContainer(MySqlVersion version) {
return new MySqlContainer(version)
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withSetupSQL("docker/setup.sql")
.withNetwork(NETWORK)
.withNetworkAliases(MYSQL_HOST)
.withDatabaseName(MYSQL_DATABASE)
.withUsername(MYSQL_USER_NAME)
.withPassword(MYSQL_USER_PASSWORD)
.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
}

private String driverUrl() {
return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
}

@BeforeAll
public void init() {
log.info("The second stage: Starting Mysql containers...");
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
log.info("Mysql Containers are started");
inventoryDatabase.createAndInitialize();
log.info("Mysql ddl execution is complete");
initializeJdbcTable();
}

Expand All @@ -72,22 +135,52 @@ public void testDorisCDCSink(TestContainer container) throws Exception {
Assertions.assertEquals(0, execResult.getExitCode());

String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE);
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = jdbcConnection.createStatement()) {
ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
while (sinkResultSet.next()) {
List<Object> row =
Arrays.asList(
sinkResultSet.getLong("uuid"),
sinkResultSet.getString("name"),
sinkResultSet.getInt("score"));
actual.add(row);
}
}

Set<List<Object>> expected =
Stream.<List<Object>>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100))
Stream.<List<Object>>of(
Arrays.asList(1L, "Alice", 95), Arrays.asList(2L, "Bob", 88))
.collect(Collectors.toSet());
Assertions.assertIterableEquals(expected, actual);

await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = jdbcConnection.createStatement()) {
ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
while (sinkResultSet.next()) {
List<Object> row =
Arrays.asList(
sinkResultSet.getLong("uuid"),
sinkResultSet.getString("name"),
sinkResultSet.getInt("score"));
actual.add(row);
}
}
Assertions.assertIterableEquals(expected, actual);
});

executeSql("DELETE FROM " + MYSQL_DATABASE + "." + SOURCE_TABLE + " WHERE uuid = 1");

Set<List<Object>> expectedAfterDelete =
Stream.<List<Object>>of(Arrays.asList(2L, "Bob", 88)).collect(Collectors.toSet());

await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = jdbcConnection.createStatement()) {
ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
while (sinkResultSet.next()) {
List<Object> row =
Arrays.asList(
sinkResultSet.getLong("uuid"),
sinkResultSet.getString("name"),
sinkResultSet.getInt("score"));
actual.add(row);
}
}
Assertions.assertIterableEquals(expectedAfterDelete, actual);
});
}

private void initializeJdbcTable() {
Expand All @@ -100,4 +193,20 @@ private void initializeJdbcTable() {
throw new RuntimeException("Initializing table failed!", e);
}
}

private Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
MYSQL_CONTAINER.getPassword());
}

// Execute SQL
private void executeSql(String sql) {
try (Connection connection = getJdbcConnection()) {
connection.createStatement().execute(sql);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE IF NOT EXISTS `mysql_cdc`;

use mysql_cdc;
-- Create a mysql data source table
CREATE TABLE IF NOT EXISTS `mysql_cdc`.`mysql_cdc_e2e_source_table` (
`uuid` BIGINT,
`name` VARCHAR(128),
`score` INT,
PRIMARY KEY (`uuid`)
) ENGINE=InnoDB;



truncate table `mysql_cdc`.`mysql_cdc_e2e_source_table`;

INSERT INTO `mysql_cdc`.`mysql_cdc_e2e_source_table` (uuid, name, score) VALUES
(1, 'Alice', 95),
(2, 'Bob', 88);
Loading

0 comments on commit 4c86e0e

Please sign in to comment.