Skip to content

Commit

Permalink
Rewrite ci logic
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo committed May 26, 2023
1 parent 7d60a51 commit 6b6e22d
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 316 deletions.
12 changes: 10 additions & 2 deletions docs/en/connector-v2/sink/MongoDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ Key Features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

**Tips**

> 1.If you want to use CDC-written features, please enable the upsert-enable configuration.
Description
-----------

Expand Down Expand Up @@ -222,11 +226,15 @@ sink {

## Changelog

### 2.2.0-beta 2022-09-26
### 2.2.0-beta

- Add MongoDB Source Connector

### Next Version
### 2.3.1-release

- [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))

### Next Version

- [Feature]Mongodb support cdc sink([4833](https://github.com/apache/seatunnel/pull/4833))

2 changes: 2 additions & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
- [Connector-V2] [Kafka] Fix KafkaProducer resources have never been released. (#4302)
- [Connector-V2] [Kafka] Fix the permission problem caused by client.id. (#4246)
- [Connector-V2] [Kafka] Fix KafkaConsumerThread exit caused by commit offset error. (#4379)
- [Connector-V2] [Mongodb] Mongodb support cdc sink. (#4833)


### Zeta(ST-Engine)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,9 @@
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-mysql</artifactId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-mysql</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@

package org.apache.seatunnel.e2e.connector.v2.mongodb;

import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;

import org.bson.Document;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerLoggerFactory;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
Expand All @@ -35,29 +30,14 @@
import com.mongodb.client.model.Sorts;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Random;

@Slf4j
public abstract class AbstractMongodbIT extends TestSuiteBase implements TestResource {

protected static final String MYSQL_HOST = "mysql_cdc_e2e";

protected static final String MYSQL_USER_NAME = "st_user";

protected static final String MYSQL_USER_PASSWORD = "seatunnel";

protected static final String MYSQL_DATABASE = "mysql_cdc";

protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);

protected static final Random RANDOM = new Random();

protected static final List<Document> TEST_MATCH_DATASET = generateTestDataSet(5);
Expand Down Expand Up @@ -88,16 +68,11 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes

protected static final String MONGODB_CDC_RESULT_TABLE = "test_cdc_table";

protected static final String SOURCE_SQL = "select * from products";

protected final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");

protected GenericContainer<?> mongodbContainer;

protected MongoClient client;

protected void initConnection() {
public void initConnection() {
String host = mongodbContainer.getContainerIpAddress();
int port = mongodbContainer.getFirstMappedPort();
String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
Expand All @@ -122,24 +97,7 @@ protected void clearDate(String table) {
client.getDatabase(MONGODB_DATABASE).getCollection(table).drop();
}

protected static MySqlContainer createMySqlContainer(MySqlVersion version) {
MySqlContainer mySqlContainer =
new MySqlContainer(version)
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withSetupSQL("docker/setup.sql")
.withNetwork(NETWORK)
.withNetworkAliases(MYSQL_HOST)
.withDatabaseName(MYSQL_DATABASE)
.withUsername(MYSQL_USER_NAME)
.withPassword(MYSQL_USER_PASSWORD)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger("mysql-docker-image")));

return mySqlContainer;
}

protected static List<Document> generateTestDataSet(int count) {
public static List<Document> generateTestDataSet(int count) {
List<Document> dataSet = new ArrayList<>();

for (int i = 0; i < count; i++) {
Expand Down Expand Up @@ -212,53 +170,4 @@ protected List<Document> readMongodbData(String collection) {
}
return documents;
}

protected List<LinkedHashMap<String, Object>> querySql(String sql) {
try (Connection connection = getJdbcConnection()) {
ResultSet resultSet = connection.createStatement().executeQuery(sql);
List<LinkedHashMap<String, Object>> result = new ArrayList<>();
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
LinkedHashMap<String, Object> row = new LinkedHashMap<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = resultSet.getMetaData().getColumnName(i);
Object columnValue = resultSet.getObject(i);
row.put(columnName, columnValue);
}
result.add(row);
}
return result;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
MYSQL_CONTAINER.getPassword());
}

protected void upsertDeleteSourceTable() {
executeSql(
"INSERT INTO mysql_cdc.products (name,description,weight)\n"
+ "VALUES ('car battery','12V car battery',31)");

executeSql(
"INSERT INTO mysql_cdc.products (name,description,weight)\n"
+ "VALUES ('rocks','box of assorted rocks',35)");

executeSql("DELETE FROM mysql_cdc.products where weight = 35");

executeSql("UPDATE mysql_cdc.products SET name = 'monster' where weight = 35");
}

private void executeSql(String sql) {
try (Connection connection = getJdbcConnection()) {
connection.createStatement().execute(sql);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import org.awaitility.Awaitility;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
Expand All @@ -35,69 +37,47 @@

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
import static org.awaitility.Awaitility.await;

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK and FLINK do not support cdc")
type = {EngineType.SPARK},
disabledReason = "Spark engine will lose the row kind of record")
@Slf4j
public class MongodbCDCIT extends AbstractMongodbIT {

@TestTemplate
public void testMongodbCDCSink(TestContainer container) {
CompletableFuture<Void> executeJobFuture =
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/cdcIT/mysqlcdc_to_mongodb.conf");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
return null;
});

await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
List<LinkedHashMap<String, Object>> expected = querySql(SOURCE_SQL);
List<Document> actual = readMongodbData(MONGODB_CDC_RESULT_TABLE);

List<LinkedHashMap<String, Object>> actualMapped =
actual.stream()
.map(LinkedHashMap::new)
.peek(map -> map.remove("_id"))
.collect(Collectors.toList());

Assertions.assertIterableEquals(expected, actualMapped);
});

upsertDeleteSourceTable();
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
List<LinkedHashMap<String, Object>> expected = querySql(SOURCE_SQL);
List<Document> actual = readMongodbData(MONGODB_CDC_RESULT_TABLE);

List<LinkedHashMap<String, Object>> actualMapped =
actual.stream()
.map(LinkedHashMap::new)
.peek(map -> map.remove("_id"))
.collect(Collectors.toList());

Assertions.assertIterableEquals(expected, actualMapped);
});
public void testMongodbCDCSink(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult queryResult =
container.executeJob("/cdcIT/fake_cdc_sink_mongodb.conf");
Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr());
Assertions.assertIterableEquals(
Stream.<List<Object>>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100))
.collect(Collectors.toList()),
readMongodbData(MONGODB_CDC_RESULT_TABLE).stream()
.peek(e -> e.remove("_id"))
.map(Document::entrySet)
.map(Set::stream)
.map(
entryStream ->
entryStream
.map(Map.Entry::getValue)
.collect(Collectors.toCollection(ArrayList::new)))
.collect(Collectors.toList()));
clearDate(MONGODB_CDC_RESULT_TABLE);
}

@BeforeAll
Expand All @@ -119,7 +99,7 @@ public void startUp() {
.withStartupTimeout(Duration.ofMinutes(2)))
.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
// Used for local testing
// For local test use
// mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
Startables.deepStart(Stream.of(mongodbContainer)).join();
log.info("Mongodb container started");
Expand All @@ -131,14 +111,9 @@ public void startUp() {
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initConnection);
this.initSourceData();

log.info("The second stage: Starting Mysql containers...");
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
log.info("Mysql Containers are started");
inventoryDatabase.createAndInitialize();
log.info("Mysql ddl execution is complete");
}

@AfterAll
@Override
public void tearDown() {
if (client != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.awaitility.Awaitility;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
Expand Down Expand Up @@ -174,7 +175,7 @@ public void startUp() {
.withStartupTimeout(Duration.ofMinutes(2)))
.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
// Used for local testing
// For local test use
// mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
Startables.deepStart(Stream.of(mongodbContainer)).join();
log.info("Mongodb container started");
Expand All @@ -188,6 +189,7 @@ public void startUp() {
this.initSourceData();
}

@AfterAll
@Override
public void tearDown() {
if (client != null) {
Expand Down
Loading

0 comments on commit 6b6e22d

Please sign in to comment.