diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index 22b247ad76b..e9b5e53bc75 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -23,6 +23,7 @@ public class RestConstant { public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job"; public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job"; public static final String CANCEL_JOB_URL = "/hazelcast/rest/maps/cancel-job"; + public static final String GET_JOB_STATUS_URL = "/hazelcast/rest/maps/get-job-status"; public static final String SYSTEM_MONITORING_INFORMATION = "/hazelcast/rest/maps/system-monitoring-information"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 18cd0e93823..3f41eb4cdc1 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; @@ -31,10 +32,13 @@ import org.apache.seatunnel.engine.server.CoordinatorService; import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor; +import org.apache.seatunnel.engine.server.master.JobHistoryService; import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; import org.apache.seatunnel.engine.server.utils.RestUtil; +import org.apache.commons.lang.StringUtils; + import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Cluster; import com.hazelcast.cluster.Member; @@ -55,11 +59,13 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutionException; import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; import static org.apache.seatunnel.engine.server.rest.RestConstant.CANCEL_JOB_URL; +import static org.apache.seatunnel.engine.server.rest.RestConstant.GET_JOB_STATUS_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITORING_INFORMATION; @@ -99,6 +105,8 @@ public void handle(HttpGetCommand httpGetCommand) { getSystemMonitoringInformation(httpGetCommand); } else if (uri.startsWith(CANCEL_JOB_URL)) { cancelJobById(httpGetCommand, uri); + } else if (uri.startsWith(GET_JOB_STATUS_URL)) { + getJobDetailStateById(httpGetCommand, uri); } else { original.handle(httpGetCommand); } @@ -196,15 +204,15 @@ private void cancelJobById(HttpGetCommand command, String uri) { uri = StringUtil.stripTrailingSlash(uri); Map requestParams = new HashMap<>(); RestUtil.buildRequestParams(requestParams, uri); - Long jobId = Long.valueOf(Long.parseLong(requestParams.get("jobId"))); - Boolean isStopWithSavePoint = - Boolean.valueOf(Boolean.parseBoolean(requestParams.get("isStopWithSavePoint"))); + long jobId = Long.parseLong(requestParams.get("jobId")); + boolean isStopWithSavePoint = + Boolean.parseBoolean(requestParams.get("isStopWithSavePoint")); CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService(); PassiveCompletableFuture voidPassiveCompletableFuture = null; - if (isStopWithSavePoint.booleanValue()) { - voidPassiveCompletableFuture = coordinatorService.savePoint(jobId.longValue()); + if (isStopWithSavePoint) { + voidPassiveCompletableFuture = coordinatorService.savePoint(jobId); } else { - voidPassiveCompletableFuture = coordinatorService.cancelJob(jobId.longValue()); + voidPassiveCompletableFuture = coordinatorService.cancelJob(jobId); } voidPassiveCompletableFuture.join(); Map rst = new HashMap<>(); @@ -212,6 +220,39 @@ private void cancelJobById(HttpGetCommand command, String uri) { prepareResponse(command, JsonUtil.toJsonObject(rst)); } + private void getJobDetailStateById(HttpGetCommand command, String uri) { + JsonObject respInfo = new JsonObject(); + JsonObject jobDetailInfo = new JsonObject(); + uri = StringUtil.stripTrailingSlash(uri); + Map requestParams = new HashMap<>(); + RestUtil.buildRequestParams(requestParams, uri); + if (StringUtils.isBlank(requestParams.get("jobId"))) + throw new SeaTunnelEngineException("jobId is empty"); + long jobId = Long.parseLong(requestParams.get("jobId")); + JobHistoryService.JobState jobDetailState = + getSeaTunnelServer() + .getCoordinatorService() + .getJobHistoryService() + .getJobDetailState(jobId); + if (Objects.isNull(jobDetailState)) + throw new SeaTunnelEngineException(String.format("job(%s) does not exist", jobId)); + String jobMetrics = + getSeaTunnelServer().getCoordinatorService().getJobMetrics(jobId).toJsonString(); + jobDetailInfo.add("jobId", jobId); + jobDetailInfo.add("jobName", jobDetailState.getJobName()); + jobDetailInfo.add("jobStatus", jobDetailState.getJobStatus().name()); + jobDetailInfo.add("submitTime", jobDetailState.getSubmitTime()); + jobDetailInfo.add( + "finishTime", + (jobDetailState.getFinishTime() == null) ? 0L : jobDetailState.getFinishTime()); + jobDetailInfo.add("errorMessage", jobDetailState.getErrorMessage()); + jobDetailInfo.add("jobMetrics", JsonUtil.toJsonObject(getJobMetrics(jobMetrics))); + respInfo.add("status", "success"); + respInfo.add("message", "successful get job detail state"); + respInfo.add("data", jobDetailInfo); + prepareResponse(command, respInfo); + } + private Map getJobMetrics(String jobMetrics) { Map metricsMap = new HashMap<>(); long sourceReadCount = 0L;