diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 6a189c045c6..c9b552cd9b2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.EngineConfig; import org.apache.seatunnel.engine.common.exception.JobException; +import org.apache.seatunnel.engine.common.exception.JobNotFoundException; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobDAGInfo; @@ -525,7 +526,7 @@ public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation(); JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId()); if (runningJobMaster == null) { - throw new JobException( + throw new JobNotFoundException( String.format("Job %s not running", taskGroupLocation.getJobId())); } runningJobMaster.updateTaskExecutionState(taskExecutionState); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 0b95baded64..a98ac54a504 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.config.server.ThreadShareMode; +import org.apache.seatunnel.engine.common.exception.JobNotFoundException; import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException; @@ -336,17 +337,31 @@ public PassiveCompletableFuture deployLocalTask( logger.severe(ExceptionUtils.getMessage(t)); resultFuture.completeExceptionally(t); } - resultFuture.whenComplete( + resultFuture.whenCompleteAsync( withTryCatch( logger, (r, s) -> { + if (s != null) { + logger.severe( + String.format( + "Task %s complete with error %s", + taskGroup.getTaskGroupLocation(), + ExceptionUtils.getMessage(s))); + } + if (r == null) { + r = + new TaskExecutionState( + taskGroup.getTaskGroupLocation(), + ExecutionState.FAILED, + s); + } logger.info( String.format( "Task %s complete with state %s", - r != null ? r.getTaskGroupLocation() : "null", - r != null ? r.getExecutionState() : "null")); + r.getTaskGroupLocation(), r.getExecutionState())); notifyTaskStatusToMaster(taskGroup.getTaskGroupLocation(), r); - })); + }), + executorService); return new PassiveCompletableFuture<>(resultFuture); } @@ -370,16 +385,24 @@ private void notifyTaskStatusToMaster( notifyStateSuccess = true; } catch (InterruptedException e) { logger.severe("send notify task status failed", e); + } catch (JobNotFoundException e) { + logger.warning("send notify task status failed because can't find job", e); + notifyStateSuccess = true; } catch (ExecutionException e) { - logger.warning(ExceptionUtils.getMessage(e)); - logger.warning( - String.format( - "notify the job of the task(%s) status failed, retry in %s millis", - taskGroupLocation, sleepTime)); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException ex) { - logger.severe(e); + if (e.getCause() instanceof JobNotFoundException) { + logger.warning("send notify task status failed because can't find job", e); + notifyStateSuccess = true; + } else { + logger.warning(ExceptionUtils.getMessage(e)); + logger.warning( + String.format( + "notify the job of the task(%s) status failed, retry in %s millis", + taskGroupLocation, sleepTime)); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException ex) { + logger.severe(e); + } } } }