Skip to content

Commit

Permalink
V3 finish activity (#2963)
Browse files Browse the repository at this point in the history
* Incremental work on Finish activity

* Update OutputExpression.cs

* Update ExpressionHandlerRegistry.cs

* Update IActivity.cs

* Add sub-status

* Finish activity and update tests
  • Loading branch information
sfmskywalker authored Apr 25, 2022
1 parent c04f7c9 commit 53c60ad
Show file tree
Hide file tree
Showing 30 changed files with 501 additions and 173 deletions.
16 changes: 15 additions & 1 deletion src/api/Elsa.Api/Endpoints/WorkflowInstances/List.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Threading;
using System.Threading.Tasks;
using Elsa.Mediator.Services;
using Elsa.Models;
using Elsa.Persistence.Entities;
using Elsa.Persistence.Requests;
using Elsa.Serialization;
Expand All @@ -22,13 +23,26 @@ public static async Task<IResult> ListAsync(
[FromQuery] string? correlationId,
[FromQuery] int? version,
[FromQuery] WorkflowStatus? workflowStatus,
[FromQuery] WorkflowSubStatus? workflowSubStatus,
[FromQuery] OrderBy? orderBy,
[FromQuery] OrderDirection? orderDirection)
{
var serializerOptions = serializerOptionsProvider.CreateApiOptions();
var skip = page * pageSize;
var take = pageSize;
var request = new ListWorkflowInstanceSummaries(searchTerm, definitionId, version, correlationId, workflowStatus, orderBy ?? OrderBy.Created, orderDirection ?? OrderDirection.Ascending, skip ?? 0, take ?? 50);

var request = new ListWorkflowInstanceSummaries(
searchTerm,
definitionId,
version,
correlationId,
workflowStatus,
workflowSubStatus,
orderBy ?? OrderBy.Created,
orderDirection ?? OrderDirection.Ascending,
skip ?? 0,
take ?? 50);

var summaries = await requestSender.RequestAsync(request, cancellationToken);

return Results.Json(summaries, serializerOptions, statusCode: StatusCodes.Status200OK);
Expand Down
15 changes: 14 additions & 1 deletion src/core/Elsa.Core/Activities/Finish.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
using Elsa.Attributes;
using Elsa.Behaviors;
using Elsa.Models;

namespace Elsa.Activities;

[Activity("Elsa", "Control Flow", "Mark the workflow as Finished")]
public class Finish : Activity
{
// TODO.
public Finish()
{
// Don't let ancestor activities schedule additional work.
Behaviors.Remove<AutoCompleteBehavior>();
}

protected override void Execute(ActivityExecutionContext context)
{
context.ClearCompletionCallbacks();
context.WorkflowExecutionContext.Scheduler.Clear();
context.WorkflowExecutionContext.ClearBookmarks();
context.WorkflowExecutionContext.TransitionTo(WorkflowSubStatus.Finished);
}
}
2 changes: 1 addition & 1 deletion src/core/Elsa.Core/Expressions/OutputExpression.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class OutputExpressionHandler : IExpressionHandler
public ValueTask<object?> EvaluateAsync(IExpression expression, Type returnType, ExpressionExecutionContext context)
{
var outputExpression = (OutputExpression)expression;
var output = outputExpression.Output;
var output = outputExpression.Output!;
var value = context.Get(output);
return ValueTask.FromResult(value);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Linq.Expressions;
using Elsa.Models;
using Elsa.Services;
using Elsa.Signals;
using Microsoft.Extensions.Logging;

namespace Elsa;
Expand Down Expand Up @@ -76,6 +77,79 @@ public static async Task<Input> EvaluateInputPropertyAsync(this ActivityExecutio

return input;
}

public static async Task<T?> EvaluateAsync<T>(this ActivityExecutionContext context, Input<T> input)
{
var evaluator = context.GetRequiredService<IExpressionEvaluator>();
var locationReference = input.LocationReference;
var value = await evaluator.EvaluateAsync(input, context.ExpressionExecutionContext);
locationReference.Set(context, value);
return value;
}

/// <summary>
/// Returns a flattened list of the current context's ancestors.
/// </summary>
/// <returns></returns>
public static IEnumerable<ActivityExecutionContext> GetAncestors(this ActivityExecutionContext context)
{
var current = context.ParentActivityExecutionContext;

while (current != null)
{
yield return current;
current = current.ParentActivityExecutionContext;
}
}

/// <summary>
/// Returns a flattened list of the current context's immediate children.
/// </summary>
/// <returns></returns>
public static IEnumerable<ActivityExecutionContext> GetChildren(this ActivityExecutionContext context) =>
context.WorkflowExecutionContext.ActivityExecutionContexts.Where(x => x.ParentActivityExecutionContext == context);

/// <summary>
/// Removes all child <see cref="ActivityExecutionContext"/> objects.
/// </summary>
public static void RemoveChildren(this ActivityExecutionContext context)
{
// Detach child activity execution contexts.
context.WorkflowExecutionContext.RemoveActivityExecutionContexts(context.GetChildren());
}

/// <summary>
/// Send a signal up the current branch.
/// </summary>
public static async ValueTask SignalAsync(this ActivityExecutionContext context, object signal)
{
var ancestorContexts = context.GetAncestors();

foreach (var ancestorContext in ancestorContexts)
{
var signalContext = new SignalContext(ancestorContext, context, context.CancellationToken);

if (ancestorContext.Activity is not ISignalHandler handler)
continue;

await handler.HandleSignalAsync(signal, signalContext);

if (signalContext.StopPropagationRequested)
return;
}
}

/// <summary>
/// Complete the current activity. This should only be called by activities that explicitly suppress automatic-completion.
/// </summary>
public static async ValueTask CompleteActivityAsync(this ActivityExecutionContext context)
{
// Send a signal.
await context.SignalAsync(new ActivityCompleted());

// Remove the context.
context.WorkflowExecutionContext.ActivityExecutionContexts.Remove(context);
}

public static ILogger GetLogger(this ActivityExecutionContext context) => (ILogger)context.GetRequiredService(typeof(ILogger<>).MakeGenericType(context.Activity.GetType()));
}
105 changes: 105 additions & 0 deletions src/core/Elsa.Core/Extensions/WorkflowExecutionContextExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using Elsa.Models;
using Elsa.Services;

namespace Elsa;

public static class WorkflowExecutionContextExtensions
{
/// <summary>
/// Remove the specified set of <see cref="ActivityExecutionContext"/> from the workflow execution context.
/// </summary>
public static void RemoveActivityExecutionContexts(this WorkflowExecutionContext workflowExecutionContext, IEnumerable<ActivityExecutionContext> contexts)
{
// Copy each item into a new list to avoid changing the source enumerable while removing elements from it.
var list = contexts.ToList();

// Remove each context.
foreach (var context in list) workflowExecutionContext.ActivityExecutionContexts.Remove(context);
}

/// <summary>
/// Schedules the root activity of the workflow.
/// </summary>
public static void ScheduleRoot(this WorkflowExecutionContext workflowExecutionContext)
{
var activityInvoker = workflowExecutionContext.GetRequiredService<IActivityInvoker>();
var workflow = workflowExecutionContext.Workflow;
var workItem = new ActivityWorkItem(workflow.Root.Id, async () => await activityInvoker.InvokeAsync(workflowExecutionContext, workflow.Root));
workflowExecutionContext.Scheduler.Push(workItem);
}

/// <summary>
/// Schedules the activity of the specified bookmark.
/// </summary>
public static void ScheduleBookmark(this WorkflowExecutionContext workflowExecutionContext, Bookmark bookmark)
{
// Construct bookmark.
var bookmarkedActivityContext = workflowExecutionContext.ActivityExecutionContexts.First(x => x.Id == bookmark.ActivityInstanceId);
var bookmarkedActivity = bookmarkedActivityContext.Activity;

// Schedule the activity to resume.
var activityInvoker = workflowExecutionContext.GetRequiredService<IActivityInvoker>();
var workItem = new ActivityWorkItem(bookmarkedActivity.Id, async () => await activityInvoker.InvokeAsync(bookmarkedActivityContext));
workflowExecutionContext.Scheduler.Push(workItem);

// If no resumption point was specified, use Noop to prevent the regular "ExecuteAsync" method to be invoked.
workflowExecutionContext.ExecuteDelegate = bookmark.CallbackMethodName != null ? bookmarkedActivity.GetResumeActivityDelegate(bookmark.CallbackMethodName) : WorkflowExecutionContext.Complete;
}

/// <summary>
/// Schedules the specified activity.
/// </summary>
public static void Schedule(
this WorkflowExecutionContext workflowExecutionContext,
IActivity activity,
ActivityExecutionContext owner,
ActivityCompletionCallback? completionCallback = default,
IEnumerable<RegisterLocationReference>? locationReferences = default, object? tag = default)
{
var activityInvoker = workflowExecutionContext.GetRequiredService<IActivityInvoker>();
var workItem = new ActivityWorkItem(activity.Id, async () => await activityInvoker.InvokeAsync(workflowExecutionContext, activity, owner, locationReferences), tag);
workflowExecutionContext.Scheduler.Push(workItem);

if (completionCallback != null)
workflowExecutionContext.AddCompletionCallback(owner, activity, completionCallback);
}

/// <summary>
/// Gets the specified workflow variable by name.
/// </summary>
public static T? GetVariable<T>(this WorkflowExecutionContext workflowExecutionContext, string name) => (T?)workflowExecutionContext.GetVariable(name);

/// <summary>
/// Gets the specified workflow variable by name, where the name is implied by the type name.
/// </summary>
public static T? GetVariable<T>(this WorkflowExecutionContext workflowExecutionContext) => (T?)workflowExecutionContext.GetVariable(typeof(T).Name);

/// <summary>
/// Gets the specified workflow variable by name.
/// </summary>
public static object? GetVariable(this WorkflowExecutionContext workflowExecutionContext, string name)
{
var variable = workflowExecutionContext.Workflow.Variables.FirstOrDefault(x => x.Name == name);
return variable?.Get(workflowExecutionContext.Register);
}

/// <summary>
/// Sets the specified workflow variable by name, where the name is implied by the type name.
/// </summary>
public static Variable SetVariable<T>(this WorkflowExecutionContext workflowExecutionContext, T? value) => workflowExecutionContext.SetVariable(typeof(T).Name, value);

/// <summary>
/// Sets the specified workflow variable by name.
/// </summary>
public static Variable SetVariable<T>(this WorkflowExecutionContext workflowExecutionContext, string name, T? value) => workflowExecutionContext.SetVariable(name, (object?)value);

/// <summary>
/// Sets the specified workflow variable by name.
/// </summary>
public static Variable SetVariable(this WorkflowExecutionContext workflowExecutionContext, string name, object? value)
{
var variable = workflowExecutionContext.Workflow.Variables.FirstOrDefault(x => x.Name == name) ?? new Variable(name, value);
variable.Set(workflowExecutionContext.Register, value);
return variable;
}
}
1 change: 1 addition & 0 deletions src/core/Elsa.Core/Implementations/ActivityScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ public class ActivityScheduler : IActivityScheduler
public void Push(ActivityWorkItem activity) => _stack.Push(activity);
public ActivityWorkItem Pop() => _stack.Pop();
public IEnumerable<ActivityWorkItem> List() => _stack.ToList();
public void Clear() => _stack.Clear();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ExpressionHandlerRegistry(IOptions<WorkflowEngineOptions> options, IServi
if (expressionType.IsConstructedGenericType)
expressionType = expressionType.BaseType;

if (!Dictionary.TryGetValue(expressionType, out var handlerType))
if (!Dictionary.TryGetValue(expressionType!, out var handlerType))
return null;

return (IExpressionHandler)ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, handlerType);
Expand Down
3 changes: 3 additions & 0 deletions src/core/Elsa.Core/Implementations/WorkflowRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public async Task<InvokeWorkflowResult> RunAsync(Workflow workflow, WorkflowStat

public async Task<InvokeWorkflowResult> RunAsync(WorkflowExecutionContext workflowExecutionContext)
{
// Transition into the Running state.
workflowExecutionContext.TransitionTo(WorkflowSubStatus.Executing);

// Execute the activity execution pipeline.
await _pipeline.ExecuteAsync(workflowExecutionContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public WorkflowState ReadState(WorkflowExecutionContext workflowExecutionContext
var state = new WorkflowState
{
Id = workflowExecutionContext.Id,
CorrelationId = workflowExecutionContext.CorrelationId
CorrelationId = workflowExecutionContext.CorrelationId,
Status = workflowExecutionContext.Status,
SubStatus = workflowExecutionContext.SubStatus
};

//GetOutput(state, workflowExecutionContext);
Expand All @@ -35,6 +37,7 @@ public void WriteState(WorkflowExecutionContext workflowExecutionContext, Workfl
{
workflowExecutionContext.Id = state.Id;
workflowExecutionContext.CorrelationId = state.CorrelationId;
workflowExecutionContext.SubStatus = state.SubStatus;
//SetOutput(state, workflowExecutionContext);
SetProperties(state, workflowExecutionContext);
SetActivityExecutionContexts(state, workflowExecutionContext);
Expand Down
Loading

0 comments on commit 53c60ad

Please sign in to comment.