From 83082b2be836c6352521a2dfef6b8e1beac05311 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 25 Oct 2023 11:38:13 +0800 Subject: [PATCH] [Improve] Change class name for `RestJobExecutionEnvironment` implement (#5671) --- .../command/ClientExecuteCommand.java | 4 ++-- .../engine/e2e/ClusterFaultToleranceIT.java | 18 +++++++++--------- .../ClusterFaultToleranceTwoPipelineIT.java | 14 +++++++------- .../seatunnel/engine/e2e/JobExecutionIT.java | 10 +++++----- .../apache/seatunnel/engine/e2e/RestApiIT.java | 4 ++-- .../seatunnel/engine/e2e/TextHeaderIT.java | 4 ++-- .../engine/client/SeaTunnelClient.java | 10 +++++----- .../engine/client/SeaTunnelClientInstance.java | 6 +++--- ...java => ClientJobExecutionEnvironment.java} | 6 +++--- .../engine/client/SeaTunnelClientTest.java | 14 +++++++------- ...v.java => RestJobExecutionEnvironment.java} | 4 ++-- .../rest/RestHttpPostCommandProcessor.java | 10 +++++----- 12 files changed, 52 insertions(+), 52 deletions(-) rename seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/{JobExecutionEnvironment.java => ClientJobExecutionEnvironment.java} (95%) rename seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/{JobImmutableInformationEnv.java => RestJobExecutionEnvironment.java} (96%) diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index 449b3f5238c..2b4e541e5fc 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -25,8 +25,8 @@ import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs; import org.apache.seatunnel.core.starter.utils.FileUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; -import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.JobMetricsRunner; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; @@ -123,7 +123,7 @@ public void execute() throws CommandExecuteException { Path configFile = FileUtils.getConfigPath(clientCommandArgs); checkConfigExist(configFile); JobConfig jobConfig = new JobConfig(); - JobExecutionEnvironment jobExecutionEnv; + ClientJobExecutionEnvironment jobExecutionEnv; jobConfig.setName(clientCommandArgs.getJobName()); if (null != clientCommandArgs.getRestoreJobId()) { jobExecutionEnv = diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java index 1b547be73b9..86dc52a92b4 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java @@ -23,8 +23,8 @@ import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; -import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; @@ -108,7 +108,7 @@ public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedExc ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -230,7 +230,7 @@ public void testStreamJobRunOkIn2Node() throws ExecutionException, InterruptedEx ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -325,7 +325,7 @@ public void testBatchJobRestoreIn2NodeWorkerDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -422,7 +422,7 @@ public void testStreamJobRestoreIn2NodeWorkerDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -540,7 +540,7 @@ public void testBatchJobRestoreIn2NodeMasterDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -642,7 +642,7 @@ public void testStreamJobRestoreIn2NodeMasterDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -810,7 +810,7 @@ public void testStreamJobRestoreInAllNodeDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); Long jobId = clientJobProxy.getJobId(); @@ -1028,7 +1028,7 @@ public void testStreamJobRestoreFromOssInAllNodeDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); Long jobId = clientJobProxy.getJobId(); diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java index 3c677b45f3d..44087645ba1 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java @@ -22,8 +22,8 @@ import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; -import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; @@ -115,7 +115,7 @@ public void testTwoPipelineBatchJobRunOkIn2Node() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -243,7 +243,7 @@ public void testTwoPipelineStreamJobRunOkIn2Node() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -341,7 +341,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -456,7 +456,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -586,7 +586,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -696,7 +696,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index 4ecee663ae5..45b582874ef 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -20,8 +20,8 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; -import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.core.job.JobResult; @@ -76,7 +76,7 @@ public void testExecuteJob() throws Exception { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT")); SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -103,7 +103,7 @@ public void cancelJobTest() throws Exception { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT")); SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -132,7 +132,7 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT")); SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); CompletableFuture completableFuture = @@ -177,7 +177,7 @@ public void testExpiredJobWasDeleted() throws Exception { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT")); SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index d896ec17bf7..305ca7b3b27 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -20,8 +20,8 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; -import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; @@ -71,7 +71,7 @@ void beforeClass() throws Exception { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(testClusterName); SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig); clientJobProxy = jobExecutionEnv.execute(); diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java index 7d60fadcde9..010d8a80ddc 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java @@ -22,8 +22,8 @@ import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; -import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; @@ -130,7 +130,7 @@ public void enableWriteHeader(String file_format_type, String headerWrite, Strin ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName(testClusterName)); engineClient = new SeaTunnelClient(clientConfig); - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(testResources.getRight(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java index 97f602b46d7..1f4e7410e1c 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java @@ -18,8 +18,8 @@ package org.apache.seatunnel.engine.client; import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.JobClient; -import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.core.job.JobDAGInfo; @@ -48,15 +48,15 @@ public SeaTunnelClient(@NonNull ClientConfig clientConfig) { } @Override - public JobExecutionEnvironment createExecutionContext( + public ClientJobExecutionEnvironment createExecutionContext( @NonNull String filePath, @NonNull JobConfig jobConfig) { - return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient); + return new ClientJobExecutionEnvironment(jobConfig, filePath, hazelcastClient); } @Override - public JobExecutionEnvironment restoreExecutionContext( + public ClientJobExecutionEnvironment restoreExecutionContext( @NonNull String filePath, @NonNull JobConfig jobConfig, @NonNull Long jobId) { - return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient, true, jobId); + return new ClientJobExecutionEnvironment(jobConfig, filePath, hazelcastClient, true, jobId); } @Override diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java index 17d2d26a8f2..73c8ccd5793 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java @@ -17,18 +17,18 @@ package org.apache.seatunnel.engine.client; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.JobClient; -import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.config.JobConfig; import lombok.NonNull; public interface SeaTunnelClientInstance { - JobExecutionEnvironment createExecutionContext( + ClientJobExecutionEnvironment createExecutionContext( @NonNull String filePath, @NonNull JobConfig config); - JobExecutionEnvironment restoreExecutionContext( + ClientJobExecutionEnvironment restoreExecutionContext( @NonNull String filePath, @NonNull JobConfig config, @NonNull Long jobId); JobClient createJobClient(); diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java similarity index 95% rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java index 3f870c61216..5970aa97e16 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java @@ -27,7 +27,7 @@ import java.util.ArrayList; import java.util.concurrent.ExecutionException; -public class JobExecutionEnvironment extends AbstractJobEnvironment { +public class ClientJobExecutionEnvironment extends AbstractJobEnvironment { private final String jobFilePath; @@ -36,7 +36,7 @@ public class JobExecutionEnvironment extends AbstractJobEnvironment { private final JobClient jobClient; /** If the JobId is not empty, it is used to restore job from savePoint */ - public JobExecutionEnvironment( + public ClientJobExecutionEnvironment( JobConfig jobConfig, String jobFilePath, SeaTunnelHazelcastClient seaTunnelHazelcastClient, @@ -50,7 +50,7 @@ public JobExecutionEnvironment( new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId())); } - public JobExecutionEnvironment( + public ClientJobExecutionEnvironment( JobConfig jobConfig, String jobFilePath, SeaTunnelHazelcastClient seaTunnelHazelcastClient) { diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index ff39c4b2a5e..b30d3ec0de3 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -22,9 +22,9 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.common.utils.RetryUtils; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobClient; -import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; @@ -98,7 +98,7 @@ public void testExecuteJob() { SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); try { - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = seaTunnelClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); CompletableFuture objectCompletableFuture = @@ -133,7 +133,7 @@ public void testGetJobState() { JobClient jobClient = seaTunnelClient.getJobClient(); try { - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = seaTunnelClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); CompletableFuture objectCompletableFuture = @@ -179,7 +179,7 @@ public void testGetJobMetrics() { JobClient jobClient = seaTunnelClient.getJobClient(); try { - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = seaTunnelClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -277,7 +277,7 @@ public void testCancelJob() throws ExecutionException, InterruptedException { SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); JobClient jobClient = seaTunnelClient.getJobClient(); try { - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = seaTunnelClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); @@ -315,7 +315,7 @@ public void testGetJobInfo() { JobClient jobClient = seaTunnelClient.getJobClient(); try { - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = seaTunnelClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); @@ -363,7 +363,7 @@ public void testSavePointAndRestoreWithSavePoint() throws Exception { JobClient jobClient = seaTunnelClient.getJobClient(); try { - JobExecutionEnvironment jobExecutionEnv = + ClientJobExecutionEnvironment jobExecutionEnv = seaTunnelClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java similarity index 96% rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java index 4dd72e31cb8..7111e64710a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/RestJobExecutionEnvironment.java @@ -31,14 +31,14 @@ import java.util.ArrayList; -public class JobImmutableInformationEnv extends AbstractJobEnvironment { +public class RestJobExecutionEnvironment extends AbstractJobEnvironment { private final Config seaTunnelJobConfig; private final NodeEngineImpl nodeEngine; private final Long jobId; - public JobImmutableInformationEnv( + public RestJobExecutionEnvironment( JobConfig jobConfig, Config seaTunnelJobConfig, Node node, diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java index 66a9131f65b..b1d66a4e39d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java @@ -27,7 +27,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.server.CoordinatorService; import org.apache.seatunnel.engine.server.SeaTunnelServer; -import org.apache.seatunnel.engine.server.job.JobImmutableInformationEnv; +import org.apache.seatunnel.engine.server.job.RestJobExecutionEnvironment; import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor; import org.apache.seatunnel.engine.server.utils.RestUtil; @@ -96,15 +96,15 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) Config config = RestUtil.buildConfig(requestHandle(httpPostCommand)); JobConfig jobConfig = new JobConfig(); jobConfig.setName(requestParams.get(RestConstant.JOB_NAME)); - JobImmutableInformationEnv jobImmutableInformationEnv = - new JobImmutableInformationEnv( + RestJobExecutionEnvironment restJobExecutionEnvironment = + new RestJobExecutionEnvironment( jobConfig, config, textCommandService.getNode(), Boolean.parseBoolean( requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT)), Long.parseLong(requestParams.get(RestConstant.JOB_ID))); - JobImmutableInformation jobImmutableInformation = jobImmutableInformationEnv.build(); + JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build(); CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService(); Data data = textCommandService @@ -117,7 +117,7 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) Long.parseLong(jobConfig.getJobContext().getJobId()), data); voidPassiveCompletableFuture.join(); - Long jobId = jobImmutableInformationEnv.getJobId(); + Long jobId = restJobExecutionEnvironment.getJobId(); this.prepareResponse( httpPostCommand, new JsonObject()