Skip to content

Commit

Permalink
[Feature][Zeta][REST-API]Add REST API To Submit Job (apache#5107)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhibinF authored and liunaijie committed Aug 12, 2023
1 parent 22a13eb commit cbe5b3f
Show file tree
Hide file tree
Showing 11 changed files with 554 additions and 86 deletions.
58 changes: 58 additions & 0 deletions docs/en/seatunnel-engine/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,61 @@ network:

------------------------------------------------------------------------------------------

### Submit Job.

<details>
<summary><code>POST</code> <code><b>/hazelcast/rest/maps/submit-job</b></code> <code>(Returns jobId and jobName if job submitted successfully.)</code></summary>

#### Parameters

> | name | type | data type | description |
> |----------------------|----------|-----------|-----------------------------------|
> | jobId | optional | string | job id |
> | jobName | optional | string | job name |
> | isStartWithSavePoint | optional | string | if job is started with save point |

#### Body

```json
{
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"result_table_name": "fake",
"row.num": 100,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"source_table_name": ["fake"]
}
]
}
```

#### Responses

```json
{
"jobId": 733584788375666689,
"jobName": "rest_api_test"
}
```

</details>

------------------------------------------------------------------------------------------

Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public static Config of(@NonNull Path filePath) {
return config;
}

public static Config of(@NonNull Map<String, Object> objectMap) {
log.info("Loading config file from objectMap");
Config config = ConfigFactory.parseMap(objectMap);
return ConfigShadeUtils.decryptConfig(config);
}

public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull Path filePath) {
log.info("With config adapter spi {}", configAdapter.getClass().getName());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.rest.RestConstant;

Expand All @@ -37,6 +39,7 @@

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import io.restassured.response.Response;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -131,6 +134,75 @@ public void testSystemMonitoringInformation() {
.statusCode(200);
}

@Test
public void testSubmitJob() {
String requestBody =
"{\n"
+ " \"env\": {\n"
+ " \"job.mode\": \"batch\"\n"
+ " },\n"
+ " \"source\": [\n"
+ " {\n"
+ " \"plugin_name\": \"FakeSource\",\n"
+ " \"result_table_name\": \"fake\",\n"
+ " \"row.num\": 100,\n"
+ " \"schema\": {\n"
+ " \"fields\": {\n"
+ " \"name\": \"string\",\n"
+ " \"age\": \"int\",\n"
+ " \"card\": \"int\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " ],\n"
+ " \"transform\": [\n"
+ " ],\n"
+ " \"sink\": [\n"
+ " {\n"
+ " \"plugin_name\": \"Console\",\n"
+ " \"source_table_name\": [\"fake\"]\n"
+ " }\n"
+ " ]\n"
+ "}";
String parameters = "jobId=1&jobName=test&isStartWithSavePoint=false";
// Only jobName is compared because jobId is randomly generated if isStartWithSavePoint is
// false
Response response =
given().body(requestBody)
.post(
HOST
+ hazelcastInstance
.getCluster()
.getLocalMember()
.getAddress()
.getPort()
+ RestConstant.SUBMIT_JOB_URL
+ "?"
+ parameters);

response.then().statusCode(200).body("jobName", equalTo("test"));
String jobId = response.getBody().jsonPath().getString("jobId");
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
hazelcastInstance
.node
.getNodeExtension()
.createExtensionServices()
.get(Constant.SEATUNNEL_SERVICE_NAME);
JobStatus jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));
}

@AfterAll
static void afterClass() {
if (hazelcastInstance != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,19 @@
package org.apache.seatunnel.engine.client.job;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;

import org.apache.commons.lang3.tuple.ImmutablePair;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class JobExecutionEnvironment {

private static final ILogger LOGGER = Logger.getLogger(JobExecutionEnvironment.class);

private final boolean isStartWithSavePoint;

private final JobConfig jobConfig;

private final List<Action> actions = new ArrayList<>();

private final Set<URL> jarUrls = new HashSet<>();

private final List<URL> commonPluginJars = new ArrayList<>();
public class JobExecutionEnvironment extends AbstractJobEnvironment {

private final String jobFilePath;

private final IdGenerator idGenerator;

private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;

private final JobClient jobClient;
Expand All @@ -78,35 +42,12 @@ public JobExecutionEnvironment(
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
boolean isStartWithSavePoint,
Long jobId) {
this.jobConfig = jobConfig;
super(jobConfig, isStartWithSavePoint);
this.jobFilePath = jobFilePath;
this.idGenerator = new IdGenerator();
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.isStartWithSavePoint = isStartWithSavePoint;
this.jobConfig.setJobContext(
new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
this.commonPluginJars.addAll(searchPluginJars());
this.commonPluginJars.addAll(
new ArrayList<>(
Common.getThirdPartyJars(
jobConfig
.getEnvOptions()
.getOrDefault(EnvCommonOptions.JARS.key(), "")
.toString())
.stream()
.map(Path::toUri)
.map(
uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new SeaTunnelEngineException(
"the uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toList())));
LOGGER.info("add common jar in plugins :" + commonPluginJars);
}

public JobExecutionEnvironment(
Expand All @@ -117,27 +58,12 @@ public JobExecutionEnvironment(
}

/** Search all jars in SEATUNNEL_HOME/plugins */
private Set<URL> searchPluginJars() {
try {
if (Files.exists(Common.pluginRootDir())) {
return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
}
} catch (IOException | SeaTunnelEngineException e) {
LOGGER.warning(
String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e);
}
return Collections.emptySet();
}

private MultipleTableJobConfigParser getJobConfigParser() {
@Override
protected MultipleTableJobConfigParser getJobConfigParser() {
return new MultipleTableJobConfigParser(
jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
}

private LogicalDagGenerator getLogicalDagGenerator() {
return new LogicalDagGenerator(actions, jobConfig, idGenerator);
}

public ClientJobProxy execute() throws ExecutionException, InterruptedException {
JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
Expand All @@ -150,11 +76,4 @@ public ClientJobProxy execute() throws ExecutionException, InterruptedException

return jobClient.createJobProxy(jobImmutableInformation);
}

private LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
actions.addAll(immutablePair.getLeft());
jarUrls.addAll(immutablePair.getRight());
return getLogicalDagGenerator().generate();
}
}
Loading

0 comments on commit cbe5b3f

Please sign in to comment.