Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3623 from charybr/feature/task_status_listener
Browse files Browse the repository at this point in the history
Feature: Need for Task status listener
  • Loading branch information
v1r3n authored Jun 17, 2023
2 parents 878fcd9 + 62ee65a commit e68dee6
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ Changes to configurations:
| workflow.default.event.processor.enabled | conductor.default-event-processor.enabled | true |
| workflow.events.default.queue.type | conductor.default-event-queue.type | sqs |
| workflow.status.listener.type | conductor.workflow-status-listener.type | stub |
| - | conductor.task-status-listener.type | stub |
| workflow.decider.locking.server | conductor.workflow-execution-lock.type | noop_lock |
| | | |
| workflow.default.event.queue.enabled | conductor.event-queues.default.enabled | true |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.core.listener.TaskStatusListenerStub;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.core.listener.WorkflowStatusListenerStub;
import com.netflix.conductor.core.storage.DummyPayloadStorage;
Expand Down Expand Up @@ -80,6 +82,15 @@ public WorkflowStatusListener workflowStatusListener() {
return new WorkflowStatusListenerStub();
}

@ConditionalOnProperty(
name = "conductor.task-status-listener.type",
havingValue = "stub",
matchIfMissing = true)
@Bean
public TaskStatusListener taskStatusListener() {
return new TaskStatusListenerStub();
}

