From cbe5b3fa975e7049bba22da695f6c3c72777b7f9 Mon Sep 17 00:00:00 2001 From: fang <56808812+zhibinF@users.noreply.github.com> Date: Tue, 8 Aug 2023 11:56:15 +0800 Subject: [PATCH] [Feature][Zeta][REST-API]Add REST API To Submit Job (#5107) --- docs/en/seatunnel-engine/rest-api.md | 58 ++++++++ .../core/starter/utils/ConfigBuilder.java | 6 + .../seatunnel/engine/e2e/RestApiIT.java | 72 ++++++++++ .../client/job/JobExecutionEnvironment.java | 91 +----------- .../core/job/AbstractJobEnvironment.java | 114 +++++++++++++++ .../parse/MultipleTableJobConfigParser.java | 16 +++ .../engine/server/NodeExtension.java | 2 + .../job/JobImmutableInformationEnv.java | 80 +++++++++++ .../engine/server/rest/RestConstant.java | 1 + .../rest/RestHttpPostCommandProcessor.java | 135 ++++++++++++++++++ .../engine/server/utils/RestUtil.java | 65 +++++++++ 11 files changed, 554 insertions(+), 86 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md index 2edec3496adb..2f44421a3d60 100644 --- a/docs/en/seatunnel-engine/rest-api.md +++ b/docs/en/seatunnel-engine/rest-api.md @@ -180,3 +180,61 @@ network: ------------------------------------------------------------------------------------------ +### Submit Job. + +
+POST /hazelcast/rest/maps/submit-job (Returns jobId and jobName if job submitted successfully.) + +#### Parameters + +> | name | type | data type | description | +> |----------------------|----------|-----------|-----------------------------------| +> | jobId | optional | string | job id | +> | jobName | optional | string | job name | +> | isStartWithSavePoint | optional | string | if job is started with save point | + +#### Body + +```json +{ + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "result_table_name": "fake", + "row.num": 100, + "schema": { + "fields": { + "name": "string", + "age": "int", + "card": "int" + } + } + } + ], + "transform": [ + ], + "sink": [ + { + "plugin_name": "Console", + "source_table_name": ["fake"] + } + ] +} +``` + +#### Responses + +```json +{ + "jobId": 733584788375666689, + "jobName": "rest_api_test" +} +``` + +
+ +------------------------------------------------------------------------------------------ + diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java index ed66b550a046..ad063acac8a6 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java @@ -69,6 +69,12 @@ public static Config of(@NonNull Path filePath) { return config; } + public static Config of(@NonNull Map objectMap) { + log.info("Loading config file from objectMap"); + Config config = ConfigFactory.parseMap(objectMap); + return ConfigShadeUtils.decryptConfig(config); + } + public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull Path filePath) { log.info("With config adapter spi {}", configAdapter.getClass().getName()); try { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index 5f4e97ac8d50..d38d1c732f19 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -22,10 +22,12 @@ import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; import org.apache.seatunnel.engine.server.rest.RestConstant; @@ -37,6 +39,7 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import io.restassured.response.Response; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; @@ -131,6 +134,75 @@ public void testSystemMonitoringInformation() { .statusCode(200); } + @Test + public void testSubmitJob() { + String requestBody = + "{\n" + + " \"env\": {\n" + + " \"job.mode\": \"batch\"\n" + + " },\n" + + " \"source\": [\n" + + " {\n" + + " \"plugin_name\": \"FakeSource\",\n" + + " \"result_table_name\": \"fake\",\n" + + " \"row.num\": 100,\n" + + " \"schema\": {\n" + + " \"fields\": {\n" + + " \"name\": \"string\",\n" + + " \"age\": \"int\",\n" + + " \"card\": \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"transform\": [\n" + + " ],\n" + + " \"sink\": [\n" + + " {\n" + + " \"plugin_name\": \"Console\",\n" + + " \"source_table_name\": [\"fake\"]\n" + + " }\n" + + " ]\n" + + "}"; + String parameters = "jobId=1&jobName=test&isStartWithSavePoint=false"; + // Only jobName is compared because jobId is randomly generated if isStartWithSavePoint is + // false + Response response = + given().body(requestBody) + .post( + HOST + + hazelcastInstance + .getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); + + response.then().statusCode(200).body("jobName", equalTo("test")); + String jobId = response.getBody().jsonPath().getString("jobId"); + SeaTunnelServer seaTunnelServer = + (SeaTunnelServer) + hazelcastInstance + .node + .getNodeExtension() + .createExtensionServices() + .get(Constant.SEATUNNEL_SERVICE_NAME); + JobStatus jobStatus = + seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId)); + Assertions.assertEquals(JobStatus.RUNNING, jobStatus); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.FINISHED, + seaTunnelServer + .getCoordinatorService() + .getJobStatus(Long.parseLong(jobId)))); + } + @AfterAll static void afterClass() { if (hazelcastInstance != null) { diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java index bf3169e4c803..3f870c612160 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java @@ -18,55 +18,19 @@ package org.apache.seatunnel.engine.client.job; import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.env.EnvCommonOptions; -import org.apache.seatunnel.common.config.Common; -import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient; import org.apache.seatunnel.engine.common.config.JobConfig; -import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; -import org.apache.seatunnel.engine.common.utils.IdGenerator; -import org.apache.seatunnel.engine.core.dag.actions.Action; -import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; -import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator; +import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser; -import org.apache.commons.lang3.tuple.ImmutablePair; - -import com.hazelcast.logging.ILogger; -import com.hazelcast.logging.Logger; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - -public class JobExecutionEnvironment { - - private static final ILogger LOGGER = Logger.getLogger(JobExecutionEnvironment.class); - - private final boolean isStartWithSavePoint; - - private final JobConfig jobConfig; - - private final List actions = new ArrayList<>(); - - private final Set jarUrls = new HashSet<>(); - private final List commonPluginJars = new ArrayList<>(); +public class JobExecutionEnvironment extends AbstractJobEnvironment { private final String jobFilePath; - private final IdGenerator idGenerator; - private final SeaTunnelHazelcastClient seaTunnelHazelcastClient; private final JobClient jobClient; @@ -78,35 +42,12 @@ public JobExecutionEnvironment( SeaTunnelHazelcastClient seaTunnelHazelcastClient, boolean isStartWithSavePoint, Long jobId) { - this.jobConfig = jobConfig; + super(jobConfig, isStartWithSavePoint); this.jobFilePath = jobFilePath; - this.idGenerator = new IdGenerator(); this.seaTunnelHazelcastClient = seaTunnelHazelcastClient; this.jobClient = new JobClient(seaTunnelHazelcastClient); - this.isStartWithSavePoint = isStartWithSavePoint; this.jobConfig.setJobContext( new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId())); - this.commonPluginJars.addAll(searchPluginJars()); - this.commonPluginJars.addAll( - new ArrayList<>( - Common.getThirdPartyJars( - jobConfig - .getEnvOptions() - .getOrDefault(EnvCommonOptions.JARS.key(), "") - .toString()) - .stream() - .map(Path::toUri) - .map( - uri -> { - try { - return uri.toURL(); - } catch (MalformedURLException e) { - throw new SeaTunnelEngineException( - "the uri of jar illegal:" + uri, e); - } - }) - .collect(Collectors.toList()))); - LOGGER.info("add common jar in plugins :" + commonPluginJars); } public JobExecutionEnvironment( @@ -117,27 +58,12 @@ public JobExecutionEnvironment( } /** Search all jars in SEATUNNEL_HOME/plugins */ - private Set searchPluginJars() { - try { - if (Files.exists(Common.pluginRootDir())) { - return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir())); - } - } catch (IOException | SeaTunnelEngineException e) { - LOGGER.warning( - String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e); - } - return Collections.emptySet(); - } - - private MultipleTableJobConfigParser getJobConfigParser() { + @Override + protected MultipleTableJobConfigParser getJobConfigParser() { return new MultipleTableJobConfigParser( jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint); } - private LogicalDagGenerator getLogicalDagGenerator() { - return new LogicalDagGenerator(actions, jobConfig, idGenerator); - } - public ClientJobProxy execute() throws ExecutionException, InterruptedException { JobImmutableInformation jobImmutableInformation = new JobImmutableInformation( @@ -150,11 +76,4 @@ public ClientJobProxy execute() throws ExecutionException, InterruptedException return jobClient.createJobProxy(jobImmutableInformation); } - - private LogicalDag getLogicalDag() { - ImmutablePair, Set> immutablePair = getJobConfigParser().parse(); - actions.addAll(immutablePair.getLeft()); - jarUrls.addAll(immutablePair.getRight()); - return getLogicalDagGenerator().generate(); - } } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java new file mode 100644 index 000000000000..3509903c0883 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.core.job; + +import org.apache.seatunnel.api.env.EnvCommonOptions; +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.common.utils.IdGenerator; +import org.apache.seatunnel.engine.core.dag.actions.Action; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator; +import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser; + +import org.apache.commons.lang3.tuple.ImmutablePair; + +import com.hazelcast.logging.ILogger; +import com.hazelcast.logging.Logger; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public abstract class AbstractJobEnvironment { + protected static ILogger LOGGER = null; + + protected final boolean isStartWithSavePoint; + + protected final List actions = new ArrayList<>(); + protected final Set jarUrls = new HashSet<>(); + + protected final JobConfig jobConfig; + + protected final IdGenerator idGenerator; + + protected final List commonPluginJars = new ArrayList<>(); + + public AbstractJobEnvironment(JobConfig jobConfig, boolean isStartWithSavePoint) { + LOGGER = Logger.getLogger(getClass().getName()); + this.jobConfig = jobConfig; + this.isStartWithSavePoint = isStartWithSavePoint; + this.idGenerator = new IdGenerator(); + this.commonPluginJars.addAll(searchPluginJars()); + this.commonPluginJars.addAll( + new ArrayList<>( + Common.getThirdPartyJars( + jobConfig + .getEnvOptions() + .getOrDefault(EnvCommonOptions.JARS.key(), "") + .toString()) + .stream() + .map(Path::toUri) + .map( + uri -> { + try { + return uri.toURL(); + } catch (MalformedURLException e) { + throw new SeaTunnelEngineException( + "the uri of jar illegal:" + uri, e); + } + }) + .collect(Collectors.toList()))); + LOGGER.info("add common jar in plugins :" + commonPluginJars); + } + + protected Set searchPluginJars() { + try { + if (Files.exists(Common.pluginRootDir())) { + return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir())); + } + } catch (IOException | SeaTunnelEngineException e) { + LOGGER.warning( + String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e); + } + return Collections.emptySet(); + } + + protected abstract MultipleTableJobConfigParser getJobConfigParser(); + + protected LogicalDagGenerator getLogicalDagGenerator() { + return new LogicalDagGenerator(actions, jobConfig, idGenerator); + } + + protected LogicalDag getLogicalDag() { + ImmutablePair, Set> immutablePair = getJobConfigParser().parse(); + actions.addAll(immutablePair.getLeft()); + jarUrls.addAll(immutablePair.getRight()); + return getLogicalDagGenerator().generate(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 86c0f3c94f59..ee2505286f82 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -130,6 +130,22 @@ public MultipleTableJobConfigParser( new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint); } + public MultipleTableJobConfigParser( + Config seaTunnelJobConfig, + IdGenerator idGenerator, + JobConfig jobConfig, + List commonPluginJars, + boolean isStartWithSavePoint) { + this.idGenerator = idGenerator; + this.jobConfig = jobConfig; + this.commonPluginJars = commonPluginJars; + this.isStartWithSavePoint = isStartWithSavePoint; + this.seaTunnelJobConfig = seaTunnelJobConfig; + this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); + this.fallbackParser = + new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint); + } + public ImmutablePair, Set> parse() { List sourceConfigs = TypesafeConfigUtils.getConfigList( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java index d4137955c8bf..37e00cffab2d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor; import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor; import org.apache.seatunnel.engine.server.rest.RestHttpGetCommandProcessor; +import org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor; import com.hazelcast.cluster.ClusterState; import com.hazelcast.instance.impl.DefaultNodeExtension; @@ -79,6 +80,7 @@ public TextCommandService createTextCommandService() { register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this)); register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this)); register(HTTP_GET, new RestHttpGetCommandProcessor(this)); + register(HTTP_POST, new RestHttpPostCommandProcessor(this)); } }; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java new file mode 100644 index 000000000000..4dd72e31cb8c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.job; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser; + +import com.hazelcast.instance.impl.Node; +import com.hazelcast.spi.impl.NodeEngineImpl; + +import java.util.ArrayList; + +public class JobImmutableInformationEnv extends AbstractJobEnvironment { + private final Config seaTunnelJobConfig; + + private final NodeEngineImpl nodeEngine; + + private final Long jobId; + + public JobImmutableInformationEnv( + JobConfig jobConfig, + Config seaTunnelJobConfig, + Node node, + boolean isStartWithSavePoint, + Long jobId) { + super(jobConfig, isStartWithSavePoint); + this.seaTunnelJobConfig = seaTunnelJobConfig; + this.nodeEngine = node.getNodeEngine(); + this.jobConfig.setJobContext( + new JobContext( + isStartWithSavePoint + ? jobId + : nodeEngine + .getHazelcastInstance() + .getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME) + .newId())); + this.jobId = Long.valueOf(jobConfig.getJobContext().getJobId()); + } + + public Long getJobId() { + return jobId; + } + + @Override + protected MultipleTableJobConfigParser getJobConfigParser() { + return new MultipleTableJobConfigParser( + seaTunnelJobConfig, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint); + } + + public JobImmutableInformation build() { + return new JobImmutableInformation( + Long.parseLong(jobConfig.getJobContext().getJobId()), + jobConfig.getName(), + isStartWithSavePoint, + nodeEngine.getSerializationService().toData(getLogicalDag()), + jobConfig, + new ArrayList<>(jarUrls)); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index 0a5d8437be36..7776d592b8f6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -21,6 +21,7 @@ public class RestConstant { public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs"; public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job"; + public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job"; public static final String SYSTEM_MONITORING_INFORMATION = "/hazelcast/rest/maps/system-monitoring-information"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java new file mode 100644 index 000000000000..e0edd9320320 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.rest; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.server.CoordinatorService; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.job.JobImmutableInformationEnv; +import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor; +import org.apache.seatunnel.engine.server.utils.RestUtil; + +import com.hazelcast.internal.ascii.TextCommandService; +import com.hazelcast.internal.ascii.rest.HttpCommandProcessor; +import com.hazelcast.internal.ascii.rest.HttpPostCommand; +import com.hazelcast.internal.json.JsonObject; +import com.hazelcast.internal.serialization.Data; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400; +import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; +import static org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL; + +public class RestHttpPostCommandProcessor extends HttpCommandProcessor { + private final Log4j2HttpPostCommandProcessor original; + + public RestHttpPostCommandProcessor(TextCommandService textCommandService) { + this(textCommandService, new Log4j2HttpPostCommandProcessor(textCommandService)); + } + + protected RestHttpPostCommandProcessor( + TextCommandService textCommandService, + Log4j2HttpPostCommandProcessor log4j2HttpPostCommandProcessor) { + super( + textCommandService, + textCommandService.getNode().getLogger(Log4j2HttpPostCommandProcessor.class)); + this.original = log4j2HttpPostCommandProcessor; + } + + @Override + public void handle(HttpPostCommand httpPostCommand) { + String uri = httpPostCommand.getURI(); + try { + if (uri.startsWith(SUBMIT_JOB_URL)) { + handleSubmitJob(httpPostCommand, uri); + } else { + original.handle(httpPostCommand); + } + } catch (IllegalArgumentException e) { + prepareResponse(SC_400, httpPostCommand, exceptionResponse(e)); + } catch (Throwable e) { + logger.warning("An error occurred while handling request " + httpPostCommand, e); + prepareResponse(SC_500, httpPostCommand, exceptionResponse(e)); + } + + this.textCommandService.sendResponse(httpPostCommand); + } + + private SeaTunnelServer getSeaTunnelServer() { + Map extensionServices = + this.textCommandService.getNode().getNodeExtension().createExtensionServices(); + return (SeaTunnelServer) extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME); + } + + private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) + throws IllegalArgumentException { + Map requestParams = new HashMap<>(); + RestUtil.buildRequestParams(requestParams, uri); + byte[] requestBody = httpPostCommand.getData(); + if (requestBody.length == 0) { + throw new IllegalArgumentException("Request body is empty."); + } + JsonNode requestBodyJsonNode; + try { + requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody); + } catch (IOException e) { + throw new IllegalArgumentException("Invalid JSON format in request body."); + } + Config config = RestUtil.buildConfig(requestBodyJsonNode); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(requestParams.get("jobName")); + JobImmutableInformationEnv jobImmutableInformationEnv = + new JobImmutableInformationEnv( + jobConfig, + config, + textCommandService.getNode(), + Boolean.parseBoolean(requestParams.get("isStartWithSavePoint")), + Long.parseLong(requestParams.get("jobId"))); + JobImmutableInformation jobImmutableInformation = jobImmutableInformationEnv.build(); + CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService(); + Data data = + textCommandService + .getNode() + .nodeEngine + .getSerializationService() + .toData(jobImmutableInformation); + PassiveCompletableFuture voidPassiveCompletableFuture = + coordinatorService.submitJob( + Long.parseLong(jobConfig.getJobContext().getJobId()), data); + voidPassiveCompletableFuture.join(); + + Long jobId = jobImmutableInformationEnv.getJobId(); + this.prepareResponse( + httpPostCommand, + new JsonObject().add("jobId", jobId).add("jobName", requestParams.get("jobName"))); + } + + @Override + public void handleRejection(HttpPostCommand httpPostCommand) { + handle(httpPostCommand); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java new file mode 100644 index 000000000000..d3761366d095 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.utils; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.common.Constants; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.core.starter.utils.ConfigBuilder; + +import com.hazelcast.internal.util.StringUtil; + +import java.io.IOException; +import java.util.Map; + +public class RestUtil { + private RestUtil() {} + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + public static JsonNode convertByteToJsonNode(byte[] byteData) throws IOException { + return objectMapper.readTree(byteData); + } + + public static void buildRequestParams(Map requestParams, String uri) { + requestParams.put("jobId", null); + requestParams.put("jobName", Constants.LOGO); + requestParams.put("isStartWithSavePoint", String.valueOf(false)); + uri = StringUtil.stripTrailingSlash(uri); + if (!uri.contains("?")) { + return; + } + int indexEnd = uri.indexOf('?'); + try { + for (String s : uri.substring(indexEnd + 1).split("&")) { + String[] param = s.split("="); + requestParams.put(param[0], param[1]); + } + } catch (IndexOutOfBoundsException e) { + throw new IllegalArgumentException("Invalid Params format in Params."); + } + } + + public static Config buildConfig(JsonNode jsonNode) { + Map objectMap = JsonUtils.toMap(jsonNode); + return ConfigBuilder.of(objectMap); + } +}