Skip to content

Commit

Permalink
[Fix-14149][master] task finished so not dispatch (#14161)
Browse files Browse the repository at this point in the history
  • Loading branch information
eye-gu authored Aug 31, 2023
1 parent 33525a1 commit 7ae4fb3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
Expand Down Expand Up @@ -122,6 +123,7 @@

import org.springframework.beans.BeanUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

Expand Down Expand Up @@ -983,7 +985,7 @@ private boolean executeTask(TaskInstance taskInstance) {
}
}
// 4. submit to dispatch queue
taskExecuteRunnable.dispatch();
tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable);

stateWheelExecuteThread.addTask4TimeoutCheck(workflowInstance, taskInstance);
return true;
Expand All @@ -996,6 +998,35 @@ private boolean executeTask(TaskInstance taskInstance) {
}
}

/**
* Sometimes (such as pause), if the task instance status has already been finished,
* there is no need to dispatch it
*/
@VisibleForTesting
void tryToDispatchTaskInstance(TaskInstance taskInstance, TaskExecuteRunnable taskExecuteRunnable) {
if (!taskInstance.getState().isFinished()) {
taskExecuteRunnable.dispatch();
} else {
if (workflowExecuteContext.getWorkflowInstance().isBlocked()) {
TaskStateEvent processBlockEvent = TaskStateEvent.builder()
.processInstanceId(workflowExecuteContext.getWorkflowInstance().getId())
.taskInstanceId(taskInstance.getId())
.status(taskInstance.getState())
.type(StateEventType.PROCESS_BLOCKED)
.build();
this.stateEvents.add(processBlockEvent);
}

TaskStateEvent taskStateChangeEvent = TaskStateEvent.builder()
.processInstanceId(workflowExecuteContext.getWorkflowInstance().getId())
.taskInstanceId(taskInstance.getId())
.status(taskInstance.getState())
.type(StateEventType.TASK_STATE_CHANGE)
.build();
this.stateEvents.add(taskStateChangeEvent);
}
}

/**
* find task instance in db.
* in case submit more than one same name task in the same time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.command.CommandService;
Expand Down Expand Up @@ -371,4 +373,20 @@ private List<Schedule> oneSchedulerList() {
return schedulerList;
}

@Test
void testTryToDispatchTaskInstance() {
// task instance already finished, not dispatch
TaskInstance taskInstance = new TaskInstance();
taskInstance.setState(TaskExecutionStatus.PAUSE);
Mockito.when(processInstance.isBlocked()).thenReturn(true);
TaskExecuteRunnable taskExecuteRunnable = Mockito.mock(TaskExecuteRunnable.class);
workflowExecuteThread.tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable);
Mockito.verify(taskExecuteRunnable, Mockito.never()).dispatch();

// submit success should dispatch
taskInstance = new TaskInstance();
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
workflowExecuteThread.tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable);
Mockito.verify(taskExecuteRunnable).dispatch();
}
}

0 comments on commit 7ae4fb3

Please sign in to comment.