From db62ce0e470ab8b7ae20a24c135e9e26203b897b Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 4 Aug 2023 23:43:57 +0800 Subject: [PATCH] Fix serial mode will cause NPE in Workflow bootstrap (#14703) --- .../runner/MasterSchedulerBootstrap.java | 9 ++++++++- .../runner/WorkflowExecuteContextFactory.java | 20 ++++++++++--------- .../WorkflowExecuteRunnableFactory.java | 16 +++++++-------- .../service/process/ProcessService.java | 3 +++ .../service/process/ProcessServiceImpl.java | 6 ++++-- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 0f86cadfe82d..0313e2b4a5e5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.List; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -132,8 +133,14 @@ public void run() { commands.parallelStream() .forEach(command -> { try { - WorkflowExecuteRunnable workflowExecuteRunnable = + Optional workflowExecuteRunnableOptional = workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command); + if (!workflowExecuteRunnableOptional.isPresent()) { + log.warn( + "The command execute success, will not trigger a WorkflowExecuteRunnable, this workflowInstance might be in serial mode"); + return; + } + WorkflowExecuteRunnable workflowExecuteRunnable = workflowExecuteRunnableOptional.get(); ProcessInstance processInstance = workflowExecuteRunnable .getWorkflowExecuteContext().getWorkflowInstance(); if (processInstanceExecCacheManager.contains(processInstance.getId())) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java index c30f791e5e24..0578176e5e69 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java @@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.util.Optional; + import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -50,21 +52,22 @@ public class WorkflowExecuteContextFactory { @Autowired private MasterConfig masterConfig; - public IWorkflowExecuteContext createWorkflowExecuteRunnableContext(Command command) throws Exception { - ProcessInstance workflowInstance = createWorkflowInstance(command); + public Optional createWorkflowExecuteRunnableContext(Command command) throws Exception { + Optional workflowInstanceOptional = createWorkflowInstance(command); + if (!workflowInstanceOptional.isPresent()) { + return Optional.empty(); + } + ProcessInstance workflowInstance = workflowInstanceOptional.get(); ProcessDefinition workflowDefinition = processService.findProcessDefinition( workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion()); workflowInstance.setProcessDefinition(workflowDefinition); IWorkflowGraph workflowGraph = workflowGraphFactory.createWorkflowGraph(workflowInstance); - return new WorkflowExecuteContext( - workflowDefinition, - workflowInstance, - workflowGraph); + return Optional.of(new WorkflowExecuteContext(workflowDefinition, workflowInstance, workflowGraph)); } - private ProcessInstance createWorkflowInstance(Command command) throws CronParseException { + private Optional createWorkflowInstance(Command command) throws CronParseException { long commandTransformStartTime = System.currentTimeMillis(); // Note: this check is not safe, the slot may change after command transform. // We use the database transaction in `handleCommand` so that we can guarantee the command will @@ -76,10 +79,9 @@ private ProcessInstance createWorkflowInstance(Command command) throws CronParse throw new RuntimeException("Slot check failed the current state: " + slotCheckState); } ProcessInstance processInstance = processService.handleCommand(masterConfig.getMasterAddress(), command); - log.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId()); ProcessInstanceMetrics .recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime); - return processInstance; + return Optional.ofNullable(processInstance); } private SlotCheckState slotCheck(Command command) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java index da086718e264..0d04f2d92a96 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java @@ -19,7 +19,6 @@ import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; -import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException; @@ -30,6 +29,8 @@ import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.util.Optional; + import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -66,21 +67,18 @@ public class WorkflowExecuteRunnableFactory { @Autowired private MasterConfig masterConfig; - @Autowired - private TaskDefinitionLogDao taskDefinitionLogDao; - @Autowired private DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory; @Autowired private WorkflowExecuteContextFactory workflowExecuteContextFactory; - public WorkflowExecuteRunnable createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException { + public Optional createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException { try { - IWorkflowExecuteContext workflowExecuteRunnableContext = + Optional workflowExecuteRunnableContextOptional = workflowExecuteContextFactory.createWorkflowExecuteRunnableContext(command); - return new WorkflowExecuteRunnable( - workflowExecuteRunnableContext, + return workflowExecuteRunnableContextOptional.map(iWorkflowExecuteContext -> new WorkflowExecuteRunnable( + iWorkflowExecuteContext, commandService, processService, processInstanceDao, @@ -90,7 +88,7 @@ public WorkflowExecuteRunnable createWorkflowExecuteRunnable(Command command) th stateWheelExecuteThread, curingGlobalParamsService, taskInstanceDao, - defaultTaskExecuteRunnableFactory); + defaultTaskExecuteRunnableFactory)); } catch (Exception ex) { throw new WorkflowCreateException("Create workflow execute runnable failed", ex); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 8d755fe3251c..c949da47921a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -56,11 +56,14 @@ import java.util.Map; import java.util.Optional; +import javax.annotation.Nullable; + import org.springframework.transaction.annotation.Transactional; public interface ProcessService { @Transactional + @Nullable ProcessInstance handleCommand(String host, Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index f6db4de3cc27..5728ca077964 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -303,6 +303,7 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private TriggerRelationService triggerRelationService; /** + * todo: split this method * handle Command (construct ProcessInstance from Command) , wrapped in transaction * * @param host host @@ -311,8 +312,8 @@ public class ProcessServiceImpl implements ProcessService { */ @Override @Transactional - public ProcessInstance handleCommand(String host, - Command command) throws CronParseException, CodeGenerateException { + public @Nullable ProcessInstance handleCommand(String host, + Command command) throws CronParseException, CodeGenerateException { ProcessInstance processInstance = constructProcessInstance(command, host); // cannot construct process instance, return null if (processInstance == null) { @@ -332,6 +333,7 @@ public ProcessInstance handleCommand(String host, setSubProcessParam(processInstance); triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId()); deleteCommandWithCheck(command.getId()); + // todo: this is a bad design to return null here, whether trigger the task return null; } } else {