Skip to content

Commit

Permalink
[Feature][connector-v2][mongodb] mongodb support cdc sink (apache#4833)
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored and wunan1210 committed Jul 5, 2023
1 parent 9ffffb7 commit 00b2b18
Show file tree
Hide file tree
Showing 18 changed files with 619 additions and 258 deletions.
49 changes: 29 additions & 20 deletions docs/en/connector-v2/sink/MongoDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ Key Features
------------

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

**Tips**

> 1.If you want to use CDC-written features, recommend enable the upsert-enable configuration.
Description
-----------
Expand All @@ -34,9 +38,9 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
Data Type Mapping
-----------------

The following table lists the field data type mapping from MongoDB BSON type to SeaTunnel data type.
The following table lists the field data type mapping from MongoDB BSON type to Seatunnel data type.

| SeaTunnel Data Type | MongoDB BSON Type |
| Seatunnel Data Type | MongoDB BSON Type |
|---------------------|-------------------|
| STRING | ObjectId |
| STRING | String |
Expand All @@ -62,23 +66,24 @@ The following table lists the field data type mapping from MongoDB BSON type to
Sink Options
------------

| Name | Type | Required | Default | Description |
|-----------------------|----------|----------|---------|----------------------------------------------------------------------------------------------------------------|
| uri | String | Yes | - | The MongoDB connection uri. |
| database | String | Yes | - | The name of MongoDB database to read or write. |
| collection | String | Yes | - | The name of MongoDB collection to read or write. |
| schema | String | Yes | - | MongoDB's BSON and seatunnel data structure mapping |
| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum number of buffered rows per batch request. |
| buffer-flush.interval | String | No | 30000 | Specifies the retry time interval if writing records to database failed, the unit is seconds. |
| retry.max | String | No | 3 | Specifies the max retry times if writing records to database failed. |
| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
| upsert-key | List | No | - | The primary keys for upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for properties. |
| Name | Type | Required | Default | Description |
|-----------------------|----------|----------|---------|---------------------------------------------------------------------------------------------------|
| uri | String | Yes | - | The MongoDB connection uri. |
| database | String | Yes | - | The name of MongoDB database to read or write. |
| collection | String | Yes | - | The name of MongoDB collection to read or write. |
| schema | String | Yes | - | MongoDB's BSON and seatunnel data structure mapping |
| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum number of buffered rows per batch request. |
| buffer-flush.interval | String | No | 30000 | Specifies the retry time interval if writing records to database failed, the unit is seconds. |
| retry.max | String | No | 3 | Specifies the max retry times if writing records to database failed. |
| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
| primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. |

**Tips**

> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.
> Data flushing will be triggered if any of these conditions are met.<br/>
> 2.Compatible with the historical parameter `upsert-key`. If `upsert-key` is set, please do not set `primary-key`.<br/>
How to Create a MongoDB Data Synchronization Jobs
-------------------------------------------------
Expand Down Expand Up @@ -198,8 +203,8 @@ The necessity for using transactions can be greatly avoided by designing systems

By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.

If upsert-key is defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in upsert-key as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
In the event of a failure, SeaTunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.
If `primary-key` and `upsert-enable` is defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in upsert-key as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
In the event of a failure, Seatunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.

```bash
sink {
Expand All @@ -208,7 +213,7 @@ sink {
database = "test_db"
collection = "users"
upsert-enable = true
upsert-key = ["name","status"]
primary-key = ["name","status"]
schema = {
fields {
_id = string
Expand All @@ -222,11 +227,15 @@ sink {

## Changelog

### 2.2.0-beta 2022-09-26
### 2.2.0-beta

- Add MongoDB Source Connector

### 2.3.1-release

- [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))

### Next Version

- [Feature]Refactor mongodb source connector([4620](https://github.com/apache/seatunnel/pull/4620))
- [Feature]Mongodb support cdc sink([4833](https://github.com/apache/seatunnel/pull/4833))

1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- [Connector-V2] [Kafka] Fix KafkaProducer resources have never been released. (#4302)
- [Connector-V2] [Kafka] Fix the permission problem caused by client.id. (#4246)
- [Connector-V2] [Kafka] Fix KafkaConsumerThread exit caused by commit offset error. (#4379)
- [Connector-V2] [Mongodb] Mongodb support cdc sink. (#4833)
- [Connector-V2] [kafka] Fix the problem that the partition information can not be obtained when kafka is restored (#4764)
- [Connector-V2] [SFTP] Fix incorrect exception handling logic (#4720)
- [Connector-V2] [File] Fix read temp file (#4876)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ public class MongodbConfig {
.defaultValue(false)
.withDescription("Whether to write documents via upsert mode.");

public static final Option<List<String>> UPSERT_KEY =
Options.key("upsert-key")
public static final Option<List<String>> PRIMARY_KEY =
Options.key("primary-key")
.listType()
.noDefaultValue()
.withDescription(
"The primary keys for upsert. Only valid in upsert mode. Keys are in csv format for properties.");
"The primary keys for upsert/update. Keys are in csv format for properties.")
.withFallbackKeys("upsert-key");
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,97 @@

package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;

import org.bson.BsonDocument;
import org.bson.conversions.Bson;

import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;

public class RowDataDocumentSerializer implements DocumentSerializer<SeaTunnelRow> {

private final RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter;

private final Boolean isUpsertEnable;

private final Function<BsonDocument, BsonDocument> filterConditions;

private final Map<RowKind, WriteModelSupplier> writeModelSuppliers;

public RowDataDocumentSerializer(
RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter,
MongodbWriterOptions options,
Function<BsonDocument, BsonDocument> filterConditions) {
this.rowDataToBsonConverter = rowDataToBsonConverter;
this.isUpsertEnable = options.isUpsertEnable();
this.filterConditions = filterConditions;

writeModelSuppliers = createWriteModelSuppliers();
}

@Override
public WriteModel<BsonDocument> serializeToWriteModel(SeaTunnelRow row) {
final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
if (isUpsertEnable) {
Bson filter = generateFilter(filterConditions.apply(bsonDocument));
bsonDocument.remove("_id");
BsonDocument update = new BsonDocument("$set", bsonDocument);
return new UpdateOneModel<>(filter, update, new UpdateOptions().upsert(true));
} else {
return new InsertOneModel<>(bsonDocument);
WriteModelSupplier writeModelSupplier = writeModelSuppliers.get(row.getRowKind());
if (writeModelSupplier == null) {
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT, "Unsupported message kind: " + row.getRowKind());
}
return writeModelSupplier.get(row);
}

private Map<RowKind, WriteModelSupplier> createWriteModelSuppliers() {
Map<RowKind, WriteModelSupplier> writeModelSuppliers = new HashMap<>();

WriteModelSupplier upsertSupplier =
row -> {
final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
Bson filter = generateFilter(filterConditions.apply(bsonDocument));
bsonDocument.remove("_id");
BsonDocument update = new BsonDocument("$set", bsonDocument);
return new UpdateOneModel<>(filter, update, new UpdateOptions().upsert(true));
};

WriteModelSupplier updateSupplier =
row -> {
final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
Bson filter = generateFilter(filterConditions.apply(bsonDocument));
bsonDocument.remove("_id");
BsonDocument update = new BsonDocument("$set", bsonDocument);
return new UpdateOneModel<>(filter, update);
};

WriteModelSupplier insertSupplier =
row -> {
final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
return new InsertOneModel<>(bsonDocument);
};

WriteModelSupplier deleteSupplier =
row -> {
final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
Bson filter = generateFilter(filterConditions.apply(bsonDocument));
return new DeleteOneModel<>(filter);
};

writeModelSuppliers.put(RowKind.INSERT, isUpsertEnable ? upsertSupplier : insertSupplier);
writeModelSuppliers.put(
RowKind.UPDATE_AFTER, isUpsertEnable ? upsertSupplier : updateSupplier);
writeModelSuppliers.put(RowKind.DELETE, deleteSupplier);

return writeModelSuppliers;
}

public static Bson generateFilter(BsonDocument filterConditions) {
Expand All @@ -71,4 +118,8 @@ public static Bson generateFilter(BsonDocument filterConditions) {

return Filters.and(filters);
}

private interface WriteModelSupplier {
WriteModel<BsonDocument> get(SeaTunnelRow row);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public class MongoKeyExtractor implements SerializableFunction<BsonDocument, Bso

private static final long serialVersionUID = 1L;

private final String[] upsertKey;
private final String[] primaryKey;

public MongoKeyExtractor(MongodbWriterOptions options) {
upsertKey = options.getUpsertKey();
primaryKey = options.getPrimaryKey();
}

@Override
public BsonDocument apply(BsonDocument bsonDocument) {
return Arrays.stream(upsertKey)
return Arrays.stream(primaryKey)
.filter(bsonDocument::containsKey)
.collect(
Collectors.toMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;

Expand Down Expand Up @@ -65,12 +66,20 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
builder.withBatchIntervalMs(
pluginConfig.getLong(MongodbConfig.BUFFER_FLUSH_INTERVAL.key()));
}
if (pluginConfig.hasPath(MongodbConfig.UPSERT_KEY.key())) {
builder.withUpsertKey(
if (pluginConfig.hasPath(MongodbConfig.PRIMARY_KEY.key())) {
builder.withPrimaryKey(
pluginConfig
.getStringList(MongodbConfig.UPSERT_KEY.key())
.getStringList(MongodbConfig.PRIMARY_KEY.key())
.toArray(new String[0]));
}
List<String> fallbackKeys = MongodbConfig.PRIMARY_KEY.getFallbackKeys();
fallbackKeys.forEach(
key -> {
if (pluginConfig.hasPath(key)) {
builder.withPrimaryKey(
pluginConfig.getStringList(key).toArray(new String[0]));
}
});
if (pluginConfig.hasPath(MongodbConfig.UPSERT_ENABLE.key())) {
builder.withUpsertEnable(
pluginConfig.getBoolean(MongodbConfig.UPSERT_ENABLE.key()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public OptionRule optionRule() {
MongodbConfig.RETRY_MAX,
MongodbConfig.RETRY_INTERVAL,
MongodbConfig.UPSERT_ENABLE,
MongodbConfig.UPSERT_KEY)
MongodbConfig.PRIMARY_KEY)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;

import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
Expand Down Expand Up @@ -86,10 +87,12 @@ private void initOptions(MongodbWriterOptions options) {
}

@Override
public void write(SeaTunnelRow o) throws IOException {
bulkRequests.add(serializer.serializeToWriteModel(o));
if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
doBulkWrite();
public void write(SeaTunnelRow o) {
if (o.getRowKind() != RowKind.UPDATE_BEFORE) {
bulkRequests.add(serializer.serializeToWriteModel(o));
if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
doBulkWrite();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class MongodbWriterOptions implements Serializable {

protected final boolean upsertEnable;

protected final String[] upsertKey;
protected final String[] primaryKey;

protected final int retryMax;

Expand All @@ -51,7 +51,7 @@ public MongodbWriterOptions(
int flushSize,
Long batchIntervalMs,
boolean upsertEnable,
String[] upsertKey,
String[] primaryKey,
int retryMax,
Long retryInterval) {
this.connectString = connectString;
Expand All @@ -60,7 +60,7 @@ public MongodbWriterOptions(
this.flushSize = flushSize;
this.batchIntervalMs = batchIntervalMs;
this.upsertEnable = upsertEnable;
this.upsertKey = upsertKey;
this.primaryKey = primaryKey;
this.retryMax = retryMax;
this.retryInterval = retryInterval;
}
Expand All @@ -83,7 +83,7 @@ public static class Builder {

protected boolean upsertEnable;

protected String[] upsertKey;
protected String[] primaryKey;

protected int retryMax;

Expand Down Expand Up @@ -119,8 +119,8 @@ public Builder withUpsertEnable(boolean upsertEnable) {
return this;
}

public Builder withUpsertKey(String[] upsertKey) {
this.upsertKey = upsertKey;
public Builder withPrimaryKey(String[] primaryKey) {
this.primaryKey = primaryKey;
return this;
}

Expand All @@ -142,7 +142,7 @@ public MongodbWriterOptions build() {
flushSize,
batchIntervalMs,
upsertEnable,
upsertKey,
primaryKey,
retryMax,
retryInterval);
}
Expand Down
Loading

0 comments on commit 00b2b18

Please sign in to comment.