diff --git a/Conductor/Client/Constants.cs b/Conductor/Client/Constants.cs
index a2fa20b6..b4eda709 100644
--- a/Conductor/Client/Constants.cs
+++ b/Conductor/Client/Constants.cs
@@ -67,5 +67,9 @@ public static class Constants
public const string PINECONEENDPOINT = "PINECONE_ENDPOINT";
public const string PINECONEAPIKEY = "PINECONE_API_KEY";
public const string OPENAIAPIKEY = "OPENAI_API_KEY";
+
+ //Annotation
+ public const string TOKENCANCELLATION = "Token Requested Cancel";
+ public const string RUNTIMEERROR = "Method failed with runtime error";
}
}
\ No newline at end of file
diff --git a/Conductor/Client/Models/TaskResult.cs b/Conductor/Client/Models/TaskResult.cs
index 9eb10c1e..73351104 100644
--- a/Conductor/Client/Models/TaskResult.cs
+++ b/Conductor/Client/Models/TaskResult.cs
@@ -1,12 +1,11 @@
-using System.Linq;
-using System.IO;
+using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using System;
-using System.Text;
using System.Collections.Generic;
-using System.Runtime.Serialization;
-using Newtonsoft.Json;
using System.ComponentModel.DataAnnotations;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Text;
namespace Conductor.Client.Models
{
@@ -152,6 +151,15 @@ public override string ToString()
return sb.ToString();
}
+ ///
+ /// Method to set the value for callbackAfterSeconds
+ ///
+ ///
+ public void SetCallbackAfterSeconds(long callbackAfterSeconds)
+ {
+ this.CallbackAfterSeconds = callbackAfterSeconds;
+ }
+
///
/// Returns the JSON string presentation of the object
///
diff --git a/Conductor/Client/Models/WorkflowTask.cs b/Conductor/Client/Models/WorkflowTask.cs
index fadde0c3..178c0353 100644
--- a/Conductor/Client/Models/WorkflowTask.cs
+++ b/Conductor/Client/Models/WorkflowTask.cs
@@ -755,5 +755,22 @@ public override int GetHashCode()
{
yield break;
}
+ ///
+ /// Sets theDynamicForkJoinTasksParam
+ ///
+ ///
+ public void SetDynamicForkJoinTasksParam(string dynamicForkTasksParam)
+ {
+ this.DynamicForkJoinTasksParam = dynamicForkTasksParam;
+ }
+
+ ///
+ /// Sets the dynamicForkTasksInputParamName
+ ///
+ ///
+ public void SetDynamicForkTasksInputParamName(string dynamicForkTasksInputParamName)
+ {
+ this.DynamicForkTasksInputParamName = dynamicForkTasksInputParamName;
+ }
}
}
diff --git a/Conductor/Client/Worker/GenericWorker.cs b/Conductor/Client/Worker/GenericWorker.cs
index 1627d837..7ed5cae5 100644
--- a/Conductor/Client/Worker/GenericWorker.cs
+++ b/Conductor/Client/Worker/GenericWorker.cs
@@ -1,6 +1,16 @@
+using Client.Worker.Utils;
using Conductor.Client.Interfaces;
using Conductor.Client.Models;
+using Conductor.Client.Worker.Utils;
+using Conductor.Definition.TaskType;
+using Definition.TaskType;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
using System.Reflection;
+using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -8,33 +18,342 @@ namespace Conductor.Client.Worker
{
public class GenericWorker : IWorkflowTask
{
- public string TaskType { get; }
- public WorkflowTaskExecutorConfiguration WorkerSettings { get; }
+ public string TaskType { get; set; }
+ public WorkflowTaskExecutorConfiguration WorkerSettings { get; set; }
private readonly object _workerInstance;
private readonly MethodInfo _executeTaskMethod;
+ private readonly List failedStatuses = new List() { TaskResult.StatusEnum.FAILED, TaskResult.StatusEnum.FAILEDWITHTERMINALERROR };
+ private readonly JsonSerializerSettings _jsonSerializerSettings;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ ///
+ ///
+ ///
public GenericWorker(string taskType, WorkflowTaskExecutorConfiguration workerSettings, MethodInfo executeTaskMethod, object workerInstance = null)
{
TaskType = taskType;
WorkerSettings = workerSettings;
_executeTaskMethod = executeTaskMethod;
_workerInstance = workerInstance;
+ _jsonSerializerSettings = ObjectMapperProvider.GetJsonSerializerSettings();
}
+ ///
+ /// Executes a task asynchronously and returns a TaskResult.
+ ///
+ ///
+ ///
+ ///
+ ///
public async Task Execute(Models.Task task, CancellationToken token)
{
-
if (token != CancellationToken.None && token.IsCancellationRequested)
- return new TaskResult() { Status = TaskResult.StatusEnum.FAILED, ReasonForIncompletion = "Token Requested Cancel" };
+ return new TaskResult() { Status = TaskResult.StatusEnum.FAILED, ReasonForIncompletion = Constants.TOKENCANCELLATION };
+
+ TaskResult result = null;
+ try
+ {
+ WorkflowTaskContext workflowTaskContext = WorkflowTaskContext.Set(task);
+ Object[] parameters = GetInvocationParameters(task);
+ Object invocationResult = await System.Threading.Tasks.Task.Run(() => _executeTaskMethod.Invoke(_workerInstance, parameters));
+ result = SetValue(invocationResult, workflowTaskContext.GetTaskResult());
+ if (!failedStatuses.Contains((TaskResult.StatusEnum)result.Status) && result.CallbackAfterSeconds > 0)
+ {
+ result.Status = TaskResult.StatusEnum.INPROGRESS;
+ }
+ }
+ catch (TargetInvocationException targetInvocationException)
+ {
+ if (result == null)
+ {
+ result = WorkerUtil.GetTaskResult(task);
+ }
+
+ Exception innerException = targetInvocationException.InnerException;
+ if (WorkerUtil.IsTerminalError(innerException))
+ {
+ result.Status = TaskResult.StatusEnum.FAILEDWITHTERMINALERROR;
+ }
+ else
+ {
+ result.Status = TaskResult.StatusEnum.FAILED;
+ }
- var taskResult = await System.Threading.Tasks.Task.Run(() => _executeTaskMethod.Invoke(_workerInstance, new object[] { task }));
- return (TaskResult)taskResult;
+ StringBuilder stackTrace = new StringBuilder();
+ StackTrace exceptionStackTrace = new StackTrace(innerException);
+ foreach (StackFrame stackFrame in exceptionStackTrace.GetFrames())
+ {
+ string methodName = stackFrame.GetMethod().Name;
+ string fileName = stackFrame.GetFileName();
+ int lineNumber = stackFrame.GetFileLineNumber();
+ stackTrace.AppendLine($"Method: {methodName}, File: {fileName}, Line: {lineNumber}");
+ }
+ TaskExecLog taskExecLog = new TaskExecLog()
+ {
+ Log = stackTrace.ToString(),
+ TaskId = task.TaskId
+ };
+ result.Logs = new List { taskExecLog };
+ }
+ catch (Exception ex)
+ {
+ throw new Exception(Constants.RUNTIMEERROR);
+ }
+
+ return result;
}
public TaskResult Execute(Models.Task task)
{
throw new System.NotImplementedException();
}
+
+ ///
+ /// Returns the parameters required for the task invocation.
+ ///
+ ///
+ ///
+ private object[] GetInvocationParameters(Models.Task task)
+ {
+ Type[] parameterTypes = _executeTaskMethod.GetParameters().Select(p => p.ParameterType).ToArray();
+ ParameterInfo[] parameters = _executeTaskMethod.GetParameters();
+
+ if (parameterTypes.Length == 1 && parameterTypes[0] == typeof(Models.Task))
+ {
+ return new object[] { task };
+ }
+ else if (parameterTypes.Length == 1 && parameterTypes[0] == typeof(Dictionary))
+ {
+ return new object[] { task.InputData };
+ }
+
+ return GetParameters(task, parameterTypes, parameters);
+ }
+
+ ///
+ /// Retrieves parameters for task invocation based on task and method signatures.
+ ///
+ ///
+ ///
+ ///
+ ///
+ private object[] GetParameters(Models.Task task, Type[] parameterTypes, ParameterInfo[] parameters)
+ {
+ Attribute[][] parameterAnnotations = GetParameterAnnotations();
+ Object[] values = new Object[parameterTypes.Length];
+ for (int i = 0; i < parameterTypes.Length; i++)
+ {
+ Attribute[] annotations = parameterAnnotations[i];
+ if (annotations != null && annotations.Length > 0)
+ {
+ Type type = parameters[i].ParameterType;
+ Type parameterType = parameterTypes[i];
+ values[i] = GetInputValue(task, parameterType, type, annotations);
+ }
+ else
+ {
+ string parameterName = parameters[i].Name;
+ values[i] = ConvertInputValue(task.InputData[parameterName], parameterTypes[i]);
+ }
+ }
+ return values;
+ }
+
+ ///
+ /// Retrieves annotations for parameters of the method.
+ ///
+ ///
+ private Attribute[][] GetParameterAnnotations()
+ {
+ ParameterInfo[] parameters = _executeTaskMethod.GetParameters();
+ Attribute[][] parameterAnnotations = new Attribute[parameters.Length][];
+
+ for (int i = 0; i < parameters.Length; i++)
+ {
+ List annotations = new List();
+
+ foreach (object attribute in parameters[i].GetCustomAttributes(false))
+ {
+ if (attribute is Attribute attr)
+ {
+ annotations.Add(attr);
+ }
+ }
+
+ parameterAnnotations[i] = annotations.ToArray();
+ }
+
+ return parameterAnnotations;
+ }
+
+ ///
+ /// Retrieves the value for a parameter based on annotations and input data.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ private Object GetInputValue(Models.Task task, Type parameterType, Type type, Attribute[] paramAnnotation)
+ {
+ InputParam inputParam = FindInputParamAnnotation(paramAnnotation);
+ if (inputParam == null)
+ {
+ ConvertInputValue(task.InputData, parameterType);
+ }
+
+ string inputValue = inputParam.Value;
+ object value;
+ if (task.InputData.ContainsKey(inputValue))
+ {
+ value = task.InputData[inputValue];
+ }
+ else
+ {
+ return null;
+ }
+
+ if (typeof(List