diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java index a3282cc4a1e..5a4050d884d 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor; @@ -33,6 +34,7 @@ import org.apache.seatunnel.core.starter.execution.TaskExecution; import org.apache.seatunnel.core.starter.flink.FlinkStarter; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; @@ -111,6 +113,12 @@ public void execute() throws TaskExecuteException { "Flink Execution Plan: {}", flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan()); log.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName()); + if (!flinkRuntimeEnvironment.isStreaming()) { + flinkRuntimeEnvironment + .getStreamExecutionEnvironment() + .setRuntimeMode(RuntimeExecutionMode.BATCH); + log.info("Flink job Mode: {}", JobMode.BATCH); + } try { flinkRuntimeEnvironment .getStreamExecutionEnvironment() diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index a3897a526e9..6bcc5fe8939 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -71,13 +71,15 @@ public List> execute(List> upstreamDataStreams) } else { sourceFunction = new SeaTunnelParallelSource(internalSource); } + boolean bounded = + internalSource.getBoundedness() + == org.apache.seatunnel.api.source.Boundedness.BOUNDED; DataStreamSource sourceStream = addSource( executionEnvironment, sourceFunction, "SeaTunnel " + internalSource.getClass().getSimpleName(), - internalSource.getBoundedness() - == org.apache.seatunnel.api.source.Boundedness.BOUNDED); + bounded); Config pluginConfig = pluginConfigs.get(i); if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) { int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());