From 9954f871c8f9d49306b555845045d08fe88c3430 Mon Sep 17 00:00:00 2001 From: "Chaithra.S" Date: Wed, 24 Apr 2024 13:03:15 +0530 Subject: [PATCH] Changes to support methods with worker attributes. --- Conductor/Client/Constants.cs | 4 + Conductor/Client/Models/TaskResult.cs | 18 +- Conductor/Client/Models/WorkflowTask.cs | 17 + Conductor/Client/Worker/GenericWorker.cs | 331 +++++++++++++++++- Conductor/Client/Worker/InputParam.cs | 30 ++ Conductor/Client/Worker/OutputParam.cs | 22 ++ .../Worker/Utils/ObjectMapperProvider.cs | 36 ++ Conductor/Client/Worker/Utils/WorkerUtil.cs | 74 ++++ Conductor/Client/Worker/WorkerTask.cs | 12 +- .../Client/Worker/WorkflowTaskContext.cs | 121 +++++++ Conductor/Definition/TaskType/DynamicFork.cs | 33 ++ .../Definition/TaskType/DynamicForkInput.cs | 17 + Conductor/Definition/TaskType/Task.cs | 59 ++++ Conductor/Examples/Utils/WorkerUtil.cs | 2 +- Tests/Helper/TestConstants.cs | 5 + Tests/Worker/AnnotatedWorker.cs | 30 ++ Tests/Worker/AnnotatedWorkerTest.cs | 143 ++++++++ Tests/Worker/TestWorkflows.cs | 2 +- 18 files changed, 941 insertions(+), 15 deletions(-) create mode 100644 Conductor/Client/Worker/InputParam.cs create mode 100644 Conductor/Client/Worker/OutputParam.cs create mode 100644 Conductor/Client/Worker/Utils/ObjectMapperProvider.cs create mode 100644 Conductor/Client/Worker/Utils/WorkerUtil.cs create mode 100644 Conductor/Client/Worker/WorkflowTaskContext.cs create mode 100644 Conductor/Definition/TaskType/DynamicFork.cs create mode 100644 Conductor/Definition/TaskType/DynamicForkInput.cs create mode 100644 Tests/Worker/AnnotatedWorker.cs create mode 100644 Tests/Worker/AnnotatedWorkerTest.cs diff --git a/Conductor/Client/Constants.cs b/Conductor/Client/Constants.cs index a2fa20b6..b4eda709 100644 --- a/Conductor/Client/Constants.cs +++ b/Conductor/Client/Constants.cs @@ -67,5 +67,9 @@ public static class Constants public const string PINECONEENDPOINT = "PINECONE_ENDPOINT"; public const string PINECONEAPIKEY = "PINECONE_API_KEY"; public const string OPENAIAPIKEY = "OPENAI_API_KEY"; + + //Annotation + public const string TOKENCANCELLATION = "Token Requested Cancel"; + public const string RUNTIMEERROR = "Method failed with runtime error"; } } \ No newline at end of file diff --git a/Conductor/Client/Models/TaskResult.cs b/Conductor/Client/Models/TaskResult.cs index 9eb10c1e..73351104 100644 --- a/Conductor/Client/Models/TaskResult.cs +++ b/Conductor/Client/Models/TaskResult.cs @@ -1,12 +1,11 @@ -using System.Linq; -using System.IO; +using Newtonsoft.Json; using Newtonsoft.Json.Converters; using System; -using System.Text; using System.Collections.Generic; -using System.Runtime.Serialization; -using Newtonsoft.Json; using System.ComponentModel.DataAnnotations; +using System.Linq; +using System.Runtime.Serialization; +using System.Text; namespace Conductor.Client.Models { @@ -152,6 +151,15 @@ public override string ToString() return sb.ToString(); } + /// + /// Method to set the value for callbackAfterSeconds + /// + /// + public void SetCallbackAfterSeconds(long callbackAfterSeconds) + { + this.CallbackAfterSeconds = callbackAfterSeconds; + } + /// /// Returns the JSON string presentation of the object /// diff --git a/Conductor/Client/Models/WorkflowTask.cs b/Conductor/Client/Models/WorkflowTask.cs index fadde0c3..178c0353 100644 --- a/Conductor/Client/Models/WorkflowTask.cs +++ b/Conductor/Client/Models/WorkflowTask.cs @@ -755,5 +755,22 @@ public override int GetHashCode() { yield break; } + /// + /// Sets theDynamicForkJoinTasksParam + /// + /// + public void SetDynamicForkJoinTasksParam(string dynamicForkTasksParam) + { + this.DynamicForkJoinTasksParam = dynamicForkTasksParam; + } + + /// + /// Sets the dynamicForkTasksInputParamName + /// + /// + public void SetDynamicForkTasksInputParamName(string dynamicForkTasksInputParamName) + { + this.DynamicForkTasksInputParamName = dynamicForkTasksInputParamName; + } } } diff --git a/Conductor/Client/Worker/GenericWorker.cs b/Conductor/Client/Worker/GenericWorker.cs index 1627d837..7ed5cae5 100644 --- a/Conductor/Client/Worker/GenericWorker.cs +++ b/Conductor/Client/Worker/GenericWorker.cs @@ -1,6 +1,16 @@ +using Client.Worker.Utils; using Conductor.Client.Interfaces; using Conductor.Client.Models; +using Conductor.Client.Worker.Utils; +using Conductor.Definition.TaskType; +using Definition.TaskType; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; using System.Reflection; +using System.Text; using System.Threading; using System.Threading.Tasks; @@ -8,33 +18,342 @@ namespace Conductor.Client.Worker { public class GenericWorker : IWorkflowTask { - public string TaskType { get; } - public WorkflowTaskExecutorConfiguration WorkerSettings { get; } + public string TaskType { get; set; } + public WorkflowTaskExecutorConfiguration WorkerSettings { get; set; } private readonly object _workerInstance; private readonly MethodInfo _executeTaskMethod; + private readonly List failedStatuses = new List() { TaskResult.StatusEnum.FAILED, TaskResult.StatusEnum.FAILEDWITHTERMINALERROR }; + private readonly JsonSerializerSettings _jsonSerializerSettings; + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// + /// public GenericWorker(string taskType, WorkflowTaskExecutorConfiguration workerSettings, MethodInfo executeTaskMethod, object workerInstance = null) { TaskType = taskType; WorkerSettings = workerSettings; _executeTaskMethod = executeTaskMethod; _workerInstance = workerInstance; + _jsonSerializerSettings = ObjectMapperProvider.GetJsonSerializerSettings(); } + /// + /// Executes a task asynchronously and returns a TaskResult. + /// + /// + /// + /// + /// public async Task Execute(Models.Task task, CancellationToken token) { - if (token != CancellationToken.None && token.IsCancellationRequested) - return new TaskResult() { Status = TaskResult.StatusEnum.FAILED, ReasonForIncompletion = "Token Requested Cancel" }; + return new TaskResult() { Status = TaskResult.StatusEnum.FAILED, ReasonForIncompletion = Constants.TOKENCANCELLATION }; + + TaskResult result = null; + try + { + WorkflowTaskContext workflowTaskContext = WorkflowTaskContext.Set(task); + Object[] parameters = GetInvocationParameters(task); + Object invocationResult = await System.Threading.Tasks.Task.Run(() => _executeTaskMethod.Invoke(_workerInstance, parameters)); + result = SetValue(invocationResult, workflowTaskContext.GetTaskResult()); + if (!failedStatuses.Contains((TaskResult.StatusEnum)result.Status) && result.CallbackAfterSeconds > 0) + { + result.Status = TaskResult.StatusEnum.INPROGRESS; + } + } + catch (TargetInvocationException targetInvocationException) + { + if (result == null) + { + result = WorkerUtil.GetTaskResult(task); + } + + Exception innerException = targetInvocationException.InnerException; + if (WorkerUtil.IsTerminalError(innerException)) + { + result.Status = TaskResult.StatusEnum.FAILEDWITHTERMINALERROR; + } + else + { + result.Status = TaskResult.StatusEnum.FAILED; + } - var taskResult = await System.Threading.Tasks.Task.Run(() => _executeTaskMethod.Invoke(_workerInstance, new object[] { task })); - return (TaskResult)taskResult; + StringBuilder stackTrace = new StringBuilder(); + StackTrace exceptionStackTrace = new StackTrace(innerException); + foreach (StackFrame stackFrame in exceptionStackTrace.GetFrames()) + { + string methodName = stackFrame.GetMethod().Name; + string fileName = stackFrame.GetFileName(); + int lineNumber = stackFrame.GetFileLineNumber(); + stackTrace.AppendLine($"Method: {methodName}, File: {fileName}, Line: {lineNumber}"); + } + TaskExecLog taskExecLog = new TaskExecLog() + { + Log = stackTrace.ToString(), + TaskId = task.TaskId + }; + result.Logs = new List { taskExecLog }; + } + catch (Exception ex) + { + throw new Exception(Constants.RUNTIMEERROR); + } + + return result; } public TaskResult Execute(Models.Task task) { throw new System.NotImplementedException(); } + + /// + /// Returns the parameters required for the task invocation. + /// + /// + /// + private object[] GetInvocationParameters(Models.Task task) + { + Type[] parameterTypes = _executeTaskMethod.GetParameters().Select(p => p.ParameterType).ToArray(); + ParameterInfo[] parameters = _executeTaskMethod.GetParameters(); + + if (parameterTypes.Length == 1 && parameterTypes[0] == typeof(Models.Task)) + { + return new object[] { task }; + } + else if (parameterTypes.Length == 1 && parameterTypes[0] == typeof(Dictionary)) + { + return new object[] { task.InputData }; + } + + return GetParameters(task, parameterTypes, parameters); + } + + /// + /// Retrieves parameters for task invocation based on task and method signatures. + /// + /// + /// + /// + /// + private object[] GetParameters(Models.Task task, Type[] parameterTypes, ParameterInfo[] parameters) + { + Attribute[][] parameterAnnotations = GetParameterAnnotations(); + Object[] values = new Object[parameterTypes.Length]; + for (int i = 0; i < parameterTypes.Length; i++) + { + Attribute[] annotations = parameterAnnotations[i]; + if (annotations != null && annotations.Length > 0) + { + Type type = parameters[i].ParameterType; + Type parameterType = parameterTypes[i]; + values[i] = GetInputValue(task, parameterType, type, annotations); + } + else + { + string parameterName = parameters[i].Name; + values[i] = ConvertInputValue(task.InputData[parameterName], parameterTypes[i]); + } + } + return values; + } + + /// + /// Retrieves annotations for parameters of the method. + /// + /// + private Attribute[][] GetParameterAnnotations() + { + ParameterInfo[] parameters = _executeTaskMethod.GetParameters(); + Attribute[][] parameterAnnotations = new Attribute[parameters.Length][]; + + for (int i = 0; i < parameters.Length; i++) + { + List annotations = new List(); + + foreach (object attribute in parameters[i].GetCustomAttributes(false)) + { + if (attribute is Attribute attr) + { + annotations.Add(attr); + } + } + + parameterAnnotations[i] = annotations.ToArray(); + } + + return parameterAnnotations; + } + + /// + /// Retrieves the value for a parameter based on annotations and input data. + /// + /// + /// + /// + /// + /// + private Object GetInputValue(Models.Task task, Type parameterType, Type type, Attribute[] paramAnnotation) + { + InputParam inputParam = FindInputParamAnnotation(paramAnnotation); + if (inputParam == null) + { + ConvertInputValue(task.InputData, parameterType); + } + + string inputValue = inputParam.Value; + object value; + if (task.InputData.ContainsKey(inputValue)) + { + value = task.InputData[inputValue]; + } + else + { + return null; + } + + if (typeof(List).IsAssignableFrom(parameterType)) + { + List list = (List)ConvertInputValue(value, typeof(List)); + if (type.IsGenericType) + { + Type typeOfParameter = type.GetGenericArguments()[0]; + List parameterizedList = new List(); + foreach (var item in list) + { + parameterizedList.Add(ConvertInputValue(item, typeOfParameter)); + } + + return parameterizedList; + } + else + { + return list; + } + } + else + { + return ConvertInputValue(value, parameterType); + } + } + + /// + /// Converts input value to the specified target type. + /// + /// + /// + /// + private object ConvertInputValue(object input, Type targetType) + { + // Serialize the input object to JSON string using ObjectMapperProvider settings + string jsonString = JsonConvert.SerializeObject(input, _jsonSerializerSettings); + + // Deserialize the JSON string to the target type using ObjectMapperProvider settings + object result = JsonConvert.DeserializeObject(jsonString, targetType, _jsonSerializerSettings); + + return result; + } + + /// + /// Finds InputParam annotation in a list of parameter annotations. + /// + /// + /// + private static InputParam FindInputParamAnnotation(Attribute[] paramAnnotations) + { + return paramAnnotations + .OfType() + .FirstOrDefault(); + } + + /// + /// Sets the value in TaskResult based on invocation result and annotations. + /// + /// + /// + /// + private TaskResult SetValue(object InvocationResult, TaskResult result) + { + if (InvocationResult == null) + { + result.Status = TaskResult.StatusEnum.COMPLETED; + return result; + } + + OutputParam outputAnnotation = _executeTaskMethod.GetCustomAttribute(); + if (outputAnnotation != null) + { + string name = outputAnnotation.Value; + result.OutputData = new Dictionary() + { + { + name,InvocationResult + } + }; + result.Status = TaskResult.StatusEnum.COMPLETED; + return result; + + } + else if (InvocationResult is TaskResult) + { + return (TaskResult)InvocationResult; + } + else if (InvocationResult is Dictionary) + { + Dictionary ResultAsDictionary = (Dictionary)InvocationResult; + foreach (var kvp in ResultAsDictionary) + { + result.OutputData[kvp.Key] = kvp.Value; + } + result.Status = TaskResult.StatusEnum.COMPLETED; + return result; + } + else if (InvocationResult is string || InvocationResult is int + || InvocationResult is float || InvocationResult is decimal + || InvocationResult is double || InvocationResult is bool) + { + result.OutputData.Add("result", InvocationResult); + result.Status = TaskResult.StatusEnum.COMPLETED; + return result; + } + else if (InvocationResult is List) + { + List resultAsList = (List)ConvertInputValue(InvocationResult, typeof(List)); + result.OutputData.Add("result", resultAsList); + result.Status = TaskResult.StatusEnum.COMPLETED; + return result; + } + else if (InvocationResult is DynamicForkInput) + { + DynamicForkInput forkInput = (DynamicForkInput)InvocationResult; + List tasks = forkInput.Tasks; + List workflowTasks = new List(); + foreach (var sdkTasks in tasks) + { + List task = sdkTasks.GetWorkflowDefTasks(); + workflowTasks.AddRange(task); + } + result.OutputData.Add(DynamicFork.FORK_TASK_PARAM, workflowTasks); + result.OutputData.Add(DynamicFork.FORK_TASK_INPUT_PARAM, forkInput.Inputs); + result.Status = TaskResult.StatusEnum.COMPLETED; + return result; + } + else + { + Dictionary resultAsDictionary = InvocationResult as Dictionary; + foreach (var kvp in resultAsDictionary) + { + result.OutputData[kvp.Key] = kvp.Value; + } + result.Status = TaskResult.StatusEnum.COMPLETED; + return result; + } + } } } + diff --git a/Conductor/Client/Worker/InputParam.cs b/Conductor/Client/Worker/InputParam.cs new file mode 100644 index 00000000..b54c2e27 --- /dev/null +++ b/Conductor/Client/Worker/InputParam.cs @@ -0,0 +1,30 @@ +using System; + +namespace Conductor.Client.Worker +{ + [AttributeUsage(AttributeTargets.Method | AttributeTargets.Parameter)] + public class InputParam : Attribute + { + /// + /// Gets or sets the Value + /// + public string Value { get; set; } + + /// + /// Gets or sets the Required + /// + public bool Required { get; set; } + + /// + /// Initializes a new instance of the class. + /// + /// + /// + public InputParam(string value, bool required = false) + { + Value = value; + Required = required; + + } + } +} diff --git a/Conductor/Client/Worker/OutputParam.cs b/Conductor/Client/Worker/OutputParam.cs new file mode 100644 index 00000000..70e5dcd0 --- /dev/null +++ b/Conductor/Client/Worker/OutputParam.cs @@ -0,0 +1,22 @@ +using System; + +namespace Conductor.Client.Worker +{ + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + public class OutputParam : Attribute + { + /// + /// Gets or sets the Value + /// + public string Value { get; } + + /// + /// Initializes a new instance of the class. + /// + /// + public OutputParam(string value) + { + Value = value; + } + } +} \ No newline at end of file diff --git a/Conductor/Client/Worker/Utils/ObjectMapperProvider.cs b/Conductor/Client/Worker/Utils/ObjectMapperProvider.cs new file mode 100644 index 00000000..73cdc7aa --- /dev/null +++ b/Conductor/Client/Worker/Utils/ObjectMapperProvider.cs @@ -0,0 +1,36 @@ +using Newtonsoft.Json; + +namespace Conductor.Client.Worker.Utils +{ + public class ObjectMapperProvider + { + /// + /// Method to set the serializer settings + /// + /// + public static JsonSerializerSettings GetJsonSerializerSettings() + { + var settings = new JsonSerializerSettings + { + // Disables throwing an exception if there are unknown properties during deserialization + MissingMemberHandling = MissingMemberHandling.Ignore, + + // Disables throwing an exception if there are ignored properties during deserialization + Error = (sender, args) => { args.ErrorContext.Handled = true; }, + + // Disables throwing an exception if null values are encountered for primitive types during deserialization + NullValueHandling = NullValueHandling.Include, + + // Disables throwing an exception if the object being serialized is empty + DefaultValueHandling = DefaultValueHandling.Ignore, + + // Sets serialization inclusion to always include properties, regardless of their values + ReferenceLoopHandling = ReferenceLoopHandling.Ignore, + PreserveReferencesHandling = PreserveReferencesHandling.None, + TypeNameHandling = TypeNameHandling.None + }; + + return settings; + } + } +} diff --git a/Conductor/Client/Worker/Utils/WorkerUtil.cs b/Conductor/Client/Worker/Utils/WorkerUtil.cs new file mode 100644 index 00000000..ec6e2c34 --- /dev/null +++ b/Conductor/Client/Worker/Utils/WorkerUtil.cs @@ -0,0 +1,74 @@ +using Conductor.Client.Models; +using System; + +namespace Client.Worker.Utils +{ + public static class WorkerUtil + { + public const string TerminalError = "terminal error"; + public const string Fatal = "fatal"; + + /// + /// Returns the taskResult object + /// + /// + /// + public static TaskResult GetTaskResult(Task task) + { + return new TaskResult(callbackAfterSeconds: task.CallbackAfterSeconds, taskId: task.TaskId, + externalOutputPayloadStoragePath: task.ExternalOutputPayloadStoragePath, workflowInstanceId: task.WorkflowInstanceId, outputData: task.OutputData, reasonForIncompletion: task.ReasonForIncompletion, + status: GetStatus(task), subWorkflowId: task.SubWorkflowId, workerId: task.WorkerId); + } + + /// + /// Returns the status based on task result + /// + /// + /// + /// + public static TaskResult.StatusEnum GetStatus(Task task) + { + TaskResult.StatusEnum status; + switch (task.Status) + { + case Task.StatusEnum.CANCELED: + case Task.StatusEnum.COMPLETEDWITHERRORS: + case Task.StatusEnum.TIMEDOUT: + case Task.StatusEnum.SKIPPED: + status = TaskResult.StatusEnum.FAILED; + break; + case Task.StatusEnum.SCHEDULED: + status = TaskResult.StatusEnum.INPROGRESS; + break; + default: + if (!Enum.TryParse(task.Status.ToString(), out status)) + { + throw new ArgumentException($"Unknown status: {task.Status}"); + } + break; + } + + return status; + } + + /// + /// Checks if the exception is a terminal error + /// + /// + /// + public static bool IsTerminalError(Exception exception) + { + if (exception.Message.Contains(TerminalError)) + { + return true; + } + + if (exception.StackTrace != null && exception.StackTrace.Contains(Fatal)) + { + return true; + } + + return false; + } + } +} diff --git a/Conductor/Client/Worker/WorkerTask.cs b/Conductor/Client/Worker/WorkerTask.cs index 4f0a6040..35a9bd29 100644 --- a/Conductor/Client/Worker/WorkerTask.cs +++ b/Conductor/Client/Worker/WorkerTask.cs @@ -5,7 +5,7 @@ namespace Conductor.Client.Worker [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] public class WorkerTask : Attribute { - public string TaskType { get; } + public string TaskType { get; set; } public WorkflowTaskExecutorConfiguration WorkerSettings { get; set; } public WorkerTask() @@ -13,7 +13,15 @@ public WorkerTask() WorkerSettings = new WorkflowTaskExecutorConfiguration(); } - public WorkerTask(string taskType, int batchSize, string domain, int pollIntervalMs, string workerId) + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// + /// + /// + public WorkerTask(string taskType = default, int batchSize = default, string domain = default, int pollIntervalMs = default, string workerId = default) { TaskType = taskType; WorkerSettings = new WorkflowTaskExecutorConfiguration diff --git a/Conductor/Client/Worker/WorkflowTaskContext.cs b/Conductor/Client/Worker/WorkflowTaskContext.cs new file mode 100644 index 00000000..8b85f1c4 --- /dev/null +++ b/Conductor/Client/Worker/WorkflowTaskContext.cs @@ -0,0 +1,121 @@ +using Client.Worker.Utils; +using Conductor.Client.Models; +using System.Threading; + +namespace Conductor.Client.Worker +{ + public class WorkflowTaskContext + { + private static readonly ThreadLocal TASK_CONTEXT_INHERITABLE_THREAD_LOCAL = + new ThreadLocal(() => null); + + private readonly Task task; + private readonly TaskResult taskResult; + + /// + /// Initializes a new instance of the class. + /// + /// + /// + public WorkflowTaskContext(Task task, TaskResult taskResult) + { + this.task = task; + this.taskResult = taskResult; + } + + /// + /// Method to get the value of WorkflowTaskContext + /// + /// + public static WorkflowTaskContext Get() + { + return TASK_CONTEXT_INHERITABLE_THREAD_LOCAL.Value; + } + + /// + /// Method to set the WorkflowTaskContext + /// + /// + /// + public static WorkflowTaskContext Set(Task task) + { + TaskResult result = WorkerUtil.GetTaskResult(task); + WorkflowTaskContext context = new WorkflowTaskContext(task, result); + TASK_CONTEXT_INHERITABLE_THREAD_LOCAL.Value = context; + return context; + } + + /// + /// Method to get the WorkflowInstanceId + /// + /// + public string GetWorkflowInstanceId() + { + return task.WorkflowInstanceId; + } + + /// + /// Method to get the TaskId + /// + /// + public string GetTaskId() + { + return task.TaskId; + } + + /// + /// Method to get the RetryCount + /// + /// + public int? GetRetryCount() + { + return task.RetryCount; + } + + /// + /// Method to get the PollCount + /// + /// + public int? GetPollCount() + { + return task.PollCount; + } + + /// + /// Method to get the CallbackAfterSeconds value + /// + /// + public long? GetCallbackAfterSeconds() + { + return task.CallbackAfterSeconds; + } + + /// + /// Method to get the task + /// + /// + public Task GetTask() + { + return task; + } + + /// + /// Method to get the taskResult + /// + /// + public TaskResult GetTaskResult() + { + return taskResult; + } + + /// + /// Method to set the SetCallbackAfterSeconds variable + /// + /// + public void SetCallbackAfter(int seconds) + { + taskResult.SetCallbackAfterSeconds(seconds); + } + } +} + diff --git a/Conductor/Definition/TaskType/DynamicFork.cs b/Conductor/Definition/TaskType/DynamicFork.cs new file mode 100644 index 00000000..d05e301b --- /dev/null +++ b/Conductor/Definition/TaskType/DynamicFork.cs @@ -0,0 +1,33 @@ +using Conductor.Client.Models; +using Conductor.Definition.TaskType; +using Task = Conductor.Definition.TaskType.Task; + +namespace Definition.TaskType +{ + public class DynamicFork : Task + { + public string ForkTasksParameter { get; set; } + + public string ForkTasksInputsParameter { get; set; } + + public JoinTask Join { get; set; } + + public const string FORK_TASK_PARAM = "forkedTasks"; + public const string FORK_TASK_INPUT_PARAM = "forkedTasksInputs"; + + public DynamicFork(string taskReferenceName, string forkTasksParameter, string forkTasksInputsParameter) : base(taskReferenceName, WorkflowTaskTypeEnum.FORKJOINDYNAMIC) + { + this.Join = new JoinTask(taskReferenceName + "_join"); + this.ForkTasksParameter = forkTasksParameter; + this.ForkTasksInputsParameter = forkTasksInputsParameter; + base.InputParameters.Add(FORK_TASK_PARAM, forkTasksParameter); + base.InputParameters.Add(FORK_TASK_INPUT_PARAM, forkTasksInputsParameter); + } + + public override void UpdateWorkflowTask(WorkflowTask task) + { + task.SetDynamicForkJoinTasksParam("forkedTasks"); + task.SetDynamicForkTasksInputParamName("forkedTasksInputs"); + } + } +} \ No newline at end of file diff --git a/Conductor/Definition/TaskType/DynamicForkInput.cs b/Conductor/Definition/TaskType/DynamicForkInput.cs new file mode 100644 index 00000000..ebef048f --- /dev/null +++ b/Conductor/Definition/TaskType/DynamicForkInput.cs @@ -0,0 +1,17 @@ +using System.Collections.Generic; + +namespace Conductor.Definition.TaskType +{ + public class DynamicForkInput + { + public List Tasks { get; set; } + + public Dictionary Inputs { get; set; } + + public DynamicForkInput(List tasks, Dictionary inputs) + { + Tasks = tasks; + Inputs = inputs; + } + } +} \ No newline at end of file diff --git a/Conductor/Definition/TaskType/Task.cs b/Conductor/Definition/TaskType/Task.cs index e790b9fa..37a9a2fc 100644 --- a/Conductor/Definition/TaskType/Task.cs +++ b/Conductor/Definition/TaskType/Task.cs @@ -48,5 +48,64 @@ public string Output(string jsonPath = null) return "${" + $"{this.TaskReferenceName}.output.{jsonPath}" + "}"; } } + + /// + /// Returns work flow tasks + /// + /// + public List GetWorkflowDefTasks() + { + List workflowTasks = new List(); + workflowTasks.AddRange(GetParentTasks()); + workflowTasks.Add(ToWorkflowTask()); + workflowTasks.AddRange(GetChildrenTasks()); + return workflowTasks; + } + + /// + /// Returns a Work flow task + /// + /// + protected WorkflowTask ToWorkflowTask() + { + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.Name = Name; + workflowTask.Description = Description; + workflowTask.TaskReferenceName = TaskReferenceName; + workflowTask.WorkflowTaskType = WorkflowTaskType; + workflowTask.InputParameters = InputParameters; + workflowTask.StartDelay = StartDelay; + workflowTask.Optional = Optional; + + // Let the sub-classes enrich the workflow task before returning back + UpdateWorkflowTask(workflowTask); + return workflowTask; + } + + /// + /// Override this method when the sub-class should update the default WorkflowTask + /// + /// + public virtual void UpdateWorkflowTask(WorkflowTask workflowTask) { } + + /// + /// Override this method when sub-classes will generate multiple workflow tasks. Used by tasks + /// which have children tasks such as do_while, fork, etc. + /// + /// + protected virtual List GetChildrenTasks() + { + return new List(); + } + + /// + /// Override this method when sub-classes will generate multiple workflow tasks. Used by tasks + /// which have children tasks such as do_while, fork, etc. + /// + /// + protected virtual List GetParentTasks() + { + return new List(); + } } } diff --git a/Conductor/Examples/Utils/WorkerUtil.cs b/Conductor/Examples/Utils/WorkerUtil.cs index 85556e62..88953be1 100644 --- a/Conductor/Examples/Utils/WorkerUtil.cs +++ b/Conductor/Examples/Utils/WorkerUtil.cs @@ -20,7 +20,7 @@ public static async Task StartBackGroundTask(ManualResetEvent waitHandle, { var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Information, workers); await host.StartAsync(); - Thread.Sleep(10000); + Thread.Sleep(20000); waitHandle.Set(); await host.StopAsync(); return true; diff --git a/Tests/Helper/TestConstants.cs b/Tests/Helper/TestConstants.cs index 869a2ca0..c992c65f 100644 --- a/Tests/Helper/TestConstants.cs +++ b/Tests/Helper/TestConstants.cs @@ -13,5 +13,10 @@ public static class TestConstants public const string DeleteIntegartionErrorMessage = "Failed to delete integration: {0}"; public const string CreateIntegartionErrorMessage = "Failed to create integration: {0}"; public const string DeserializingErrorMessage = "Error deserializing JSON: {0}"; + public const string QuoteTaskName = "test-quote-task"; + public const string Quote = "Mindset is everything"; + public const string TaskDescription = "Test description"; + public const string InputTaskName = "Test-input-task"; + public const string TestAddTask = "Test-add-task"; } } diff --git a/Tests/Worker/AnnotatedWorker.cs b/Tests/Worker/AnnotatedWorker.cs new file mode 100644 index 00000000..cf16112d --- /dev/null +++ b/Tests/Worker/AnnotatedWorker.cs @@ -0,0 +1,30 @@ +using Conductor.Client.Worker; +using conductor_csharp.test.Helper; + +namespace Tests.Worker +{ + [WorkerTask] + public class AnnotatedWorker + { + [WorkerTask(taskType: TestConstants.QuoteTaskName, workerId: "workerId", batchSize: 20, pollIntervalMs: 520)] + [OutputParam("outputValue")] + public string GetQuote() + { + return TestConstants.Quote; + } + + [WorkerTask(taskType: TestConstants.InputTaskName, workerId: "workerId", batchSize: 20, pollIntervalMs: 520)] + [OutputParam("outputValue")] + public string GetInputValue([InputParam("userId")] string userId, int otp) + { + return $"userId: {userId}@example.com and otp:{otp}"; + } + + [WorkerTask(taskType: TestConstants.TestAddTask, workerId: "workerId", batchSize: 20, pollIntervalMs: 520)] + [OutputParam("outputValue")] + public string AddValue([InputParam("numberOne")] int numberOne, [InputParam("numberTwo")] int numberTwo) + { + return $"final value : {numberOne + numberTwo}"; + } + } +} diff --git a/Tests/Worker/AnnotatedWorkerTest.cs b/Tests/Worker/AnnotatedWorkerTest.cs new file mode 100644 index 00000000..43581b20 --- /dev/null +++ b/Tests/Worker/AnnotatedWorkerTest.cs @@ -0,0 +1,143 @@ +using Conductor.Api; +using Conductor.Client; +using Conductor.Client.Extensions; +using Conductor.Client.Models; +using Conductor.Definition; +using Conductor.Definition.TaskType; +using Conductor.Executor; +using conductor_csharp.test.Helper; +using System.Collections.Generic; +using System.Threading; +using Xunit; + +namespace Tests.Worker +{ + public class AnnotatedWorkerTest + { + private readonly MetadataResourceApi _metaDataClient; + private readonly WorkflowExecutor _workflowExecutor; + + private const string WORKFLOW_NAME = "test-annotation"; + private const int WORKFLOW_VERSION = 1; + private const string WORKFLOW_DESC = "test-annotation-desc"; + + public AnnotatedWorkerTest() + { + _metaDataClient = ApiExtensions.GetClient(); + var config = new Configuration(); + _workflowExecutor = new WorkflowExecutor(config); + + //dev local testing + //var _orkesApiClient = new OrkesApiClient(config, new OrkesAuthenticationSettings(Constants.KEY_ID, Constants.KEY_SECRET)); + //_metaDataClient = _orkesApiClient.GetClient(); + } + + [Fact] + public void TestAnnotatedMethods() + { + var workflow = GetConductorWorkflow(); + TaskDef taskDef = new TaskDef() { Description = TestConstants.TaskDescription, Name = TestConstants.QuoteTaskName }; + var getQuoteTask = new SimpleTask(TestConstants.QuoteTaskName, TestConstants.QuoteTaskName); + getQuoteTask.Description = TestConstants.TaskDescription; + workflow.WithTask(getQuoteTask); + + RegisterAndStartWorkflow(workflow, new List { taskDef }); + } + + [Fact] + public void TestAnnotationWithInputParam() + { + var workflow = GetConductorWorkflow(); + TaskDef taskDef = new TaskDef() { Description = TestConstants.TaskDescription, Name = TestConstants.InputTaskName }; + var getInputValue = new SimpleTask(TestConstants.InputTaskName, TestConstants.InputTaskName).WithInput("userId", workflow.Input("userId")).WithInput("otp", workflow.Input("otp")); + getInputValue.Description = TestConstants.TaskDescription; + workflow.WithTask(getInputValue); + + var testInput = new Dictionary + { + { "userId", "Test" }, { "otp", 123 }, + }; + + RegisterAndStartWorkflow(workflow, new List { taskDef }, testInput); + } + + [Fact] + public void TestAnnotationWithMultipleInputParam() + { + var workflow = GetConductorWorkflow(); + TaskDef taskDef = new TaskDef() { Description = TestConstants.TaskDescription, Name = TestConstants.TestAddTask }; + var getSumValue = new SimpleTask(TestConstants.TestAddTask, TestConstants.TestAddTask).WithInput("numberOne", workflow.Input("numberOne")).WithInput("numberTwo", workflow.Input("numberTwo")); + getSumValue.Description = TestConstants.TaskDescription; + workflow.WithTask(getSumValue); + + var testInput = new Dictionary + { + { "numberOne", 7 }, { "numberTwo", 77 }, + }; + + RegisterAndStartWorkflow(workflow, new List { taskDef }, testInput); + } + + [Fact] + public void TestMultipleAnnotatedMethods() + { + var workflow = GetConductorWorkflow(); + List taskDefs = new List() { + new TaskDef() { Description = TestConstants.TaskDescription, Name = TestConstants.TestAddTask }, + new TaskDef() { Description = TestConstants.TaskDescription, Name = TestConstants.InputTaskName } + }; + var getInputValue = new SimpleTask(TestConstants.InputTaskName, TestConstants.InputTaskName).WithInput("userId", workflow.Input("userId")).WithInput("otp", workflow.Input("otp")); + getInputValue.Description = TestConstants.TaskDescription; + workflow.WithTask(getInputValue); + var getSumValue = new SimpleTask(TestConstants.TestAddTask, TestConstants.TestAddTask).WithInput("numberOne", workflow.Input("numberOne")).WithInput("numberTwo", workflow.Input("numberTwo")); + getSumValue.Description = TestConstants.TaskDescription; + workflow.WithTask(getSumValue); + + var testInput = new Dictionary + { + { "numberOne", 7 }, { "numberTwo", 77 }, + { "userId", "Test" }, { "otp", 123 } + }; + + RegisterAndStartWorkflow(workflow, taskDefs, testInput); + } + + /// + /// Returns a ConductorWorkflow object + /// + /// + private ConductorWorkflow GetConductorWorkflow() + { + return new ConductorWorkflow() + .WithName(WORKFLOW_NAME) + .WithVersion(WORKFLOW_VERSION) + .WithDescription(WORKFLOW_DESC); + } + + /// + /// Registers and starts a workflow + /// + /// + /// + /// + private void RegisterAndStartWorkflow(ConductorWorkflow workflow, List taskDefs, Dictionary inputData = default) + { + _metaDataClient.RegisterTaskDef(taskDefs); + _workflowExecutor.RegisterWorkflow(workflow, true); + + StartWorkflowRequest startWorkflow = new StartWorkflowRequest() + { + Name = workflow.Name, + Input = inputData, + Version = workflow.Version, + WorkflowDef = workflow, + CreatedBy = Constants.OWNER_EMAIL + }; + + _workflowExecutor.StartWorkflow(startWorkflow); + var waitHandle = new ManualResetEvent(false); + var backgroundTask = System.Threading.Tasks.Task.Run(async () => await Conductor.Examples.Utils.WorkerUtil.StartBackGroundTask(waitHandle)); + waitHandle.WaitOne(); + } + } +} diff --git a/Tests/Worker/TestWorkflows.cs b/Tests/Worker/TestWorkflows.cs index 3b3aeff5..afdc6e56 100644 --- a/Tests/Worker/TestWorkflows.cs +++ b/Tests/Worker/TestWorkflows.cs @@ -11,7 +11,7 @@ using System.Collections.Generic; using Xunit; -namespace conductor_csharp.test.Worker +namespace Test.Worker { public class TestWorkflows {