From 3c20fe7a8aa091ffa3d5d8dedbdcfa0dd43a56aa Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Fri, 1 Jul 2022 09:53:10 +0800 Subject: [PATCH] [Bug] [Fix-10672] Dependent task retry bug (#10707) * fix 10517 * fix dep warn bug Co-authored-by: JinyLeeChina --- .../server/master/runner/WorkflowExecuteThread.java | 6 +++++- .../dolphinscheduler/server/utils/DependentExecute.java | 7 ++++++- .../dolphinscheduler/service/process/ProcessService.java | 7 +++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 76e44e43e088..39e903bdc2ad 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -348,13 +348,17 @@ private boolean taskTimeout(StateEvent stateEvent) { return true; } TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); - if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy && !taskInstance.getState().typeIsFinished()) { + if ((TaskTimeoutStrategy.FAILED == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) && !taskInstance.getState().typeIsFinished()) { ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); taskProcessor.action(TaskAction.TIMEOUT); if (taskInstance.isDependTask()) { TaskInstance task = processService.findTaskInstanceById(taskInstance.getId()); taskFinished(task); } + if (TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); + processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser); + } } else { ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java index 2ae48f873d51..77af8699f552 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java @@ -196,7 +196,12 @@ private DependResult getDependTaskResult(long taskCode, DateInterval dateInterva TaskInstance taskInstance = processService.findLastTaskInstanceInterval(taskCode, dateInterval); DependResult result; if (taskInstance == null) { - logger.warn("Cannot find the task in the process instance when the ProcessInstance is finish, taskCode: {}", taskCode); + TaskDefinition taskDefinition = processService.findTaskDefinitionByCode(taskCode); + if (taskDefinition == null) { + logger.error("Cannot find the task definition, something error, taskCode: {}", taskCode); + } else { + logger.warn("Cannot find the task in the process instance when the ProcessInstance is finish, taskCode: {}, taskName: {}", taskCode, taskDefinition.getName()); + } result = DependResult.FAILED; } else { logger.info("The running task, taskId:{}, taskCode:{}, taskName:{}", taskInstance.getId(), taskInstance.getTaskCode(), taskInstance.getName()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 43a12560ae4e..ae71812b5724 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2489,6 +2489,13 @@ public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersio return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion); } + /** + * find task definition by code + */ + public TaskDefinition findTaskDefinitionByCode(long taskCode) { + return taskDefinitionMapper.queryByCode(taskCode); + } + /** * find process task relation list by process */