Skip to content

Commit

Permalink
[Improve] Change class name for RestJobExecutionEnvironment impleme…
Browse files Browse the repository at this point in the history
…nt (#5671)
  • Loading branch information
Hisoka-X authored Oct 25, 2023
1 parent 75b814b commit 83082b2
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
Expand Down Expand Up @@ -123,7 +123,7 @@ public void execute() throws CommandExecuteException {
Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);
JobConfig jobConfig = new JobConfig();
JobExecutionEnvironment jobExecutionEnv;
ClientJobExecutionEnvironment jobExecutionEnv;
jobConfig.setName(clientCommandArgs.getJobName());
if (null != clientCommandArgs.getRestoreJobId()) {
jobExecutionEnv =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedExc
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -230,7 +230,7 @@ public void testStreamJobRunOkIn2Node() throws ExecutionException, InterruptedEx
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -325,7 +325,7 @@ public void testBatchJobRestoreIn2NodeWorkerDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -422,7 +422,7 @@ public void testStreamJobRestoreIn2NodeWorkerDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -540,7 +540,7 @@ public void testBatchJobRestoreIn2NodeMasterDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -642,7 +642,7 @@ public void testStreamJobRestoreIn2NodeMasterDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -810,7 +810,7 @@ public void testStreamJobRestoreInAllNodeDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Long jobId = clientJobProxy.getJobId();
Expand Down Expand Up @@ -1028,7 +1028,7 @@ public void testStreamJobRestoreFromOssInAllNodeDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Long jobId = clientJobProxy.getJobId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
Expand Down Expand Up @@ -115,7 +115,7 @@ public void testTwoPipelineBatchJobRunOkIn2Node()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -243,7 +243,7 @@ public void testTwoPipelineStreamJobRunOkIn2Node()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -341,7 +341,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -456,7 +456,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -586,7 +586,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -696,7 +696,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.core.job.JobResult;
Expand Down Expand Up @@ -76,7 +76,7 @@ public void testExecuteJob() throws Exception {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Expand All @@ -103,7 +103,7 @@ public void cancelJobTest() throws Exception {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture<JobStatus> completableFuture =
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testExpiredJobWasDeleted() throws Exception {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
Expand Down Expand Up @@ -71,7 +71,7 @@ void beforeClass() throws Exception {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(testClusterName);
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);

clientJobProxy = jobExecutionEnv.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
Expand Down Expand Up @@ -130,7 +130,7 @@ public void enableWriteHeader(String file_format_type, String headerWrite, Strin
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.seatunnel.engine.client;

import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
Expand Down Expand Up @@ -48,15 +48,15 @@ public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
}

@Override
public JobExecutionEnvironment createExecutionContext(
public ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath, @NonNull JobConfig jobConfig) {
return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient);
return new ClientJobExecutionEnvironment(jobConfig, filePath, hazelcastClient);
}

@Override
public JobExecutionEnvironment restoreExecutionContext(
public ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath, @NonNull JobConfig jobConfig, @NonNull Long jobId) {
return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient, true, jobId);
return new ClientJobExecutionEnvironment(jobConfig, filePath, hazelcastClient, true, jobId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@

package org.apache.seatunnel.engine.client;

import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.JobConfig;

import lombok.NonNull;

public interface SeaTunnelClientInstance {

JobExecutionEnvironment createExecutionContext(
ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath, @NonNull JobConfig config);

JobExecutionEnvironment restoreExecutionContext(
ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath, @NonNull JobConfig config, @NonNull Long jobId);

JobClient createJobClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;

public class JobExecutionEnvironment extends AbstractJobEnvironment {
public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {

private final String jobFilePath;

Expand All @@ -36,7 +36,7 @@ public class JobExecutionEnvironment extends AbstractJobEnvironment {
private final JobClient jobClient;

/** If the JobId is not empty, it is used to restore job from savePoint */
public JobExecutionEnvironment(
public ClientJobExecutionEnvironment(
JobConfig jobConfig,
String jobFilePath,
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
Expand All @@ -50,7 +50,7 @@ public JobExecutionEnvironment(
new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
}

public JobExecutionEnvironment(
public ClientJobExecutionEnvironment(
JobConfig jobConfig,
String jobFilePath,
SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
Expand Down
Loading

0 comments on commit 83082b2

Please sign in to comment.