[Feature][connector-v2][mongodb] mongodb support cdc sink (apache#4833)
MonsterChenzhuo authored and EricJoy2048 committed Jul 11, 2023
1 parent 923081d commit 2fdf75f
Showing 5 changed files with 40 additions and 23 deletions.
49 changes: 29 additions & 20 deletions docs/en/connector-v2/sink/MongoDB.md
@@ -13,7 +13,11 @@ Key Features
------------

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

**Tips**

> 1. If you want to use the CDC write feature, it is recommended to enable the `upsert-enable` configuration (see the sketch below).
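
A minimal CDC-oriented sink sketch, assuming a hypothetical local MongoDB instance; the uri, database, collection, and field names below are placeholders, not values prescribed by this change:

```bash
sink {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    # Upsert mode lets replayed change events update existing documents instead of inserting duplicates.
    upsert-enable = true
    # Fields used to match existing documents on upsert/update (placeholder field names).
    primary-key = ["_id"]
    schema = {
      fields {
        _id = string
        name = string
        status = string
      }
    }
  }
}
```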
Description
-----------
@@ -34,9 +38,9 @@ They can be downloaded via install-plugin.sh or from the Maven central repository.
Data Type Mapping
-----------------

The following table lists the field data type mapping from MongoDB BSON type to SeaTunnel data type.

| SeaTunnel Data Type | MongoDB BSON Type |
|---------------------|-------------------|
| STRING | ObjectId |
| STRING | String |
@@ -62,23 +66,24 @@ The following table lists the field data type mapping from MongoDB BSON type to
Sink Options
------------

| Name | Type | Required | Default | Description |
|-----------------------|----------|----------|---------|----------------------------------------------------------------------------------------------------------------|
| uri | String | Yes | - | The MongoDB connection uri. |
| database | String | Yes | - | The name of MongoDB database to read or write. |
| collection | String | Yes | - | The name of MongoDB collection to read or write. |
| schema | String | Yes | - | MongoDB's BSON and seatunnel data structure mapping |
| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum number of buffered rows per batch request. |
| buffer-flush.interval | String | No | 30000 | Specifies the retry time interval if writing records to database failed, the unit is seconds. |
| retry.max | String | No | 3 | Specifies the max retry times if writing records to database failed. |
| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
| upsert-key | List | No | - | The primary keys for upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for properties. |
| Name                  | Type     | Required | Default | Description                                                                                        |
|-----------------------|----------|----------|---------|----------------------------------------------------------------------------------------------------|
| uri                   | String   | Yes      | -       | The MongoDB connection uri.                                                                        |
| database              | String   | Yes      | -       | The name of MongoDB database to read or write.                                                     |
| collection            | String   | Yes      | -       | The name of MongoDB collection to read or write.                                                   |
| schema                | String   | Yes      | -       | MongoDB's BSON and SeaTunnel data structure mapping.                                               |
| buffer-flush.max-rows | String   | No       | 1000    | Specifies the maximum number of buffered rows per batch request.                                   |
| buffer-flush.interval | String   | No       | 30000   | Specifies the maximum interval between batch flushes, the unit is millisecond.                     |
| retry.max             | String   | No       | 3       | Specifies the max retry times if writing records to database failed.                               |
| retry.interval        | Duration | No       | 1000    | Specifies the retry time interval if writing records to database failed, the unit is millisecond.  |
| upsert-enable         | Boolean  | No       | false   | Whether to write documents via upsert mode.                                                        |
| primary-key           | List     | No       | -       | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties.         |

**Tips**

> 1. The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.
> Data flushing is triggered when any of these conditions is met (see the sketch below).<br/>
> 2. `primary-key` is compatible with the historical parameter `upsert-key`. If `upsert-key` is set, do not also set `primary-key`.<br/>
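
A minimal sketch of how these flush-related parameters might be combined, assuming a hypothetical local MongoDB instance and a 30-second checkpoint; the connection details and thresholds are placeholders, not values prescribed by this change:

```bash
env {
  # Checkpointing also triggers a flush of buffered writes (interval in milliseconds).
  checkpoint.interval = 30000
}

sink {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    # Flush when 1000 rows are buffered ...
    buffer-flush.max-rows = 1000
    # ... or when 30000 ms have passed, whichever comes first.
    buffer-flush.interval = 30000
    schema = {
      fields {
        _id = string
        name = string
        status = string
      }
    }
  }
}
```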
How to Create a MongoDB Data Synchronization Job
-------------------------------------------------
@@ -198,8 +203,8 @@ The necessity for using transactions can be greatly avoided by designing systems

By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.

If upsert-key is defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in upsert-key as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
In the event of a failure, SeaTunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.
If `primary-key` and `upsert-enable` are defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in `primary-key` as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
In the event of a failure, SeaTunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.

```bash
sink {
@@ -208,7 +213,7 @@ sink {
database = "test_db"
collection = "users"
upsert-enable = true
upsert-key = ["name","status"]
primary-key = ["name","status"]
schema = {
fields {
_id = string
@@ -222,11 +227,15 @@

## Changelog

### 2.2.0-beta 2022-09-26
### 2.2.0-beta

- Add MongoDB Source Connector

### 2.3.1-release

- [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))

### Next Version

- [Feature]Refactor mongodb source connector([4620](https://github.com/apache/seatunnel/pull/4620))
- [Feature]Mongodb support cdc sink([4833](https://github.com/apache/seatunnel/pull/4833))

1 change: 1 addition & 0 deletions release-note.md
@@ -21,6 +21,7 @@
- [Connector-V2] [Kafka] Fix KafkaProducer resources have never been released. (#4302)
- [Connector-V2] [Kafka] Fix the permission problem caused by client.id. (#4246)
- [Connector-V2] [Kafka] Fix KafkaConsumerThread exit caused by commit offset error. (#4379)
- [Connector-V2] [Mongodb] Mongodb support cdc sink. (#4833)
- [Connector-V2] [kafka] Fix the problem that the partition information can not be obtained when kafka is restored (#4764)
- [Connector-V2] [SFTP] Fix incorrect exception handling logic (#4720)
- [Connector-V2] [File] Fix read temp file (#4876)
Expand Up @@ -43,7 +43,7 @@
public class RowDataDocumentSerializer implements DocumentSerializer<SeaTunnelRow> {

private final RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter;
private final boolean isUpsertEnable;
private final Boolean isUpsertEnable;
private final Function<BsonDocument, BsonDocument> filterConditions;

private final Map<RowKind, WriteModelSupplier> writeModelSuppliers;
@@ -33,6 +33,7 @@

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
@@ -109,7 +110,8 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new MongodbWriter(
new RowDataDocumentSerializer(
RowDataToBsonConverters.createConverter(seaTunnelRowType),
@@ -26,7 +26,6 @@
<name>SeaTunnel : E2E : Connector V2 : Mongodb</name>

<dependencies>
<!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-mongodb</artifactId>
@@ -39,5 +38,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
