From f17b26e1a12b6b1f6c58a81d54a2c53ce9fc2389 Mon Sep 17 00:00:00 2001 From: liugddx Date: Tue, 8 Aug 2023 23:45:09 +0800 Subject: [PATCH 1/2] Remove scheduler in Dynamodb sink --- docs/en/connector-v2/sink/AmazonDynamoDB.md | 1 - .../config/AmazonDynamoDBSourceOptions.java | 4 --- .../sink/AmazonDynamoDBWriter.java | 7 ++++ .../sink/DynamoDbSinkClient.java | 34 ++----------------- .../common/sink/AbstractSinkWriter.java | 3 +- 5 files changed, 11 insertions(+), 38 deletions(-) diff --git a/docs/en/connector-v2/sink/AmazonDynamoDB.md b/docs/en/connector-v2/sink/AmazonDynamoDB.md index e8fe0b23afb..6e880fb4af4 100644 --- a/docs/en/connector-v2/sink/AmazonDynamoDB.md +++ b/docs/en/connector-v2/sink/AmazonDynamoDB.md @@ -20,7 +20,6 @@ Write data to Amazon DynamoDB | secret_access_key | string | yes | - | | table | string | yes | - | | batch_size | string | no | 25 | -| batch_interval_ms | string | no | 1000 | | common-options | | no | - | ### url [string] diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java index f92921ee140..54f955f540e 100644 --- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java @@ -43,7 +43,6 @@ public class AmazonDynamoDBSourceOptions implements Serializable { private Config schema; public int batchSize = AmazonDynamoDBConfig.BATCH_SIZE.defaultValue(); - public int batchIntervalMs = AmazonDynamoDBConfig.BATCH_INTERVAL_MS.defaultValue(); public AmazonDynamoDBSourceOptions(Config config) { this.url = config.getString(AmazonDynamoDBConfig.URL.key()); @@ -57,8 +56,5 @@ public AmazonDynamoDBSourceOptions(Config config) { if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE.key())) { this.batchSize = config.getInt(AmazonDynamoDBConfig.BATCH_SIZE.key()); } - if (config.hasPath(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key())) { - this.batchIntervalMs = config.getInt(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key()); - } } } diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java index 016036cc841..d059bce7b57 100644 --- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import java.io.IOException; +import java.util.Optional; public class AmazonDynamoDBWriter extends AbstractSinkWriter { @@ -48,4 +49,10 @@ public void write(SeaTunnelRow element) throws IOException { public void close() throws IOException { dynamoDbSinkClient.close(); } + + @Override + public Optional prepareCommit() { + dynamoDbSinkClient.flush(); + return Optional.empty(); + } } diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java index d8acf33ebeb..e42f573dfb8 100644 --- a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java @@ -24,7 +24,6 @@ import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer; import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -40,15 +39,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; public class DynamoDbSinkClient { private final AmazonDynamoDBSourceOptions amazondynamodbSourceOptions; - private ScheduledExecutorService scheduler; - private ScheduledFuture scheduledFuture; private volatile boolean initialize; private volatile Exception flushException; private DynamoDbClient dynamoDbClient; @@ -62,7 +55,7 @@ public DynamoDbSinkClient( this.seaTunnelRowDeserializer = new DefaultSeaTunnelRowDeserializer(typeInfo); } - private void tryInit() throws IOException { + private void tryInit() { if (initialize) { return; } @@ -78,25 +71,6 @@ private void tryInit() throws IOException { amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey()))) .build(); - - scheduler = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat("DdynamoDb-sink-output-%s") - .build()); - scheduledFuture = - scheduler.scheduleAtFixedRate( - () -> { - try { - flush(); - } catch (IOException e) { - flushException = e; - } - }, - amazondynamodbSourceOptions.getBatchIntervalMs(), - amazondynamodbSourceOptions.getBatchIntervalMs(), - TimeUnit.MILLISECONDS); - initialize = true; } @@ -114,17 +88,13 @@ public synchronized void write(PutItemRequest putItemRequest) throws IOException } public synchronized void close() throws IOException { - if (scheduledFuture != null) { - scheduledFuture.cancel(false); - scheduler.shutdown(); - } if (dynamoDbClient != null) { flush(); dynamoDbClient.close(); } } - synchronized void flush() throws IOException { + synchronized void flush() { checkFlushException(); if (batchList.isEmpty()) { return; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java index 9c836fe8e39..9cdc9e2dd2a 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java @@ -19,12 +19,13 @@ import org.apache.seatunnel.api.sink.SinkWriter; +import java.io.IOException; import java.util.Optional; public abstract class AbstractSinkWriter implements SinkWriter { @Override - public Optional prepareCommit() { + public Optional prepareCommit() throws IOException { return Optional.empty(); } From 2fa94a48af8eba1456ceb862a93ff5e08c5d3bfa Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Wed, 9 Aug 2023 13:24:01 +0800 Subject: [PATCH 2/2] revert code --- .../connectors/seatunnel/common/sink/AbstractSinkWriter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java index 9cdc9e2dd2a..9c836fe8e39 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java @@ -19,13 +19,12 @@ import org.apache.seatunnel.api.sink.SinkWriter; -import java.io.IOException; import java.util.Optional; public abstract class AbstractSinkWriter implements SinkWriter { @Override - public Optional prepareCommit() throws IOException { + public Optional prepareCommit() { return Optional.empty(); }