Skip to content

Commit

Permalink
[Improvement-16612][Master] For logical tasks on the Master, there sh…
Browse files Browse the repository at this point in the history
…ould be support for dry run (#16616)
  • Loading branch information
shouwangyw authored Oct 12, 2024
1 parent ea268d6 commit ccedd2e
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner.execute;

import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import static org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES;

import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
Expand Down Expand Up @@ -103,6 +104,15 @@ public void run() {
TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
beforeExecute();

if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(System.currentTimeMillis());
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
logicTaskInstanceExecutionEventSenderManager.successEventSender().sendMessage(taskExecutionContext);
log.info(
"The current execute mode is dry run, will stop the logic task and set the taskInstance status to success");
return;
}
TaskInstanceLogHeader.printExecuteTaskHeader();
executeTask();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.master.integration;

import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
Expand Down Expand Up @@ -61,6 +62,7 @@ public Integer manualTriggerWorkflow(final WorkflowTriggerDTO workflowTriggerDTO
.workflowDefinitionVersion(workflowTriggerDTO.workflowDefinition.getVersion())
.startNodes(workflowTriggerDTO.getRunWorkflowCommandParam().getStartNodes())
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
.dryRun(workflowTriggerDTO.dryRun)
.build();

final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
Expand Down Expand Up @@ -139,6 +141,9 @@ public static class WorkflowTriggerDTO {
private final WorkflowDefinition workflowDefinition;

private final RunWorkflowCommandParam runWorkflowCommandParam;

@Builder.Default
private Flag dryRun = Flag.NO;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,50 @@ public void testStartWorkflow_with_oneSuccessTask() {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
});
});

assertThat(workflowRepository.getAll()).isEmpty();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) dry run success")
public void testStartWorkflow_with_oneSuccessTaskDryRun() {
final String yaml = "/it/start/workflow_with_one_fake_task_success.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getWorkflows().get(0);

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.dryRun(Flag.YES)
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode());
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
});
});

Expand Down Expand Up @@ -121,7 +159,9 @@ public void testStartWorkflow_with_subWorkflowTask_success() {
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO);
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());

final List<WorkflowInstance> subWorkflowInstance =
repository.queryWorkflowInstance(context.getWorkflows().get(1));
Expand All @@ -131,6 +171,7 @@ public void testStartWorkflow_with_subWorkflowTask_success() {
.satisfiesExactly(workflowInstance -> {
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
assertThat(workflowInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
});

Assertions
Expand All @@ -151,6 +192,51 @@ public void testStartWorkflow_with_subWorkflowTask_success() {
assertThat(workflowRepository.getAll()).isEmpty();
}

@Test
@DisplayName("Test start a workflow with one sub workflow task(A) dry run, will not execute")
public void testStartWorkflow_with_subWorkflowTask_dryRunSuccess() {
final String yaml = "/it/start/workflow_with_sub_workflow_task_success.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getWorkflows().get(0);

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.dryRun(Flag.YES)
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {

Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode());

final List<WorkflowInstance> subWorkflowInstance =
repository.queryWorkflowInstance(context.getWorkflows().get(1));
Assertions
.assertThat(subWorkflowInstance)
.isEmpty();

Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("sub_logic_task");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
});
});

assertThat(workflowRepository.getAll()).isEmpty();
}

@Test
@DisplayName("Test start a workflow with one sub workflow task(A) failed")
public void testStartWorkflow_with_subWorkflowTask_failed() {
Expand Down

0 comments on commit ccedd2e

Please sign in to comment.