Skip to content

Commit

Permalink
Fix the review problem
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo committed Jun 1, 2023
1 parent 60fe820 commit 1adb461
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 10 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/MongoDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ Sink Options

> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.
> Data flushing will be triggered if any of these conditions are met.<br/>
> 2.Compatible with the historical parameter `upsert-key`. If `upsert-key` is set, please do not set `primary-key`.<br/>
How to Create a MongoDB Data Synchronization Jobs
-------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ public class MongodbConfig {
.listType()
.noDefaultValue()
.withDescription(
"The primary keys for upsert/update. Keys are in csv format for properties.");
"The primary keys for upsert/update. Keys are in csv format for properties.")
.withFallbackKeys("upsert-key");
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class MongoKeyExtractor implements SerializableFunction<BsonDocument, Bso
private final String[] upsertKey;

public MongoKeyExtractor(MongodbWriterOptions options) {
upsertKey = options.getUpsertKey();
upsertKey = options.getPrimaryKey();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;

Expand Down Expand Up @@ -66,11 +67,19 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
pluginConfig.getLong(MongodbConfig.BUFFER_FLUSH_INTERVAL.key()));
}
if (pluginConfig.hasPath(MongodbConfig.PRIMARY_KEY.key())) {
builder.withUpsertKey(
builder.withPrimaryKey(
pluginConfig
.getStringList(MongodbConfig.PRIMARY_KEY.key())
.toArray(new String[0]));
}
List<String> fallbackKeys = MongodbConfig.PRIMARY_KEY.getFallbackKeys();
fallbackKeys.forEach(
key -> {
if (pluginConfig.hasPath(key)) {
builder.withPrimaryKey(
pluginConfig.getStringList(key).toArray(new String[0]));
}
});
if (pluginConfig.hasPath(MongodbConfig.UPSERT_ENABLE.key())) {
builder.withUpsertEnable(
pluginConfig.getBoolean(MongodbConfig.UPSERT_ENABLE.key()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class MongodbWriterOptions implements Serializable {

protected final boolean upsertEnable;

protected final String[] upsertKey;
protected final String[] primaryKey;

protected final int retryMax;

Expand All @@ -51,7 +51,7 @@ public MongodbWriterOptions(
int flushSize,
Long batchIntervalMs,
boolean upsertEnable,
String[] upsertKey,
String[] primaryKey,
int retryMax,
Long retryInterval) {
this.connectString = connectString;
Expand All @@ -60,7 +60,7 @@ public MongodbWriterOptions(
this.flushSize = flushSize;
this.batchIntervalMs = batchIntervalMs;
this.upsertEnable = upsertEnable;
this.upsertKey = upsertKey;
this.primaryKey = primaryKey;
this.retryMax = retryMax;
this.retryInterval = retryInterval;
}
Expand All @@ -83,7 +83,7 @@ public static class Builder {

protected boolean upsertEnable;

protected String[] upsertKey;
protected String[] primaryKey;

protected int retryMax;

Expand Down Expand Up @@ -119,8 +119,8 @@ public Builder withUpsertEnable(boolean upsertEnable) {
return this;
}

public Builder withUpsertKey(String[] upsertKey) {
this.upsertKey = upsertKey;
public Builder withPrimaryKey(String[] upsertKey) {
this.primaryKey = primaryKey;
return this;
}

Expand All @@ -142,7 +142,7 @@ public MongodbWriterOptions build() {
flushSize,
batchIntervalMs,
upsertEnable,
upsertKey,
primaryKey,
retryMax,
retryInterval);
}
Expand Down

0 comments on commit 1adb461

Please sign in to comment.