Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Zeta][REST-API]Add REST API To Submit Job #5107

Merged
merged 4 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions docs/en/seatunnel-engine/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,61 @@ network:

------------------------------------------------------------------------------------------

### Submit Job.

<details>
<summary><code>POST</code> <code><b>/hazelcast/rest/maps/submit-job</b></code> <code>(Returns jobId and jobName if job submitted successfully.)</code></summary>

#### Parameters

> | name | type | data type | description |
> |----------------------|----------|-----------|-----------------------------------|
> | jobId | optional | string | job id |
> | jobName | optional | string | job name |
> | isStartWithSavePoint | optional | string | if job is started with save point |

#### Body

```json
{
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"result_table_name": "fake",
"row.num": 100,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"source_table_name": ["fake"]
}
]
}
```

#### Responses

```json
{
"jobId": 733584788375666689,
"jobName": "rest_api_test"
}
```

</details>

------------------------------------------------------------------------------------------

Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public static Config of(@NonNull Path filePath) {
return config;
}

public static Config of(@NonNull Map<String, Object> objectMap) {
log.info("Loading config file from objectMap");
Config config = ConfigFactory.parseMap(objectMap);
return ConfigShadeUtils.decryptConfig(config);
}

public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull Path filePath) {
log.info("With config adapter spi {}", configAdapter.getClass().getName());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.rest.RestConstant;

Expand All @@ -37,6 +39,7 @@

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import io.restassured.response.Response;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -131,6 +134,75 @@ public void testSystemMonitoringInformation() {
.statusCode(200);
}

@Test
public void testSubmitJob() {
String requestBody =
"{\n"
+ " \"env\": {\n"
+ " \"job.mode\": \"batch\"\n"
+ " },\n"
+ " \"source\": [\n"
+ " {\n"
+ " \"plugin_name\": \"FakeSource\",\n"
+ " \"result_table_name\": \"fake\",\n"
+ " \"row.num\": 100,\n"
+ " \"schema\": {\n"
+ " \"fields\": {\n"
+ " \"name\": \"string\",\n"
+ " \"age\": \"int\",\n"
+ " \"card\": \"int\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " ],\n"
+ " \"transform\": [\n"
+ " ],\n"
+ " \"sink\": [\n"
+ " {\n"
+ " \"plugin_name\": \"Console\",\n"
+ " \"source_table_name\": [\"fake\"]\n"
+ " }\n"
+ " ]\n"
+ "}";
String parameters = "jobId=1&jobName=test&isStartWithSavePoint=false";
// Only jobName is compared because jobId is randomly generated if isStartWithSavePoint is
// false
Response response =
given().body(requestBody)
.post(
HOST
+ hazelcastInstance
.getCluster()
.getLocalMember()
.getAddress()
.getPort()
+ RestConstant.SUBMIT_JOB_URL
+ "?"
+ parameters);

response.then().statusCode(200).body("jobName", equalTo("test"));
String jobId = response.getBody().jsonPath().getString("jobId");
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
hazelcastInstance
.node
.getNodeExtension()
.createExtensionServices()
.get(Constant.SEATUNNEL_SERVICE_NAME);
JobStatus jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));
}

@AfterAll
static void afterClass() {
if (hazelcastInstance != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,19 @@
package org.apache.seatunnel.engine.client.job;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;

import org.apache.commons.lang3.tuple.ImmutablePair;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class JobExecutionEnvironment {

private static final ILogger LOGGER = Logger.getLogger(JobExecutionEnvironment.class);

private final boolean isStartWithSavePoint;

private final JobConfig jobConfig;

private final List<Action> actions = new ArrayList<>();

private final Set<URL> jarUrls = new HashSet<>();

private final List<URL> commonPluginJars = new ArrayList<>();
public class JobExecutionEnvironment extends AbstractJobEnvironment {

private final String jobFilePath;

private final IdGenerator idGenerator;

private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;

private final JobClient jobClient;
Expand All @@ -78,35 +42,12 @@ public JobExecutionEnvironment(
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
boolean isStartWithSavePoint,
Long jobId) {
this.jobConfig = jobConfig;
super(jobConfig, isStartWithSavePoint);
this.jobFilePath = jobFilePath;
this.idGenerator = new IdGenerator();
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.isStartWithSavePoint = isStartWithSavePoint;
this.jobConfig.setJobContext(
new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
this.commonPluginJars.addAll(searchPluginJars());
this.commonPluginJars.addAll(
new ArrayList<>(
Common.getThirdPartyJars(
jobConfig
.getEnvOptions()
.getOrDefault(EnvCommonOptions.JARS.key(), "")
.toString())
.stream()
.map(Path::toUri)
.map(
uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new SeaTunnelEngineException(
"the uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toList())));
LOGGER.info("add common jar in plugins :" + commonPluginJars);
}

public JobExecutionEnvironment(
Expand All @@ -117,27 +58,12 @@ public JobExecutionEnvironment(
}

/** Search all jars in SEATUNNEL_HOME/plugins */
private Set<URL> searchPluginJars() {
try {
if (Files.exists(Common.pluginRootDir())) {
return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
}
} catch (IOException | SeaTunnelEngineException e) {
LOGGER.warning(
String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e);
}
return Collections.emptySet();
}

private MultipleTableJobConfigParser getJobConfigParser() {
@Override
protected MultipleTableJobConfigParser getJobConfigParser() {
return new MultipleTableJobConfigParser(
jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
}

private LogicalDagGenerator getLogicalDagGenerator() {
return new LogicalDagGenerator(actions, jobConfig, idGenerator);
}

public ClientJobProxy execute() throws ExecutionException, InterruptedException {
JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
Expand All @@ -150,11 +76,4 @@ public ClientJobProxy execute() throws ExecutionException, InterruptedException

return jobClient.createJobProxy(jobImmutableInformation);
}

private LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
actions.addAll(immutablePair.getLeft());
jarUrls.addAll(immutablePair.getRight());
return getLogicalDagGenerator().generate();
}
}
Loading
Loading