@Bean
public ExecutorService executorService(ConductorProperties conductorProperties) {
ThreadFactory threadFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.Terminate;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.core.metadata.MetadataMapperService;
import com.netflix.conductor.core.utils.IDGenerator;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class WorkflowExecutor {
private final ParametersUtils parametersUtils;
private final IDGenerator idGenerator;
private final WorkflowStatusListener workflowStatusListener;
private final TaskStatusListener taskStatusListener;
private final SystemTaskRegistry systemTaskRegistry;
private final ApplicationEventPublisher eventPublisher;
private long activeWorkerLastPollMs;
Expand All @@ -97,6 +99,7 @@ public WorkflowExecutor(
QueueDAO queueDAO,
MetadataMapperService metadataMapperService,
WorkflowStatusListener workflowStatusListener,
TaskStatusListener taskStatusListener,
ExecutionDAOFacade executionDAOFacade,
ConductorProperties properties,
ExecutionLockService executionLockService,
Expand All @@ -112,6 +115,7 @@ public WorkflowExecutor(
this.executionDAOFacade = executionDAOFacade;
this.activeWorkerLastPollMs = properties.getActiveWorkerLastPollTimeout().toMillis();
this.workflowStatusListener = workflowStatusListener;
this.taskStatusListener = taskStatusListener;
this.executionLockService = executionLockService;
this.parametersUtils = parametersUtils;
this.idGenerator = idGenerator;
Expand Down Expand Up @@ -846,6 +850,16 @@ public void updateTask(TaskResult taskResult) {
throw new TransientException(errorMsg, e);
}

try {
notifyTaskStatusListener(task);
} catch (Exception e) {
String errorMsg =
String.format(
"Error while notifying TaskStatusListener: %s for workflow: %s",
task.getTaskId(), workflowId);
LOGGER.error(errorMsg, e);
}

taskResult.getLogs().forEach(taskExecLog -> taskExecLog.setTaskId(task.getTaskId()));
executionDAOFacade.addTaskExecLog(taskResult.getLogs());

Expand All @@ -863,6 +877,33 @@ public void updateTask(TaskResult taskResult) {
}
}

private void notifyTaskStatusListener(TaskModel task) {
switch (task.getStatus()) {
case COMPLETED:
taskStatusListener.onTaskCompleted(task);
break;
case CANCELED:
taskStatusListener.onTaskCanceled(task);
break;
case FAILED:
taskStatusListener.onTaskFailed(task);
break;
case FAILED_WITH_TERMINAL_ERROR:
taskStatusListener.onTaskFailedWithTerminalError(task);
break;
case TIMED_OUT:
taskStatusListener.onTaskTimedOut(task);
break;
case IN_PROGRESS:
taskStatusListener.onTaskInProgress(task);
break;
case SCHEDULED:
// no-op, already done in addTaskToQueue
default:
break;
}
}

private void extendLease(TaskResult taskResult) {
TaskModel task =
Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId()))
Expand Down Expand Up @@ -1500,6 +1541,16 @@ boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
private void addTaskToQueue(final List<TaskModel> tasks) {
for (TaskModel task : tasks) {
addTaskToQueue(task);
// notify TaskStatusListener
try {
taskStatusListener.onTaskScheduled(task);
} catch (Exception e) {
String errorMsg =
String.format(
"Error while notifying TaskStatusListener: %s for workflow: %s",
task.getTaskId(), task.getWorkflowInstanceId());
LOGGER.error(errorMsg, e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.listener;

import com.netflix.conductor.model.TaskModel;

/**
* Listener for the Task status change. All methods have default implementation so that
* Implementation can choose to override a subset of interested Task statuses.
*/
public interface TaskStatusListener {

default void onTaskScheduled(TaskModel task) {}

default void onTaskInProgress(TaskModel task) {}

default void onTaskCanceled(TaskModel task) {}

default void onTaskFailed(TaskModel task) {}

default void onTaskFailedWithTerminalError(TaskModel task) {}

default void onTaskCompleted(TaskModel task) {}

default void onTaskCompletedWithErrors(TaskModel task) {}

default void onTaskTimedOut(TaskModel task) {}

default void onTaskSkipped(TaskModel task) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.model.TaskModel;

/** Stub listener default implementation */
public class TaskStatusListenerStub implements TaskStatusListener {

private static final Logger LOGGER = LoggerFactory.getLogger(TaskStatusListenerStub.class);

@Override
public void onTaskScheduled(TaskModel task) {
LOGGER.debug("Task {} is scheduled", task.getTaskId());
}

@Override
public void onTaskCanceled(TaskModel task) {
LOGGER.debug("Task {} is canceled", task.getTaskId());
}

@Override
public void onTaskCompleted(TaskModel task) {
LOGGER.debug("Task {} is completed", task.getTaskId());
}

@Override
public void onTaskCompletedWithErrors(TaskModel task) {
LOGGER.debug("Task {} is completed with errors", task.getTaskId());
}

@Override
public void onTaskFailed(TaskModel task) {
LOGGER.debug("Task {} is failed", task.getTaskId());
}

@Override
public void onTaskFailedWithTerminalError(TaskModel task) {
LOGGER.debug("Task {} is failed with terminal error", task.getTaskId());
}

@Override
public void onTaskInProgress(TaskModel task) {
LOGGER.debug("Task {} is in-progress", task.getTaskId());
}

@Override
public void onTaskSkipped(TaskModel task) {
LOGGER.debug("Task {} is skipped", task.getTaskId());
}

@Override
public void onTaskTimedOut(TaskModel task) {
LOGGER.debug("Task {} is timed out", task.getTaskId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
"type": "java.lang.String",
"description": "The implementation of the workflow status listener to be used."
},
{
"name": "conductor.task-status-listener.type",
"type": "java.lang.String",
"description": "The implementation of the task status listener to be used."
},
{
"name": "conductor.workflow-execution-lock.type",
"type": "java.lang.String",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.netflix.conductor.core.execution.evaluators.Evaluator;
import com.netflix.conductor.core.execution.mapper.*;
import com.netflix.conductor.core.execution.tasks.*;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.core.metadata.MetadataMapperService;
import com.netflix.conductor.core.operation.StartWorkflowOperation;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class TestWorkflowExecutor {
private MetadataDAO metadataDAO;
private QueueDAO queueDAO;
private WorkflowStatusListener workflowStatusListener;
private TaskStatusListener taskStatusListener;
private ExecutionLockService executionLockService;
private ExternalPayloadStorageUtils externalPayloadStorageUtils;

Expand Down Expand Up @@ -160,6 +162,7 @@ public void init() {
metadataDAO = mock(MetadataDAO.class);
queueDAO = mock(QueueDAO.class);
workflowStatusListener = mock(WorkflowStatusListener.class);
taskStatusListener = mock(TaskStatusListener.class);
externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
executionLockService = mock(ExecutionLockService.class);
eventPublisher = mock(ApplicationEventPublisher.class);
Expand Down Expand Up @@ -210,6 +213,7 @@ public void init() {
queueDAO,
metadataMapperService,
workflowStatusListener,
taskStatusListener,
executionDAOFacade,
properties,
executionLockService,
Expand Down

0 comments on commit e68dee6

Please sign in to comment.