[Feature][connector-v2][mongodb] mongodb support cdc sink #4833

Merged: 6 commits, Jun 26, 2023
Changes from 2 commits
14 changes: 11 additions & 3 deletions docs/en/connector-v2/sink/MongoDB.md
@@ -13,7 +13,11 @@ Key Features
------------

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
Review comment (Member): update Changelog & release-note

**Tips**

> 1. If you want to use the CDC write feature, please enable the `upsert-enable` configuration.
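As an illustration, a sink block with CDC writes enabled could look like the sketch below. Only `upsert-enable` is named in the tip above; the remaining option names (`uri`, `database`, `collection`, `primary-key`) are assumptions that should be checked against the released MongoDB sink documentation.

```hocon
sink {
  MongoDB {
    # uri/database/collection/primary-key are assumed option names;
    # verify them against the released MongoDB sink docs.
    uri = "mongodb://127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    # apply INSERT/UPDATE_AFTER rows as upserts keyed on primary-key
    upsert-enable = true
    primary-key = ["_id"]
  }
}
```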

Description
-----------
@@ -222,11 +226,15 @@ sink {

## Changelog

### 2.2.0-beta 2022-09-26
### 2.2.0-beta

- Add MongoDB Source Connector

### Next Version
### 2.3.1-release

- [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))

### Next Version

- [Feature]Mongodb support cdc sink([4833](https://github.com/apache/seatunnel/pull/4833))

2 changes: 2 additions & 0 deletions release-note.md
@@ -19,6 +19,8 @@
- [Connector-V2] [Kafka] Fix KafkaProducer resources have never been released. (#4302)
- [Connector-V2] [Kafka] Fix the permission problem caused by client.id. (#4246)
- [Connector-V2] [Kafka] Fix KafkaConsumerThread exit caused by commit offset error. (#4379)
- [Connector-V2] [Mongodb] Mongodb support cdc sink. (#4833)


### Zeta(ST-Engine)

@@ -18,11 +18,13 @@ (RowDataDocumentSerializer.java)
package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;

import org.bson.BsonDocument;
import org.bson.conversions.Bson;

import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
@@ -33,6 +35,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;

public class RowDataDocumentSerializer implements DocumentSerializer<SeaTunnelRow> {

private final RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter;
@@ -52,6 +56,20 @@ public RowDataDocumentSerializer(

@Override
public WriteModel<BsonDocument> serializeToWriteModel(SeaTunnelRow row) {
switch (row.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
return upsert(row);
case UPDATE_BEFORE:
Review comment (Member): Ignore UPDATE_BEFORE if upsert is enabled.

Review comment (Member): If upsert is not enabled, UPDATE_BEFORE needs to be treated as a delete and UPDATE_AFTER as an insert.

case DELETE:
return delete(row);
default:
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT, "Unsupported message kind: " + row.getRowKind());
}
}

private WriteModel<BsonDocument> upsert(SeaTunnelRow row) {
final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
if (isUpsertEnable) {
Bson filter = generateFilter(filterConditions.apply(bsonDocument));
@@ -63,6 +81,12 @@ public WriteModel<BsonDocument> serializeToWriteModel(SeaTunnelRow row) {
}
}

private WriteModel<BsonDocument> delete(SeaTunnelRow row) {
final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
Bson filter = generateFilter(filterConditions.apply(bsonDocument));
return new DeleteOneModel<>(filter);
}

public static Bson generateFilter(BsonDocument filterConditions) {
List<Bson> filters =
filterConditions.entrySet().stream()
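For readers skimming the diff: the serializer above turns each SeaTunnelRow into a single WriteModel (INSERT/UPDATE_AFTER become upserts; UPDATE_BEFORE and DELETE become deletes in the state shown), so a writer can flush a batch with one bulkWrite call. The sketch below is hypothetical and is not the PR's writer code; `BulkFlushSketch` is an invented name, while `bulkWrite` and `BulkWriteOptions` are standard MongoDB driver API.

```java
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;

import org.bson.BsonDocument;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not code from this PR: batch the serialized models
// into one unordered bulk write.
public final class BulkFlushSketch {

    public static void flush(
            MongoCollection<BsonDocument> collection,
            RowDataDocumentSerializer serializer,
            List<SeaTunnelRow> rows) {
        // One WriteModel per row: upserts for INSERT/UPDATE_AFTER,
        // deletes for UPDATE_BEFORE/DELETE (per the diff above).
        List<WriteModel<BsonDocument>> models =
                rows.stream()
                        .map(serializer::serializeToWriteModel)
                        .collect(Collectors.toList());
        if (!models.isEmpty()) {
            // ordered(false) lets the server apply independent writes in parallel
            collection.bulkWrite(models, new BulkWriteOptions().ordered(false));
        }
    }
}
```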
2 changes: 1 addition & 1 deletion seatunnel-dist/pom.xml
@@ -99,7 +99,7 @@
<redshift.version>2.1.0.9</redshift.version>
<snowflake.version>3.13.29</snowflake.version>

<!-- Imap storage dependency package -->
<!-- Imap storage dependency package -->
<hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
<json-smart.version>2.4.7</json-smart.version>
<hadoop-aws.version>3.1.4</hadoop-aws.version>
@@ -26,8 +26,6 @@ (MongoDB e2e module pom.xml)
<name>SeaTunnel : E2E : Connector V2 : Mongodb</name>

<dependencies>

<!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-mongodb</artifactId>
@@ -38,6 +36,13 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,231 @@ (new file: AbstractMongodbIT.java)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.connector.v2.mongodb;

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;

import org.awaitility.Awaitility;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Sorts;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;

@Slf4j
public abstract class AbstractMongodbIT extends TestSuiteBase implements TestResource {

protected static final Random RANDOM = new Random();

protected static final List<Document> TEST_MATCH_DATASET = generateTestDataSet(5);

protected static final List<Document> TEST_SPLIT_DATASET = generateTestDataSet(10);

protected static final String MONGODB_IMAGE = "mongo:latest";

protected static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";

protected static final int MONGODB_PORT = 27017;

protected static final String MONGODB_DATABASE = "test_db";

protected static final String MONGODB_MATCH_TABLE = "test_match_op_db";

protected static final String MONGODB_SPLIT_TABLE = "test_split_op_db";

protected static final String MONGODB_MATCH_RESULT_TABLE = "test_match_op_result_db";

protected static final String MONGODB_SPLIT_RESULT_TABLE = "test_split_op_result_db";

protected static final String MONGODB_SINK_TABLE = "test_source_sink_table";

protected static final String MONGODB_UPDATE_TABLE = "test_update_table";

protected static final String MONGODB_FLAT_TABLE = "test_flat_table";

protected static final String MONGODB_CDC_RESULT_TABLE = "test_cdc_table";

protected GenericContainer<?> mongodbContainer;

protected MongoClient client;

public void initConnection() {
String host = mongodbContainer.getContainerIpAddress();
int port = mongodbContainer.getFirstMappedPort();
String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
client = MongoClients.create(url);
}

protected void initSourceData() {
MongoCollection<Document> sourceMatchTable =
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_MATCH_TABLE);

sourceMatchTable.deleteMany(new Document());
sourceMatchTable.insertMany(TEST_MATCH_DATASET);

MongoCollection<Document> sourceSplitTable =
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_SPLIT_TABLE);

sourceSplitTable.deleteMany(new Document());
sourceSplitTable.insertMany(TEST_SPLIT_DATASET);
}

protected void clearDate(String table) {
client.getDatabase(MONGODB_DATABASE).getCollection(table).drop();
}

public static List<Document> generateTestDataSet(int count) {
List<Document> dataSet = new ArrayList<>();

for (int i = 0; i < count; i++) {
dataSet.add(
new Document(
"c_map",
new Document("OQBqH", randomString())
.append("rkvlO", randomString())
.append("pCMEX", randomString())
.append("DAgdj", randomString())
.append("dsJag", randomString()))
.append(
"c_array",
Arrays.asList(
RANDOM.nextInt(),
RANDOM.nextInt(),
RANDOM.nextInt(),
RANDOM.nextInt(),
RANDOM.nextInt()))
.append("c_string", randomString())
.append("c_boolean", RANDOM.nextBoolean())
.append("c_int", i)
.append("c_bigint", RANDOM.nextLong())
.append("c_double", RANDOM.nextDouble() * Double.MAX_VALUE)
.append(
"c_row",
new Document(
"c_map",
new Document("OQBqH", randomString())
.append("rkvlO", randomString())
.append("pCMEX", randomString())
.append("DAgdj", randomString())
.append("dsJag", randomString()))
.append(
"c_array",
Arrays.asList(
RANDOM.nextInt(),
RANDOM.nextInt(),
RANDOM.nextInt(),
RANDOM.nextInt(),
RANDOM.nextInt()))
.append("c_string", randomString())
.append("c_boolean", RANDOM.nextBoolean())
.append("c_int", RANDOM.nextInt())
.append("c_bigint", RANDOM.nextLong())
.append(
"c_double",
RANDOM.nextDouble() * Double.MAX_VALUE)));
}
return dataSet;
}

protected static String randomString() {
int length = RANDOM.nextInt(10) + 1;
StringBuilder sb = new StringBuilder(length);
for (int i = 0; i < length; i++) {
char c = (char) (RANDOM.nextInt(26) + 'a');
sb.append(c);
}
return sb.toString();
}

protected List<Document> readMongodbData(String collection) {
MongoCollection<Document> sinkTable =
client.getDatabase(MONGODB_DATABASE).getCollection(collection);
MongoCursor<Document> cursor = sinkTable.find().sort(Sorts.ascending("c_int")).cursor();
List<Document> documents = new ArrayList<>();
while (cursor.hasNext()) {
documents.add(cursor.next());
}
return documents;
}

@BeforeAll
@Override
public void startUp() {
DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
mongodbContainer =
new GenericContainer<>(imageName)
.withNetwork(NETWORK)
.withNetworkAliases(MONGODB_CONTAINER_HOST)
.withExposedPorts(MONGODB_PORT)
.waitingFor(
new HttpWaitStrategy()
.forPort(MONGODB_PORT)
.forStatusCodeMatching(
response ->
response == HTTP_OK
|| response == HTTP_UNAUTHORIZED)
.withStartupTimeout(Duration.ofMinutes(2)))
.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
// For local test use
// mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
Startables.deepStart(Stream.of(mongodbContainer)).join();
log.info("Mongodb container started");

Awaitility.given()
.ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initConnection);
this.initSourceData();
}

@AfterAll
@Override
public void tearDown() {
if (client != null) {
client.close();
}
if (mongodbContainer != null) {
mongodbContainer.close();
}
}
}
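Built on this base class, a CDC end-to-end test would drive the sink with FakeSource, which is what the new connector-fake test dependency suggests. The following job config is a hypothetical sketch: the FakeSource `rows`/`kind` options and the sink option names are assumptions based on the connectors' documentation, and the collection name reuses MONGODB_CDC_RESULT_TABLE from the class above.

```hocon
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        c_int = int
        c_string = string
      }
    }
    # row kinds exercise the sink's insert/upsert/delete paths
    rows = [
      {kind = INSERT, fields = [1, "A"]},
      {kind = UPDATE_BEFORE, fields = [1, "A"]},
      {kind = UPDATE_AFTER, fields = [1, "A2"]},
      {kind = INSERT, fields = [2, "B"]},
      {kind = DELETE, fields = [2, "B"]}
    ]
  }
}

sink {
  MongoDB {
    uri = "mongodb://e2e_mongodb:27017"
    database = "test_db"
    collection = "test_cdc_table"
    upsert-enable = true
    primary-key = ["c_int"]
  }
}
```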