diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java index 38eff7d841..c77f2b2939 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java @@ -101,7 +101,7 @@ *
  • UPDATE conductor.workflows SET payload=? WHERE workflow_id=? AND shard_id=1 AND * entity='workflow' AND task_id=''; *
  • UPDATE conductor.workflows SET total_tasks=? WHERE workflow_id=? AND shard_id=?; - *
  • UPDATE conductor.workflows SET * total_partitions=?,total_tasks=? WHERE workflow_id=? AND + *
  • UPDATE conductor.workflows SET total_partitions=?,total_tasks=? WHERE workflow_id=? AND * shard_id=1; *
  • UPDATE conductor.task_lookup SET workflow_id=? WHERE task_id=?; *
  • UPDATE conductor.task_def_limit SET workflow_id=? WHERE task_def_name=? AND task_id=?; diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 3460a69a14..2c71bcdccc 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -66,7 +66,13 @@ public class WorkflowExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowExecutor.class); private static final int EXPEDITED_PRIORITY = 10; - + private static final String CLASS_NAME = WorkflowExecutor.class.getSimpleName(); + private static final Predicate UNSUCCESSFUL_TERMINAL_TASK = + task -> !task.getStatus().isSuccessful() && task.getStatus().isTerminal(); + private static final Predicate UNSUCCESSFUL_JOIN_TASK = + UNSUCCESSFUL_TERMINAL_TASK.and(t -> TaskType.TASK_TYPE_JOIN.equals(t.getTaskType())); + private static final Predicate NON_TERMINAL_TASK = + task -> !task.getStatus().isTerminal(); private final MetadataDAO metadataDAO; private final QueueDAO queueDAO; private final DeciderService deciderService; @@ -78,20 +84,9 @@ public class WorkflowExecutor { private final WorkflowStatusListener workflowStatusListener; private final SystemTaskRegistry systemTaskRegistry; private final ApplicationEventPublisher eventPublisher; - private long activeWorkerLastPollMs; - private static final String CLASS_NAME = WorkflowExecutor.class.getSimpleName(); private final ExecutionLockService executionLockService; - private static final Predicate UNSUCCESSFUL_TERMINAL_TASK = - task -> !task.getStatus().isSuccessful() && task.getStatus().isTerminal(); - - private static final Predicate UNSUCCESSFUL_JOIN_TASK = - UNSUCCESSFUL_TERMINAL_TASK.and(t -> TaskType.TASK_TYPE_JOIN.equals(t.getTaskType())); - - private static final Predicate NON_TERMINAL_TASK = - task -> !task.getStatus().isTerminal(); - private final Predicate validateLastPolledTime = pollData -> pollData.getLastPollTime() @@ -352,6 +347,7 @@ private void retry(WorkflowModel workflow) { if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString()) || task.getTaskType().equalsIgnoreCase(TaskType.DO_WHILE.toString())) { task.setStatus(IN_PROGRESS); + addTaskToQueue(task); // Task doesn't have to be updated yet. Will be updated along with other // Workflow tasks downstream. } else { @@ -870,10 +866,7 @@ public void updateTask(TaskResult taskResult) { task.getTaskDefName(), lastDuration, false, task.getStatus()); } - // sync evaluate workflow only if the task is not within a forked branch - if (isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) { - expediteLazyWorkflowEvaluation(workflowId); - } else { + if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) { decide(workflowId); } } @@ -1094,7 +1087,11 @@ private void adjustStateIfSubWorkflowChanged(WorkflowModel workflow) { // and the JOIN task(s) needs to be evaluated again, set them to IN_PROGRESS workflow.getTasks().stream() .filter(UNSUCCESSFUL_JOIN_TASK) - .peek(t -> t.setStatus(TaskModel.Status.IN_PROGRESS)) + .peek( + task -> { + task.setStatus(TaskModel.Status.IN_PROGRESS); + addTaskToQueue(task); + }) .forEach(executionDAOFacade::updateTask); } } @@ -1305,6 +1302,7 @@ public void skipTaskFromWorkflow( taskToBeSkipped.setWorkflowInstanceId(workflowId); taskToBeSkipped.setWorkflowPriority(workflow.getPriority()); taskToBeSkipped.setStatus(SKIPPED); + taskToBeSkipped.setEndTime(System.currentTimeMillis()); taskToBeSkipped.setTaskType(workflowTask.getName()); taskToBeSkipped.setCorrelationId(workflow.getCorrelationId()); if (skipTaskRequest != null) { diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index 0159c82646..6a654fbb92 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -97,4 +97,8 @@ public boolean execute( } return false; } + + public boolean isAsync() { + return true; + } } diff --git a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java index faf191d916..c90259b0ae 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java @@ -190,10 +190,8 @@ public void verifyAndUpload(T entity, PayloadType payloadType) { break; } } - } catch (TransientException te) { + } catch (TransientException | TerminateWorkflowException te) { throw te; - } catch (TerminateWorkflowException twe) { - throw twe; } catch (Exception e) { LOGGER.error( "Unable to upload payload to external storage for workflow: {}", workflowId, e); @@ -223,7 +221,6 @@ void failTask(TaskModel task, PayloadType payloadType, String errorMsg) { } else { task.setOutputData(new HashMap<>()); } - throw new TerminateWorkflowException(errorMsg, WorkflowModel.Status.FAILED, task); } @VisibleForTesting diff --git a/core/src/test/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtilsTest.java b/core/src/test/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtilsTest.java index 1ba3a8a0c6..f9f7c81059 100644 --- a/core/src/test/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtilsTest.java +++ b/core/src/test/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtilsTest.java @@ -41,6 +41,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import static com.netflix.conductor.model.TaskModel.Status.FAILED_WITH_TERMINAL_ERROR; + import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @@ -199,11 +201,11 @@ public void testFailTaskWithInputPayload() { TaskModel task = new TaskModel(); task.setInputData(new HashMap<>()); - expectedException.expect(TerminateWorkflowException.class); externalPayloadStorageUtils.failTask( task, ExternalPayloadStorage.PayloadType.TASK_INPUT, "error"); assertNotNull(task); assertTrue(task.getInputData().isEmpty()); + assertEquals(FAILED_WITH_TERMINAL_ERROR, task.getStatus()); } @Test @@ -211,11 +213,11 @@ public void testFailTaskWithOutputPayload() { TaskModel task = new TaskModel(); task.setOutputData(new HashMap<>()); - expectedException.expect(TerminateWorkflowException.class); externalPayloadStorageUtils.failTask( task, ExternalPayloadStorage.PayloadType.TASK_OUTPUT, "error"); assertNotNull(task); assertTrue(task.getOutputData().isEmpty()); + assertEquals(FAILED_WITH_TERMINAL_ERROR, task.getStatus()); } @Test diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DecisionTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DecisionTaskSpec.groovy index 24075d3000..2d46ae3609 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DecisionTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DecisionTaskSpec.groovy @@ -12,8 +12,12 @@ */ package com.netflix.conductor.test.integration +import org.springframework.beans.factory.annotation.Autowired + import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.execution.tasks.Join +import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification import spock.lang.Shared @@ -23,6 +27,9 @@ import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAc class DecisionTaskSpec extends AbstractSpecification { + @Autowired + Join joinTask + @Shared def DECISION_WF = "DecisionWorkflow" @@ -139,6 +146,7 @@ class DecisionTaskSpec extends AbstractSpecification { } when: "the tasks 'integration_task_1' and 'integration_task_10' are polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("joinTask").taskId def polledAndCompletedTask1Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker') def polledAndCompletedTask10Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_10', 'task1.integration.worker') @@ -189,7 +197,10 @@ class DecisionTaskSpec extends AbstractSpecification { then: "verify that the task is completed and acknowledged" verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1) - and: "verify that the 'integration_task_2' is COMPLETED and the workflow has progressed" + when: "JOIN task is polled and executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "verify that JOIN is COMPLETED and the workflow has progressed" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.COMPLETED tasks.size() == 7 diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy index cb5e86dbe1..b94786b8db 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy @@ -18,6 +18,7 @@ import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.common.utils.TaskUtils +import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.test.base.AbstractSpecification @@ -25,19 +26,22 @@ import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAc class DoWhileSpec extends AbstractSpecification { + @Autowired + Join joinTask + @Autowired SubWorkflow subWorkflowTask def setup() { - workflowTestUtil.registerWorkflows("do_while_integration_test.json", - "do_while_multiple_integration_test.json", - "do_while_as_subtask_integration_test.json", + workflowTestUtil.registerWorkflows('do_while_integration_test.json', + 'do_while_multiple_integration_test.json', + 'do_while_as_subtask_integration_test.json', 'simple_one_task_sub_workflow_integration_test.json', 'do_while_iteration_fix_test.json', - "do_while_sub_workflow_integration_test.json", - "do_while_five_loop_over_integration_test.json", - "do_while_system_tasks.json", - "do_while_set_variable_fix.json") + 'do_while_sub_workflow_integration_test.json', + 'do_while_five_loop_over_integration_test.json', + 'do_while_system_tasks.json', + 'do_while_set_variable_fix.json') } def "Test workflow with 2 iterations of five tasks"() { @@ -275,6 +279,7 @@ class DoWhileSpec extends AbstractSpecification { } when: "Polling and completing second task" + def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker') then: "Verify that the task was polled and acknowledged and workflow is in running state" @@ -300,6 +305,9 @@ class DoWhileSpec extends AbstractSpecification { when: "Polling and completing third task" Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker') + and: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinId) + then: "Verify that the task was polled and acknowledged and workflow is in completed state" verifyPolledAndAcknowledgedTask(polledAndCompletedTask2) verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1) @@ -363,6 +371,7 @@ class DoWhileSpec extends AbstractSpecification { } when: "Polling and completing second task" + def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker') then: "Verify that the task was polled and acknowledged and workflow is in running state" @@ -388,6 +397,9 @@ class DoWhileSpec extends AbstractSpecification { when: "Polling and completing third task" Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker') + and: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinId) + then: "Verify that the task was polled and acknowledged and workflow is in completed state" verifyPolledAndAcknowledgedTask(polledAndCompletedTask2) verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1) @@ -528,6 +540,7 @@ class DoWhileSpec extends AbstractSpecification { } when: "Polling and completing second task" + def join1Id = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker') then: "Verify that the task was polled and acknowledged and workflow is in running state" @@ -553,6 +566,9 @@ class DoWhileSpec extends AbstractSpecification { when: "Polling and completing third task" Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker') + and: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, join1Id) + then: "Verify that the task was polled and acknowledged and workflow is in running state" verifyPolledAndAcknowledgedTask(polledAndCompletedTask2) verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1) @@ -609,6 +625,7 @@ class DoWhileSpec extends AbstractSpecification { } when: "Polling and completing second iteration of second task" + def join2Id = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__2").taskId Tuple polledAndCompletedSecondIterationTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker') then: "Verify that the task was polled and acknowledged and workflow is in running state" @@ -644,6 +661,9 @@ class DoWhileSpec extends AbstractSpecification { when: "Polling and completing second iteration of third task" Tuple polledAndCompletedSecondIterationTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker') + and: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, join2Id) + then: "Verify that the task was polled and acknowledged and workflow is in running state" verifyPolledAndAcknowledgedTask(polledAndCompletedSecondIterationTask2) verifyTaskIteration(polledAndCompletedSecondIterationTask2[0] as Task, 2) @@ -796,6 +816,7 @@ class DoWhileSpec extends AbstractSpecification { } when: "Polling and completing second task" + def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker') then: "Verify that the task was polled and acknowledged and workflow is in running state" @@ -823,6 +844,9 @@ class DoWhileSpec extends AbstractSpecification { when: "Polling and completing third task" Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker') + and: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinId) + then: "Verify that the task was polled and acknowledged and workflow is in completed state" verifyPolledAndAcknowledgedTask(polledAndCompletedTask2) verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1) @@ -920,6 +944,7 @@ class DoWhileSpec extends AbstractSpecification { } when: "Polling and completing second task" + def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker') then: "Verify that the task was polled and acknowledged and workflow is in running state" @@ -947,6 +972,9 @@ class DoWhileSpec extends AbstractSpecification { when: "Polling and completing third task" Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker') + and: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinId) + then: "Verify that the task was polled and acknowledged and workflow is in completed state" verifyPolledAndAcknowledgedTask(polledAndCompletedTask2) verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1) @@ -998,6 +1026,7 @@ class DoWhileSpec extends AbstractSpecification { } when: "Polling and completing first task in DO While" + def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join").taskId Tuple polledAndCompletedTask0 = workflowTestUtil.pollAndCompleteTask('integration_task_0', 'integration.test.worker') then: "Verify that the task was polled and acknowledged and workflow is in running state" @@ -1049,6 +1078,9 @@ class DoWhileSpec extends AbstractSpecification { and: "the workflow is evaluated" sweep(workflowInstanceId) + and: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinId) + then: "Verify that the task was polled and acknowledged and workflow is in completed state" verifyPolledAndAcknowledgedTask(polledAndCompletedTask2) with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy index c23a73d39e..d7dac29933 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy @@ -22,6 +22,7 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowDef import com.netflix.conductor.common.metadata.workflow.WorkflowTask import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.core.execution.StartWorkflowInput +import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification @@ -33,6 +34,9 @@ class DynamicForkJoinSpec extends AbstractSpecification { @Autowired QueueDAO queueDAO + @Autowired + Join joinTask + @Autowired SubWorkflow subWorkflowTask @@ -94,6 +98,7 @@ class DynamicForkJoinSpec extends AbstractSpecification { } when: "Poll and complete 'integration_task_2' and 'integration_task_3'" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("dynamicfanouttask_join").taskId def pollAndCompleteTask2Try = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker', ['ok1': 'ov1']) def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker', @@ -106,7 +111,10 @@ class DynamicForkJoinSpec extends AbstractSpecification { workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1']) workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2']) - and: "verify that the workflow has progressed and the 'integration_task_2' and 'integration_task_3' are complete" + when: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "verify that the workflow has progressed and the 'integration_task_2' and 'integration_task_3' are complete" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 6 @@ -281,6 +289,7 @@ class DynamicForkJoinSpec extends AbstractSpecification { } when: "Poll and fail 'integration_task_2'" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("dynamicfanouttask_join").taskId def pollAndCompleteTask2Try1 = workflowTestUtil.pollAndFailTask('integration_task_2', 'task2.worker', 'it is a failure..') and: "workflow is evaluated by the reconciler" @@ -337,7 +346,10 @@ class DynamicForkJoinSpec extends AbstractSpecification { workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try2, ['k1': 'v1']) workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try1, ['k2': 'v2']) - and: "verify that the workflow has progressed and the 'integration_task_2' and 'integration_task_3' are complete" + when: "JOIN task is polled and executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "verify that the workflow has progressed and the 'integration_task_2' and 'integration_task_3' are complete" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 8 @@ -433,6 +445,7 @@ class DynamicForkJoinSpec extends AbstractSpecification { } when: "the subworkflow is started by issuing a system task call" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("dynamicfanouttask_join").taskId List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) String subworkflowTaskId = polledTaskIds.get(0) asyncSystemTaskExecutor.execute(subWorkflowTask, subworkflowTaskId) @@ -638,6 +651,9 @@ class DynamicForkJoinSpec extends AbstractSpecification { when: "the workflow is evaluated" sweep(workflowInstanceId) + and: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + then: "the workflow has progressed beyond the join task" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING @@ -738,6 +754,7 @@ class DynamicForkJoinSpec extends AbstractSpecification { } when: "Poll and complete 'integration_task_2' and 'integration_task_3'" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("dynamicfanouttask_join").taskId def pollAndCompleteTask2Try = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.worker') def pollAndCompleteTask3Try = workflowTestUtil.pollAndCompleteTask('integration_task_3', 'task3.worker') @@ -748,7 +765,10 @@ class DynamicForkJoinSpec extends AbstractSpecification { workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask2Try, ['k1': 'v1']) workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask3Try, ['k2': 'v2']) - and: "verify that the workflow has progressed and the 'integration_task_2' and 'integration_task_3' are complete" + when: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "verify that the workflow has progressed and the 'integration_task_2' and 'integration_task_3' are complete" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 6 @@ -849,7 +869,7 @@ class DynamicForkJoinSpec extends AbstractSpecification { workflowDef.ownerEmail = 'test@harness.com' def startWorkflowInput = new StartWorkflowInput(name: workflowDef.name, version: workflowDef.version, workflowInput: workflowInput, workflowDefinition: workflowDef) - def workflowInstanceId = workflowExecutor.startWorkflow(startWorkflowInput) + def workflowInstanceId = startWorkflowOperation.execute(startWorkflowInput) then: "verify that workflow failed" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy index 599adad48a..ea13939750 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ExternalPayloadStorageSpec.groovy @@ -18,6 +18,7 @@ import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.common.metadata.tasks.TaskType import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.test.base.AbstractSpecification import com.netflix.conductor.test.utils.MockExternalPayloadStorage @@ -58,6 +59,9 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { @Autowired SubWorkflow subWorkflowTask + @Autowired + Join joinTask + @Autowired MockExternalPayloadStorage mockExternalPayloadStorage @@ -296,6 +300,7 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { } when: "the first task of the left fork is polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").taskId def polledAndAckTask = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker') then: "verify that the 'integration_task_1' was polled and acknowledged" @@ -349,7 +354,10 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { then: "verify that the 'integration_task_3' was polled and acknowledged" verifyPolledAndAcknowledgedLargePayloadTask(polledAndAckLargePayloadTask) - and: "task is completed and the next task after join in scheduled" + when: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "task is completed and the next task after join in scheduled" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 6 @@ -880,6 +888,7 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { when: "the first task of the left fork is polled and completed" def polledAndAckTask = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker') + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").taskId then: "verify that the 'integration_task_1' was polled and acknowledged" verifyPolledAndAcknowledgedTask(polledAndAckTask) @@ -932,8 +941,10 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { then: "verify that the 'integration_task_3' was polled and acknowledged" verifyPolledAndAcknowledgedLargePayloadTask(polledAndAckLargePayloadTask) - and: "task is completed and the join task is failed because of exceeding size limit" - Thread.sleep(5000L) + when: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "task is completed and the join task is failed because of exceeding size limit" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.FAILED tasks.size() == 5 @@ -947,6 +958,7 @@ class ExternalPayloadStorageSpec extends AbstractSpecification { tasks[3].status == Task.Status.FAILED_WITH_TERMINAL_ERROR tasks[3].taskType == 'JOIN' tasks[3].outputData.isEmpty() + !tasks[3].getExternalOutputPayloadStoragePath() } } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy index a2eb4126dd..437f4e1ccc 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy @@ -18,13 +18,18 @@ import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow +import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification import spock.lang.Shared class ForkJoinSpec extends AbstractSpecification { + @Autowired + Join joinTask + @Shared def FORK_JOIN_WF = 'FanInOutTest' @@ -93,6 +98,7 @@ class ForkJoinSpec extends AbstractSpecification { } when: "The first task of the fork is polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").getTaskId() def polledAndAckTask1Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.worker') then: "verify that the 'integration_task_1' was polled and acknowledged" @@ -139,7 +145,10 @@ class ForkJoinSpec extends AbstractSpecification { then: "verify that the 'integration_task_2' was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) - and: "The workflow has been updated with the task status and task list" + when: "JOIN task executed by the async executor" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "The workflow has been updated with the task status and task list" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 6 @@ -265,6 +274,7 @@ class ForkJoinSpec extends AbstractSpecification { } when: "The first task of the fork is polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").getTaskId() def polledAndAckTask1Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_0_RT_1', 'task1.worker') then: "verify that the 'integration_task_0_RT_1' was polled and acknowledged" @@ -344,13 +354,16 @@ class ForkJoinSpec extends AbstractSpecification { then: "verify that the 'integration_task_2' was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try2) - when: "The last task of the workflow is then polled and completed integration_task_0_RT_4'" + when: "JOIN task is polled and executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + and: "The last task of the workflow is then polled and completed integration_task_0_RT_4'" def polledAndAckTask4Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_0_RT_4', 'task1.worker') then: "verify that the 'integration_task_0_RT_4' was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask4Try1) - and: "Then verify that the workflow is completed and the task list of execution is as expected" + then: "Then verify that the workflow is completed and the task list of execution is as expected" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.COMPLETED tasks.size() == 8 @@ -407,6 +420,8 @@ class ForkJoinSpec extends AbstractSpecification { } when: "Poll and Complete tasks: 'integration_task_11', 'integration_task_12' and 'integration_task_13'" + def outerJoinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join1").getTaskId() + def innerJoinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join2").getTaskId() def polledAndAckTask11Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_11', 'task11.worker') def polledAndAckTask12Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_12', 'task12.worker') def polledAndAckTask13Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_13', 'task13.worker') @@ -511,10 +526,14 @@ class ForkJoinSpec extends AbstractSpecification { and: "workflow is evaluated" sweep(workflowInstanceId) - then: "verify that tasks 'integration_task_20'polled and acknowledged" + then: "verify that task 'integration_task_20' is polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask20Try1) - and: "verify the state of the workflow" + when: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, innerJoinId) + asyncSystemTaskExecutor.execute(joinTask, outerJoinId) + + then: "verify the state of the workflow" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 13 @@ -585,6 +604,8 @@ class ForkJoinSpec extends AbstractSpecification { } when: "Poll and Complete tasks: 'integration_task_11', 'integration_task_12' and 'integration_task_13'" + def outerJoinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join1").getTaskId() + def innerJoinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join2").getTaskId() def polledAndAckTask11Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_11', 'task11.worker') def polledAndAckTask12Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_12', 'task12.worker') def polledAndAckTask13Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_13', 'task13.worker') @@ -735,7 +756,11 @@ class ForkJoinSpec extends AbstractSpecification { then: "verify that the task was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask20Try1) - and: "verify that the integration_task_20 is completed and the next task is scheduled" + when: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, innerJoinId) + asyncSystemTaskExecutor.execute(joinTask, outerJoinId) + + then: "verify that the integration_task_20 is completed and the next task is scheduled" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 14 @@ -798,6 +823,7 @@ class ForkJoinSpec extends AbstractSpecification { asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId1) def subWorkflowTaskId2 = workflowWithScheduledSubWorkflows.getTaskByRefName('st2').taskId asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId2) + def joinTaskId = workflowWithScheduledSubWorkflows.getTaskByRefName("fanouttask_join").taskId then: "verify that the sub workflow tasks are in a IN PROGRESS state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { @@ -856,7 +882,10 @@ class ForkJoinSpec extends AbstractSpecification { } sweep(workflowInstanceId) - and: "verify that the workflow is in a COMPLETED state and the join task is also marked as COMPLETED_WITH_ERRORS" + when: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "verify that the workflow is in a COMPLETED state and the join task is also marked as COMPLETED_WITH_ERRORS" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.COMPLETED tasks.size() == 4 @@ -925,6 +954,7 @@ class ForkJoinSpec extends AbstractSpecification { when: "the subworkflow is started by issuing a system task call" def parentWorkflow = workflowExecutionService.getExecutionStatus(workflowInstanceId, true) def subWorkflowTaskId = parentWorkflow.getTaskByRefName('st1').taskId + def jointaskId = parentWorkflow.getTaskByRefName("fanouttask_join").taskId asyncSystemTaskExecutor.execute(subWorkflowTask, subWorkflowTaskId) then: "verify that the sub workflow task is in a IN_PROGRESS state" @@ -1061,7 +1091,10 @@ class ForkJoinSpec extends AbstractSpecification { then: "verify that the task was polled and acknowledged" workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndCompletedSimpleTask) - and: "verify that the workflow is in a COMPLETED state" + when: "JOIN task is executed" + asyncSystemTaskExecutor.execute(joinTask, jointaskId) + + then: "verify that the workflow is in a COMPLETED state" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.COMPLETED tasks.size() == 5 diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy index 0875746db3..5e6d8c0be3 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy @@ -18,6 +18,7 @@ import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification @@ -43,6 +44,9 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { @Autowired SubWorkflow subWorkflowTask + @Autowired + Join joinTask + String rootWorkflowId, midLevelWorkflowId, leafWorkflowId TaskDef persistedTask2Definition @@ -225,6 +229,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { } when: "poll and complete the integration_task_2 task in the root workflow" + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" @@ -248,6 +253,7 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { } when: "poll and complete the integration_task_2 task in the mid-level workflow" + def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) and: "poll and execute the sub workflow task" @@ -282,6 +288,10 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { sweep(newMidLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "the new mid level workflow is in COMPLETED state" assertWorkflowIsCompleted(newMidLevelWorkflowId) @@ -336,6 +346,8 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { } when: "poll and complete the integration_task_2 task in the mid level workflow" + def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" @@ -370,6 +382,10 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { sweep(midLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "verify that the mid level and root workflows reach COMPLETED state" assertWorkflowIsCompleted(midLevelWorkflowId) assertWorkflowIsCompleted(rootWorkflowId) @@ -433,6 +449,8 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { when: "the mid level and root workflows are sweeped" sweep(midLevelWorkflowId) sweep(rootWorkflowId) + def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId then: "verify that the mid level workflow's JOIN is updated" with(workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true)) { @@ -482,6 +500,10 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { sweep(midLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "verify that the mid level and root workflows reach COMPLETED state" assertWorkflowIsCompleted(midLevelWorkflowId) assertWorkflowIsCompleted(rootWorkflowId) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy index 268421af42..61fdd055ed 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy @@ -17,6 +17,7 @@ import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification @@ -42,6 +43,9 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { @Autowired SubWorkflow subWorkflowTask + @Autowired + Join joinTask + String rootWorkflowId, midLevelWorkflowId, leafWorkflowId TaskDef persistedTask2Definition @@ -222,6 +226,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { } when: "poll and complete the integration_task_2 task in the root workflow" + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" @@ -245,6 +250,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { } when: "poll and complete the integration_task_2 task in the mid-level workflow" + def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) and: "poll and execute the sub workflow task" @@ -279,6 +285,10 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { sweep(newMidLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "the new mid level workflow is in COMPLETED state" assertWorkflowIsCompleted(newMidLevelWorkflowId) @@ -298,7 +308,7 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { */ def "Test restart on the mid-level in a 3-level subworkflow"() { //region Test case - when: "do a retry on the mid level workflow" + when: "do a restart on the mid level workflow" workflowExecutor.restart(midLevelWorkflowId, false) then: "verify that the mid workflow created a new execution" @@ -331,6 +341,8 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { } when: "poll and complete the integration_task_2 task in the mid level workflow" + def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) and: "the SUB_WORKFLOW task in mid level workflow is started by issuing a system task call" @@ -365,6 +377,10 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { sweep(midLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "verify that the mid level and root workflows reach COMPLETED state" assertWorkflowIsCompleted(midLevelWorkflowId) assertWorkflowIsCompleted(rootWorkflowId) @@ -460,6 +476,8 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { when: "poll and complete both tasks in the leaf workflow" workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) + def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId then: "verify that the leaf workflow reached COMPLETED state" with(workflowExecutionService.getExecutionStatus(leafWorkflowId, true)) { @@ -475,6 +493,10 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { sweep(midLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "verify that the mid level and root workflows reach COMPLETED state" assertWorkflowIsCompleted(midLevelWorkflowId) assertWorkflowIsCompleted(rootWorkflowId) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy index 5eb335e8ef..5bd1552499 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRetrySpec.groovy @@ -17,6 +17,7 @@ import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification @@ -42,6 +43,9 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { @Autowired SubWorkflow subWorkflowTask + @Autowired + Join joinTask + String rootWorkflowId, midLevelWorkflowId, leafWorkflowId TaskDef persistedTask2Definition @@ -229,6 +233,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { def polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds[0]) def newMidLevelWorkflowId = workflowExecutionService.getTask(polledTaskIds[0]).subWorkflowId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -247,6 +252,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "poll and complete the integration_task_1 task in the mid-level workflow" workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) + def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId and: "poll and execute the sub workflow task" polledTaskIds = queueDAO.pop(TASK_TYPE_SUB_WORKFLOW, 1, 200) @@ -280,6 +286,10 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { sweep(newMidLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "the new mid level workflow is in COMPLETED state" assertWorkflowIsCompleted(newMidLevelWorkflowId) @@ -350,6 +360,8 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { } when: "poll and complete the 2 tasks in the leaf workflow" + def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.integration.worker', ['op': 'task1.done']) @@ -367,6 +379,10 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { sweep(midLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "verify that the mid level and root workflows reach COMPLETED state" assertSubWorkflowTaskIsRetriedAndWorkflowCompleted(midLevelWorkflowId) assertWorkflowIsCompleted(rootWorkflowId) @@ -434,6 +450,8 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "the mid level and root workflows are 'decided'" sweep(midLevelWorkflowId) sweep(rootWorkflowId) + def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId then: "verify that the mid-level workflow's JOIN task is updated" with(workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true)) { @@ -486,6 +504,10 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { sweep(midLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "verify that the mid level and root workflows reach COMPLETED state" assertWorkflowIsCompleted(midLevelWorkflowId) assertWorkflowIsCompleted(rootWorkflowId) @@ -496,7 +518,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { * On a 3-level workflow where all workflows reach FAILED state because of a FAILED task * in the leaf workflow. * - * A retry is executed with resume flag on the mid-level workflow. + * A retry is executed with resume flag on the root workflow. * * Expectation: The leaf workflow is retried and both its parent (mid-level) and grand parent (root) workflows are also retried. * When the leaf workflow completes successfully, both the mid-level and root workflows also complete successfully. @@ -551,6 +573,8 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "the mid level and root workflows are 'decided'" sweep(midLevelWorkflowId) sweep(rootWorkflowId) + def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId then: "verify the mid level workflow's JOIN is updated" with(workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true)) { @@ -600,6 +624,10 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { sweep(midLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "the new mid level workflow is in COMPLETED state" assertWorkflowIsCompleted(midLevelWorkflowId) @@ -612,7 +640,7 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { * On a 3-level workflow where all workflows reach FAILED state because of a FAILED task * in the leaf workflow. * - * A retry is executed on the leaf workflow. + * A retry is executed on the mid-level workflow. * * Expectation: The leaf workflow resumes its FAILED task and updates both its parent (mid-level) and grandparent (root). * When the leaf workflow completes successfully, both the mid-level and root workflows also complete successfully. @@ -667,6 +695,8 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "the mid level and root workflows are 'decided'" sweep(midLevelWorkflowId) sweep(rootWorkflowId) + def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId then: "verify the mid level workflow's JOIN is updated" with(workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true)) { @@ -716,6 +746,10 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { sweep(midLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "the new mid level workflow is in COMPLETED state" assertWorkflowIsCompleted(midLevelWorkflowId) @@ -783,6 +817,8 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { when: "the mid level and root workflows are 'decided'" sweep(midLevelWorkflowId) sweep(rootWorkflowId) + def midJoinId = workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId + def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId then: "verify the mid level workflow's JOIN is updated" with(workflowExecutionService.getExecutionStatus(midLevelWorkflowId, true)) { @@ -834,6 +870,10 @@ class HierarchicalForkJoinSubworkflowRetrySpec extends AbstractSpecification { sweep(midLevelWorkflowId) sweep(rootWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, midJoinId) + asyncSystemTaskExecutor.execute(joinTask, rootJoinId) + then: "the new mid level workflow is in COMPLETED state" assertWorkflowIsCompleted(midLevelWorkflowId) diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy index 846d518a07..61eca08ab5 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/NestedForkJoinSubWorkflowSpec.groovy @@ -17,6 +17,7 @@ import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.execution.tasks.Join import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification @@ -38,6 +39,9 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { @Autowired QueueDAO queueDAO + @Autowired + Join joinTask + @Autowired SubWorkflow subWorkflowTask @@ -97,7 +101,7 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.integration.worker', ['op': 'task1.done']) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.integration.worker', ['op': 'task1.done']) - when: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" + and: "the subworkflow task should be in SCHEDULED state and is started by issuing a system task call" List polledTaskIds = queueDAO.pop("SUB_WORKFLOW", 1, 200) asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0)) @@ -216,6 +220,9 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { } when: "the parent workflow is swept" + def workflow = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) + def outerJoinId = workflow.getTaskByRefName("outer_join").taskId + def innerJoinId = workflow.getTaskByRefName("inner_join").taskId sweep(parentWorkflowId) then: "verify that the flag is reset and JOIN is updated" @@ -277,6 +284,10 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { when: "the parent workflow is swept" sweep(parentWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, innerJoinId) + asyncSystemTaskExecutor.execute(joinTask, outerJoinId) + then: "verify that the parent workflow reaches COMPLETED with all tasks completed" assertParentWorkflowIsComplete() } @@ -319,6 +330,9 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { asyncSystemTaskExecutor.execute(subWorkflowTask, polledTaskIds.get(0)) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) + def workflow = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) + def outerJoinId = workflow.getTaskByRefName("outer_join").taskId + def innerJoinId = workflow.getTaskByRefName("inner_join").taskId then: "verify that SUB_WORKFLOW task in in progress" def parentWorkflowInstance = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) @@ -389,6 +403,10 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { when: "the parent workflow is swept" sweep(parentWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, innerJoinId) + asyncSystemTaskExecutor.execute(joinTask, outerJoinId) + then: "verify that the parent workflow reaches COMPLETED with all tasks completed" assertParentWorkflowIsComplete() } @@ -470,6 +488,9 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { } when: "poll and complete both tasks in the sub workflow" + def workflow = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) + def outerJoinId = workflow.getTaskByRefName("outer_join").taskId + def innerJoinId = workflow.getTaskByRefName("inner_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', ['op': 'task1.done']) workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) @@ -510,6 +531,10 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { when: "the parent workflow is swept" sweep(parentWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, innerJoinId) + asyncSystemTaskExecutor.execute(joinTask, outerJoinId) + then: "verify that the parent workflow reaches COMPLETED with all tasks completed" with(workflowExecutionService.getExecutionStatus(parentWorkflowId, true)) { status == Workflow.WorkflowStatus.COMPLETED @@ -608,6 +633,9 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { } when: "poll and complete the failed task in the sub workflow" + def workflow = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) + def outerJoinId = workflow.getTaskByRefName("outer_join").taskId + def innerJoinId = workflow.getTaskByRefName("inner_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) then: "verify that the subworkflow completed" @@ -647,6 +675,10 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { when: "the parent workflow is swept" sweep(parentWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, innerJoinId) + asyncSystemTaskExecutor.execute(joinTask, outerJoinId) + then: "verify the parent workflow reaches COMPLETED state" assertParentWorkflowIsComplete() } @@ -724,6 +756,9 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { } when: "poll and complete the failed task in the sub workflow" + def workflow = workflowExecutionService.getExecutionStatus(parentWorkflowId, true) + def outerJoinId = workflow.getTaskByRefName("outer_join").taskId + def innerJoinId = workflow.getTaskByRefName("inner_join").taskId workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) then: "verify that the subworkflow completed" @@ -763,6 +798,10 @@ class NestedForkJoinSubWorkflowSpec extends AbstractSpecification { when: "the parent workflow is swept" sweep(parentWorkflowId) + and: "JOIN tasks are executed" + asyncSystemTaskExecutor.execute(joinTask, innerJoinId) + asyncSystemTaskExecutor.execute(joinTask, outerJoinId) + then: "verify the parent workflow reaches COMPLETED state" assertParentWorkflowIsComplete() } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SwitchTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SwitchTaskSpec.groovy index 349ecdfb9a..cffc9b6156 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SwitchTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SwitchTaskSpec.groovy @@ -12,8 +12,12 @@ */ package com.netflix.conductor.test.integration +import org.springframework.beans.factory.annotation.Autowired + import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.run.Workflow +import com.netflix.conductor.core.execution.tasks.Join +import com.netflix.conductor.dao.QueueDAO import com.netflix.conductor.test.base.AbstractSpecification import spock.lang.Shared @@ -23,6 +27,9 @@ import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAc class SwitchTaskSpec extends AbstractSpecification { + @Autowired + Join joinTask + @Shared def SWITCH_WF = "SwitchWorkflow" @@ -143,6 +150,7 @@ class SwitchTaskSpec extends AbstractSpecification { } when: "the tasks 'integration_task_1' and 'integration_task_10' are polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("joinTask").taskId def polledAndCompletedTask1Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker') def polledAndCompletedTask10Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_10', 'task1.integration.worker') @@ -193,7 +201,10 @@ class SwitchTaskSpec extends AbstractSpecification { then: "verify that the task is completed and acknowledged" verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1) - and: "verify that the 'integration_task_2' is COMPLETED and the workflow has progressed" + when: "JOIN task is polled and executed" + asyncSystemTaskExecutor.execute(joinTask, joinTaskId) + + then: "verify that the JOIN is COMPLETED and the workflow has progressed" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.COMPLETED tasks.size() == 7