diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index 72f39d358f..46e8ca6f55 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -117,12 +117,16 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { long postponeDurationSeconds = 0; for (TaskModel taskModel : workflowModel.getTasks()) { if (taskModel.getStatus() == Status.IN_PROGRESS) { - if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_WAIT) - || taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { - postponeDurationSeconds = - (taskModel.getWaitTimeout() != 0) - ? taskModel.getWaitTimeout() + 1 - : workflowOffsetTimeout; + if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_WAIT)) { + if (taskModel.getWaitTimeout() == 0) { + postponeDurationSeconds = workflowOffsetTimeout; + } else { + long deltaInSeconds = + (taskModel.getWaitTimeout() - System.currentTimeMillis()) / 1000; + postponeDurationSeconds = (deltaInSeconds > 0) ? deltaInSeconds : 0; + } + } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { + postponeDurationSeconds = workflowOffsetTimeout; } else { postponeDurationSeconds = (taskModel.getResponseTimeoutSeconds() != 0) diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index cc1ce59146..1ed45dd7bd 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -57,6 +57,25 @@ public void setUp() { workflowExecutor, Optional.of(workflowRepairService), properties, queueDAO); } + @Test + public void testPostponeDurationForHumanTaskType() { + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_HUMAN); + taskModel.setStatus(Status.IN_PROGRESS); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, + workflowModel.getWorkflowId(), + defaultPostPoneOffSetSeconds * 1000); + } + @Test public void testPostponeDurationForWaitTaskType() { WorkflowModel workflowModel = new WorkflowModel(); @@ -77,7 +96,26 @@ public void testPostponeDurationForWaitTaskType() { } @Test - public void testPostponeDurationForWaitTaskTypeWithWaitTime() { + public void testPostponeDurationForWaitTaskTypeWithLongWaitTime() { + long waitTimeout = 65845; + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_WAIT); + taskModel.setStatus(Status.IN_PROGRESS); + taskModel.setWaitTimeout(System.currentTimeMillis() + waitTimeout); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, workflowModel.getWorkflowId(), (waitTimeout / 1000) * 1000); + } + + @Test + public void testPostponeDurationForWaitTaskTypeWithLessOneSecondWaitTime() { long waitTimeout = 180; WorkflowModel workflowModel = new WorkflowModel(); workflowModel.setWorkflowId("1"); @@ -85,14 +123,33 @@ public void testPostponeDurationForWaitTaskTypeWithWaitTime() { taskModel.setTaskId("task1"); taskModel.setTaskType(TaskType.TASK_TYPE_WAIT); taskModel.setStatus(Status.IN_PROGRESS); - taskModel.setWaitTimeout(waitTimeout); + taskModel.setWaitTimeout(System.currentTimeMillis() + waitTimeout); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, workflowModel.getWorkflowId(), (waitTimeout / 1000) * 1000); + } + + @Test + public void testPostponeDurationForWaitTaskTypeWithZeroWaitTime() { + long waitTimeout = 0; + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_WAIT); + taskModel.setStatus(Status.IN_PROGRESS); + taskModel.setWaitTimeout(System.currentTimeMillis() + waitTimeout); workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( - DECIDER_QUEUE, workflowModel.getWorkflowId(), (waitTimeout + 1) * 1000); + DECIDER_QUEUE, workflowModel.getWorkflowId(), (waitTimeout / 1000) * 1000); } @Test