From 3d02f227d1cf556b182f8ee8e502627d4d3fbaed Mon Sep 17 00:00:00 2001 From: Yohany Flores Date: Sun, 4 Jun 2023 11:08:33 -0500 Subject: [PATCH 1/5] Assign the value in seconds to postponeDurationSeconds in WAIT --- .../core/reconciliation/WorkflowSweeper.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index 72f39d358f..008e0f66f1 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -117,12 +117,12 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { long postponeDurationSeconds = 0; for (TaskModel taskModel : workflowModel.getTasks()) { if (taskModel.getStatus() == Status.IN_PROGRESS) { - if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_WAIT) - || taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { - postponeDurationSeconds = - (taskModel.getWaitTimeout() != 0) - ? taskModel.getWaitTimeout() + 1 - : workflowOffsetTimeout; + if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_WAIT)) { + long deltaInSeconds = + (System.currentTimeMillis() - taskModel.getWaitTimeout()) / 1000; + postponeDurationSeconds = (deltaInSeconds > 0) ? deltaInSeconds + 1 : 1; + } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { + postponeDurationSeconds = properties.getWorkflowOffsetTimeout().getSeconds(); } else { postponeDurationSeconds = (taskModel.getResponseTimeoutSeconds() != 0) From dc798ddb2077b3f7c476d91d5e976a8f208d937d Mon Sep 17 00:00:00 2001 From: Yohany Flores Date: Mon, 5 Jun 2023 13:31:49 -0500 Subject: [PATCH 2/5] Fix Fix WAIT task is not working properly (after v3.13.0) #3402 Add a test case for the issue #3402 --- .../core/reconciliation/WorkflowSweeper.java | 2 +- .../reconciliation/TestWorkflowSweeper.java | 44 +++++++++++++++++-- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index 008e0f66f1..f29bd8f58e 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -119,7 +119,7 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { if (taskModel.getStatus() == Status.IN_PROGRESS) { if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_WAIT)) { long deltaInSeconds = - (System.currentTimeMillis() - taskModel.getWaitTimeout()) / 1000; + (taskModel.getWaitTimeout() - System.currentTimeMillis()) / 1000; postponeDurationSeconds = (deltaInSeconds > 0) ? deltaInSeconds + 1 : 1; } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { postponeDurationSeconds = properties.getWorkflowOffsetTimeout().getSeconds(); diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index cc1ce59146..43f2d55479 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -57,6 +57,25 @@ public void setUp() { workflowExecutor, Optional.of(workflowRepairService), properties, queueDAO); } + @Test + public void testPostponeDurationForHumanTaskType() { + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_HUMAN); + taskModel.setStatus(Status.IN_PROGRESS); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, + workflowModel.getWorkflowId(), + defaultPostPoneOffSetSeconds * 1000); + } + @Test public void testPostponeDurationForWaitTaskType() { WorkflowModel workflowModel = new WorkflowModel(); @@ -69,11 +88,28 @@ public void testPostponeDurationForWaitTaskType() { when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO).setUnackTimeout(DECIDER_QUEUE, workflowModel.getWorkflowId(), 1000); + } + + @Test + public void testPostponeDurationForWaitTaskTypeWithLongWaitTime() { + long waitTimeout = 65845; + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_WAIT); + taskModel.setStatus(Status.IN_PROGRESS); + taskModel.setWaitTimeout(System.currentTimeMillis() + waitTimeout); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), - defaultPostPoneOffSetSeconds * 1000); + (waitTimeout / 1000L + 1L) * 1000L); } @Test @@ -85,14 +121,16 @@ public void testPostponeDurationForWaitTaskTypeWithWaitTime() { taskModel.setTaskId("task1"); taskModel.setTaskType(TaskType.TASK_TYPE_WAIT); taskModel.setStatus(Status.IN_PROGRESS); - taskModel.setWaitTimeout(waitTimeout); + taskModel.setWaitTimeout(System.currentTimeMillis() + waitTimeout); workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( - DECIDER_QUEUE, workflowModel.getWorkflowId(), (waitTimeout + 1) * 1000); + DECIDER_QUEUE, + workflowModel.getWorkflowId(), + (waitTimeout / 1000 + 1) * 1000); } @Test From ca6717ef93a4440f4ed4b9320ec3d4f929a4e79f Mon Sep 17 00:00:00 2001 From: Yohany Flores Date: Thu, 8 Jun 2023 20:37:42 -0500 Subject: [PATCH 3/5] Support Wait with waittimeout = 0 --- .../core/reconciliation/WorkflowSweeper.java | 12 ++++++++---- .../core/reconciliation/TestWorkflowSweeper.java | 6 +++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index f29bd8f58e..181bfc743f 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -118,11 +118,15 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { for (TaskModel taskModel : workflowModel.getTasks()) { if (taskModel.getStatus() == Status.IN_PROGRESS) { if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_WAIT)) { - long deltaInSeconds = - (taskModel.getWaitTimeout() - System.currentTimeMillis()) / 1000; - postponeDurationSeconds = (deltaInSeconds > 0) ? deltaInSeconds + 1 : 1; + if (taskModel.getWaitTimeout() == 0) { + postponeDurationSeconds = workflowOffsetTimeout; + } else { + long deltaInSeconds = + (taskModel.getWaitTimeout() - System.currentTimeMillis()) / 1000; + postponeDurationSeconds = (deltaInSeconds > 0) ? deltaInSeconds + 1 : 1; + } } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { - postponeDurationSeconds = properties.getWorkflowOffsetTimeout().getSeconds(); + postponeDurationSeconds = workflowOffsetTimeout; } else { postponeDurationSeconds = (taskModel.getResponseTimeoutSeconds() != 0) diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 43f2d55479..6dd753ea36 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -88,7 +88,11 @@ public void testPostponeDurationForWaitTaskType() { when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); - verify(queueDAO).setUnackTimeout(DECIDER_QUEUE, workflowModel.getWorkflowId(), 1000); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, + workflowModel.getWorkflowId(), + defaultPostPoneOffSetSeconds * 1000); } @Test From 5d3228799f90e6b376f3d6464df638ba50d7ee0b Mon Sep 17 00:00:00 2001 From: Yohany Flores Date: Wed, 14 Jun 2023 20:11:19 -0500 Subject: [PATCH 4/5] Fix Wait one second more --- .../core/reconciliation/WorkflowSweeper.java | 2 +- .../reconciliation/TestWorkflowSweeper.java | 27 +++++++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index 181bfc743f..46e8ca6f55 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -123,7 +123,7 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { } else { long deltaInSeconds = (taskModel.getWaitTimeout() - System.currentTimeMillis()) / 1000; - postponeDurationSeconds = (deltaInSeconds > 0) ? deltaInSeconds + 1 : 1; + postponeDurationSeconds = (deltaInSeconds > 0) ? deltaInSeconds : 0; } } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { postponeDurationSeconds = workflowOffsetTimeout; diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 6dd753ea36..912eb8c531 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -113,11 +113,11 @@ public void testPostponeDurationForWaitTaskTypeWithLongWaitTime() { .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), - (waitTimeout / 1000L + 1L) * 1000L); + (waitTimeout / 1000L) * 1000L); } @Test - public void testPostponeDurationForWaitTaskTypeWithWaitTime() { + public void testPostponeDurationForWaitTaskTypeWithLessOneSecondWaitTime() { long waitTimeout = 180; WorkflowModel workflowModel = new WorkflowModel(); workflowModel.setWorkflowId("1"); @@ -132,9 +132,26 @@ public void testPostponeDurationForWaitTaskTypeWithWaitTime() { workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( - DECIDER_QUEUE, - workflowModel.getWorkflowId(), - (waitTimeout / 1000 + 1) * 1000); + DECIDER_QUEUE, workflowModel.getWorkflowId(), (waitTimeout / 1000) * 1000); + } + + @Test + public void testPostponeDurationForWaitTaskTypeWithZeroWaitTime() { + long waitTimeout = 0; + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_WAIT); + taskModel.setStatus(Status.IN_PROGRESS); + taskModel.setWaitTimeout(System.currentTimeMillis() + waitTimeout); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, workflowModel.getWorkflowId(), (waitTimeout / 1000) * 1000); } @Test From bcf96415f262ec96666e762a050540cea5051f2c Mon Sep 17 00:00:00 2001 From: Yohany Flores Date: Wed, 14 Jun 2023 20:20:06 -0500 Subject: [PATCH 5/5] Refactoring Long Literal --- .../conductor/core/reconciliation/TestWorkflowSweeper.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 912eb8c531..1ed45dd7bd 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -111,9 +111,7 @@ public void testPostponeDurationForWaitTaskTypeWithLongWaitTime() { workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( - DECIDER_QUEUE, - workflowModel.getWorkflowId(), - (waitTimeout / 1000L) * 1000L); + DECIDER_QUEUE, workflowModel.getWorkflowId(), (waitTimeout / 1000) * 1000); } @Test