From cc01a52e89b794e141c3b2caed25cf5bb2752557 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 25 May 2023 10:08:37 +0800 Subject: [PATCH 1/3] [Bug][flink-runtime][connectors-v2] flink registry running mode changed after conversion hive to hive is failed #4809 --- .../file/source/split/FileSourceSplitEnumerator.java | 2 ++ .../core/starter/flink/execution/FlinkExecution.java | 8 ++++++++ .../starter/flink/execution/SourceExecuteProcessor.java | 8 ++++++-- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java index 09b11af5c7b..478c816fa7c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java @@ -123,8 +123,10 @@ public int currentUnassignedSplitSize() { @Override public void registerReader(int subtaskId) { + Set readers = context.registeredReaders(); pendingSplit = getFileSplit(); assignSplit(subtaskId); + readers.forEach(context::signalNoMoreSplits); } @Override diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java index a3282cc4a1e..5a4050d884d 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor; @@ -33,6 +34,7 @@ import org.apache.seatunnel.core.starter.execution.TaskExecution; import org.apache.seatunnel.core.starter.flink.FlinkStarter; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; @@ -111,6 +113,12 @@ public void execute() throws TaskExecuteException { "Flink Execution Plan: {}", flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan()); log.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName()); + if (!flinkRuntimeEnvironment.isStreaming()) { + flinkRuntimeEnvironment + .getStreamExecutionEnvironment() + .setRuntimeMode(RuntimeExecutionMode.BATCH); + log.info("Flink job Mode: {}", JobMode.BATCH); + } try { flinkRuntimeEnvironment .getStreamExecutionEnvironment() diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index a3897a526e9..f8ba1e0a2d6 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -71,13 +71,17 @@ public List> execute(List> upstreamDataStreams) } else { sourceFunction = new SeaTunnelParallelSource(internalSource); } + boolean bounded = + internalSource.getBoundedness() + == org.apache.seatunnel.api.source.Boundedness.BOUNDED + ? true + : false; DataStreamSource sourceStream = addSource( executionEnvironment, sourceFunction, "SeaTunnel " + internalSource.getClass().getSimpleName(), - internalSource.getBoundedness() - == org.apache.seatunnel.api.source.Boundedness.BOUNDED); + bounded); Config pluginConfig = pluginConfigs.get(i); if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) { int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key()); From 65797f5e2c6ec4b87306e83ad31ffffa647c0503 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Sat, 27 May 2023 13:47:10 +0800 Subject: [PATCH 2/3] [Improve][conf-files][format]Format the.conf file in the same style --- .../seatunnel/file/source/split/FileSourceSplitEnumerator.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java index 478c816fa7c..09b11af5c7b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java @@ -123,10 +123,8 @@ public int currentUnassignedSplitSize() { @Override public void registerReader(int subtaskId) { - Set readers = context.registeredReaders(); pendingSplit = getFileSplit(); assignSplit(subtaskId); - readers.forEach(context::signalNoMoreSplits); } @Override From 48360205ce3923374031e1797eaa5e48a72123e4 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Mon, 5 Jun 2023 09:33:33 +0800 Subject: [PATCH 3/3] [Bug][flink-runtime][connectors-v2] Flink register table Environment The running mode is set tojob.mode #4826 --- .../core/starter/flink/execution/SourceExecuteProcessor.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index f8ba1e0a2d6..6bcc5fe8939 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -73,9 +73,7 @@ public List> execute(List> upstreamDataStreams) } boolean bounded = internalSource.getBoundedness() - == org.apache.seatunnel.api.source.Boundedness.BOUNDED - ? true - : false; + == org.apache.seatunnel.api.source.Boundedness.BOUNDED; DataStreamSource sourceStream = addSource( executionEnvironment,