Skip to content

Commit

Permalink
[feat][Zeta] add get job status by http
Browse files Browse the repository at this point in the history
  • Loading branch information
chaorongzhi committed Sep 1, 2023
1 parent 7795ca4 commit af167c0
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class RestConstant {
public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job";
public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job";
public static final String CANCEL_JOB_URL = "/hazelcast/rest/maps/cancel-job";
public static final String GET_JOB_STATUS_URL = "/hazelcast/rest/maps/get-job-status";

public static final String SYSTEM_MONITORING_INFORMATION =
"/hazelcast/rest/maps/system-monitoring-information";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
Expand All @@ -31,10 +32,13 @@
import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.engine.server.utils.RestUtil;

import org.apache.commons.lang.StringUtils;

import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.Member;
Expand All @@ -55,11 +59,13 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
import static org.apache.seatunnel.engine.server.rest.RestConstant.CANCEL_JOB_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.GET_JOB_STATUS_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITORING_INFORMATION;
Expand Down Expand Up @@ -99,6 +105,8 @@ public void handle(HttpGetCommand httpGetCommand) {
getSystemMonitoringInformation(httpGetCommand);
} else if (uri.startsWith(CANCEL_JOB_URL)) {
cancelJobById(httpGetCommand, uri);
} else if (uri.startsWith(GET_JOB_STATUS_URL)) {
getJobDetailStateById(httpGetCommand, uri);
} else {
original.handle(httpGetCommand);
}
Expand Down Expand Up @@ -196,22 +204,55 @@ private void cancelJobById(HttpGetCommand command, String uri) {
uri = StringUtil.stripTrailingSlash(uri);
Map<String, String> requestParams = new HashMap<>();
RestUtil.buildRequestParams(requestParams, uri);
Long jobId = Long.valueOf(Long.parseLong(requestParams.get("jobId")));
Boolean isStopWithSavePoint =
Boolean.valueOf(Boolean.parseBoolean(requestParams.get("isStopWithSavePoint")));
long jobId = Long.parseLong(requestParams.get("jobId"));
boolean isStopWithSavePoint =
Boolean.parseBoolean(requestParams.get("isStopWithSavePoint"));
CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService();
PassiveCompletableFuture<Void> voidPassiveCompletableFuture = null;
if (isStopWithSavePoint.booleanValue()) {
voidPassiveCompletableFuture = coordinatorService.savePoint(jobId.longValue());
if (isStopWithSavePoint) {
voidPassiveCompletableFuture = coordinatorService.savePoint(jobId);
} else {
voidPassiveCompletableFuture = coordinatorService.cancelJob(jobId.longValue());
voidPassiveCompletableFuture = coordinatorService.cancelJob(jobId);
}
voidPassiveCompletableFuture.join();
Map<String, String> rst = new HashMap<>();
rst.put("msg", "success");
prepareResponse(command, JsonUtil.toJsonObject(rst));
}

private void getJobDetailStateById(HttpGetCommand command, String uri) {
JsonObject respInfo = new JsonObject();
JsonObject jobDetailInfo = new JsonObject();
uri = StringUtil.stripTrailingSlash(uri);
Map<String, String> requestParams = new HashMap<>();
RestUtil.buildRequestParams(requestParams, uri);
if (StringUtils.isBlank(requestParams.get("jobId")))
throw new SeaTunnelEngineException("jobId is empty");
long jobId = Long.parseLong(requestParams.get("jobId"));
JobHistoryService.JobState jobDetailState =
getSeaTunnelServer()
.getCoordinatorService()
.getJobHistoryService()
.getJobDetailState(jobId);
if (Objects.isNull(jobDetailState))
throw new SeaTunnelEngineException(String.format("job(%s) does not exist", jobId));
String jobMetrics =
getSeaTunnelServer().getCoordinatorService().getJobMetrics(jobId).toJsonString();
jobDetailInfo.add("jobId", jobId);
jobDetailInfo.add("jobName", jobDetailState.getJobName());
jobDetailInfo.add("jobStatus", jobDetailState.getJobStatus().name());
jobDetailInfo.add("submitTime", jobDetailState.getSubmitTime());
jobDetailInfo.add(
"finishTime",
(jobDetailState.getFinishTime() == null) ? 0L : jobDetailState.getFinishTime());
jobDetailInfo.add("errorMessage", jobDetailState.getErrorMessage());
jobDetailInfo.add("jobMetrics", JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
respInfo.add("status", "success");
respInfo.add("message", "successful get job detail state");
respInfo.add("data", jobDetailInfo);
prepareResponse(command, respInfo);
}

private Map<String, Long> getJobMetrics(String jobMetrics) {
Map<String, Long> metricsMap = new HashMap<>();
long sourceReadCount = 0L;
Expand Down

0 comments on commit af167c0

Please sign in to comment.