From ccedd2ed95868b5bbb16a7451f112c6ca9669f45 Mon Sep 17 00:00:00 2001 From: "veli.yang" <897900564@qq.com> Date: Sat, 12 Oct 2024 14:25:56 +0800 Subject: [PATCH] [Improvement-16612][Master] For logical tasks on the Master, there should be support for dry run (#16616) --- .../runner/execute/MasterTaskExecutor.java | 10 +++ .../master/integration/WorkflowOperator.java | 5 ++ .../cases/WorkflowStartTestCase.java | 90 ++++++++++++++++++- 3 files changed, 103 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java index aba154fffa6b..d21e81fe97df 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner.execute; import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; +import static org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES; import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -103,6 +104,15 @@ public void run() { TaskInstanceLogHeader.printLoadTaskInstancePluginHeader(); beforeExecute(); + if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { + taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); + taskExecutionContext.setEndTime(System.currentTimeMillis()); + MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId()); + logicTaskInstanceExecutionEventSenderManager.successEventSender().sendMessage(taskExecutionContext); + log.info( + "The current execute mode is dry run, will stop the logic task and set the taskInstance status to success"); + return; + } TaskInstanceLogHeader.printExecuteTaskHeader(); executeTask(); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java index 651026e3acec..3c30fe947c3a 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.integration; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; @@ -61,6 +62,7 @@ public Integer manualTriggerWorkflow(final WorkflowTriggerDTO workflowTriggerDTO .workflowDefinitionVersion(workflowTriggerDTO.workflowDefinition.getVersion()) .startNodes(workflowTriggerDTO.getRunWorkflowCommandParam().getStartNodes()) .startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams()) + .dryRun(workflowTriggerDTO.dryRun) .build(); final WorkflowManualTriggerResponse manualTriggerWorkflowResponse = @@ -139,6 +141,9 @@ public static class WorkflowTriggerDTO { private final WorkflowDefinition workflowDefinition; private final RunWorkflowCommandParam runWorkflowCommandParam; + + @Builder.Default + private Flag dryRun = Flag.NO; } @Data diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java index e441372086d4..f91594d7d63d 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java @@ -87,12 +87,50 @@ public void testStartWorkflow_with_oneSuccessTask() { Assertions .assertThat(repository.queryWorkflowInstance(workflowInstanceId)) .matches( - workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS); + workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS) + .matches( + workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode()); + Assertions + .assertThat(repository.queryTaskInstance(workflow)) + .satisfiesExactly(taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("A"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS); + assertThat(taskInstance.getDryRun()).isEqualTo(Flag.NO.getCode()); + }); + }); + + assertThat(workflowRepository.getAll()).isEmpty(); + } + + @Test + @DisplayName("Test start a workflow with one fake task(A) dry run success") + public void testStartWorkflow_with_oneSuccessTaskDryRun() { + final String yaml = "/it/start/workflow_with_one_fake_task_success.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getWorkflows().get(0); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .dryRun(Flag.YES) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + Assertions + .assertThat(repository.queryWorkflowInstance(workflowInstanceId)) + .matches( + workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS) + .matches( + workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode()); Assertions .assertThat(repository.queryTaskInstance(workflow)) .satisfiesExactly(taskInstance -> { assertThat(taskInstance.getName()).isEqualTo("A"); assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS); + assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode()); }); }); @@ -121,7 +159,9 @@ public void testStartWorkflow_with_subWorkflowTask_success() { .matches( workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS) .matches( - workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO); + workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO) + .matches( + workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode()); final List subWorkflowInstance = repository.queryWorkflowInstance(context.getWorkflows().get(1)); @@ -131,6 +171,7 @@ public void testStartWorkflow_with_subWorkflowTask_success() { .satisfiesExactly(workflowInstance -> { assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS); assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES); + assertThat(workflowInstance.getDryRun()).isEqualTo(Flag.NO.getCode()); }); Assertions @@ -151,6 +192,51 @@ public void testStartWorkflow_with_subWorkflowTask_success() { assertThat(workflowRepository.getAll()).isEmpty(); } + @Test + @DisplayName("Test start a workflow with one sub workflow task(A) dry run, will not execute") + public void testStartWorkflow_with_subWorkflowTask_dryRunSuccess() { + final String yaml = "/it/start/workflow_with_sub_workflow_task_success.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition parentWorkflow = context.getWorkflows().get(0); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(parentWorkflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .dryRun(Flag.YES) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + + Assertions + .assertThat(repository.queryWorkflowInstance(workflowInstanceId)) + .matches( + workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS) + .matches( + workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO) + .matches( + workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode()); + + final List subWorkflowInstance = + repository.queryWorkflowInstance(context.getWorkflows().get(1)); + Assertions + .assertThat(subWorkflowInstance) + .isEmpty(); + + Assertions + .assertThat(repository.queryTaskInstance(workflowInstanceId)) + .satisfiesExactly(taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("sub_logic_task"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS); + assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode()); + }); + }); + + assertThat(workflowRepository.getAll()).isEmpty(); + } + @Test @DisplayName("Test start a workflow with one sub workflow task(A) failed") public void testStartWorkflow_with_subWorkflowTask_failed() {