Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented workflows reload endpoint #5732

Merged
merged 5 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions src/modules/Elsa.Http/Handlers/InvalidateHttpWorkflowsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ namespace Elsa.Http.Handlers;
/// A handler that invalidates the HTTP workflows cache when a workflow definition is published, retracted, or deleted or when triggers are indexed.
/// </summary>
[UsedImplicitly]
public class InvalidateHttpWorkflowsCache(IHttpWorkflowsCacheManager httpWorkflowsCacheManager,
ITriggerStore triggerStore,
IHttpWorkflowsCacheManager cacheManager) :
public class InvalidateHttpWorkflowsCache(
IHttpWorkflowsCacheManager httpWorkflowsCacheManager,
ITriggerStore triggerStore) :
INotificationHandler<WorkflowDefinitionPublished>,
INotificationHandler<WorkflowDefinitionRetracted>,
INotificationHandler<WorkflowDefinitionVersionsUpdated>,
INotificationHandler<WorkflowDefinitionDeleted>,
INotificationHandler<WorkflowDefinitionDeleted>,
INotificationHandler<WorkflowDefinitionsDeleted>,
INotificationHandler<WorkflowDefinitionVersionDeleted>,
INotificationHandler<WorkflowDefinitionVersionsDeleted>,
INotificationHandler<WorkflowTriggersIndexed>
INotificationHandler<WorkflowTriggersIndexed>,
INotificationHandler<WorkflowDefinitionsReloaded>
{
/// <inheritdoc />
public Task HandleAsync(WorkflowDefinitionPublished notification, CancellationToken cancellationToken)
Expand Down Expand Up @@ -91,6 +92,15 @@ public async Task HandleAsync(WorkflowTriggersIndexed notification, Cancellation
await InvalidateCacheAsync(notification.IndexedWorkflowTriggers.Workflow.Identity.DefinitionId);
}

/// <inheritdoc />
public async Task HandleAsync(WorkflowDefinitionsReloaded notification, CancellationToken cancellationToken)
{
foreach (var workflowDefinitionId in notification.WorkflowDefinitionIds)
{
await InvalidateCacheAsync(workflowDefinitionId);
}
}

private async Task InvalidateCacheAsync(string workflowDefinitionId)
{
await httpWorkflowsCacheManager.EvictWorkflowAsync(workflowDefinitionId);
Expand All @@ -103,7 +113,7 @@ private async Task InvalidateTriggerCacheForDefinitionVersionAsync(string workfl
WorkflowDefinitionVersionId = workflowDefinitionVersionId
};
var triggers = await triggerStore.FindManyAsync(filter, cancellationToken);

await InvalidateTriggerCacheAsync(triggers, cancellationToken);
}

Expand All @@ -113,7 +123,7 @@ private async Task InvalidateTriggerCacheAsync(IEnumerable<StoredTrigger> trigge
{
if (trigger?.Payload is HttpEndpointBookmarkPayload httpPayload)
{
var hash = cacheManager.ComputeBookmarkHash(httpPayload.Path, httpPayload.Method);
var hash = httpWorkflowsCacheManager.ComputeBookmarkHash(httpPayload.Path, httpPayload.Method);
await httpWorkflowsCacheManager.EvictTriggerAsync(hash, cancellationToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ public class WorkflowDefinitionEventsConsumer(IWorkflowDefinitionActivityRegistr
global::MassTransit.IConsumer<WorkflowDefinitionVersionDeleted>,
global::MassTransit.IConsumer<WorkflowDefinitionVersionsDeleted>,
global::MassTransit.IConsumer<WorkflowDefinitionVersionsUpdated>,
global::MassTransit.IConsumer<WorkflowDefinitionsRefreshed>
global::MassTransit.IConsumer<WorkflowDefinitionsRefreshed>,
global::MassTransit.IConsumer<WorkflowDefinitionsReloaded>
{
/// <inheritdoc />
public Task Consume(ConsumeContext<WorkflowDefinitionDeleted> context)
Expand Down Expand Up @@ -82,9 +83,19 @@ public async Task Consume(ConsumeContext<WorkflowDefinitionsRefreshed> context)
{
var message = context.Message;
var notification = new Elsa.Workflows.Runtime.Notifications.WorkflowDefinitionsRefreshed(message.WorkflowDefinitionIds);
AmbientConsumerScope.IsConsumerExecutionContext = true;
AmbientConsumerScope.IsWorkflowDefinitionEventsConsumer = true;
await notificationSender.SendAsync(notification, context.CancellationToken);
AmbientConsumerScope.IsConsumerExecutionContext = false;
AmbientConsumerScope.IsWorkflowDefinitionEventsConsumer = false;
}

/// <inheritdoc />
public async Task Consume(ConsumeContext<WorkflowDefinitionsReloaded> context)
{
var message = context.Message;
var notification = new Elsa.Workflows.Runtime.Notifications.WorkflowDefinitionsReloaded(message.WorkflowDefinitionIds);
AmbientConsumerScope.IsWorkflowDefinitionEventsConsumer = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't we need a new field here to prevent race conditions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the way AsyncLocal works, I don’t see how; each call will see a different instance of the AsyncLocal object.

await notificationSender.SendAsync(notification, context.CancellationToken);
AmbientConsumerScope.IsWorkflowDefinitionEventsConsumer = false;
}

private Task UpdateDefinition(string id, bool usableAsActivity)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Elsa.MassTransit.Contracts;
using Elsa.MassTransit.Services;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Management.Notifications;
Expand All @@ -18,7 +17,8 @@ public class DistributedWorkflowDefinitionNotificationsHandler(IBus bus) :
INotificationHandler<WorkflowDefinitionVersionDeleted>,
INotificationHandler<WorkflowDefinitionVersionsDeleted>,
INotificationHandler<WorkflowDefinitionVersionsUpdated>,
INotificationHandler<WorkflowDefinitionsRefreshed>
INotificationHandler<WorkflowDefinitionsRefreshed>,
INotificationHandler<WorkflowDefinitionsReloaded>
{
/// <inheritdoc />
public Task HandleAsync(WorkflowDefinitionPublished notification, CancellationToken cancellationToken)
Expand Down Expand Up @@ -73,11 +73,23 @@ public async Task HandleAsync(WorkflowDefinitionVersionsUpdated notification, Ca
public Task HandleAsync(WorkflowDefinitionsRefreshed notification, CancellationToken cancellationToken)
{
// Prevent re-entrance.
if (AmbientConsumerScope.IsConsumerExecutionContext)
if (AmbientConsumerScope.IsWorkflowDefinitionEventsConsumer)
return Task.CompletedTask;

var definitionIds = notification.WorkflowDefinitionIds;
var message = new Distributed.WorkflowDefinitionsRefreshed(definitionIds);
return bus.Publish(message, cancellationToken);
}

/// <inheritdoc />
public Task HandleAsync(WorkflowDefinitionsReloaded notification, CancellationToken cancellationToken)
{
// Prevent re-entrance.
if (AmbientConsumerScope.IsWorkflowDefinitionEventsConsumer)
return Task.CompletedTask;

var definitionIds = notification.WorkflowDefinitionIds;
var message = new Distributed.WorkflowDefinitionsReloaded(definitionIds);
return bus.Publish(message, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Elsa.MassTransit.Messages;

/// Represents a message that indicates that the specified workflow definitions have been reloaded.
public class WorkflowDefinitionsReloaded(ICollection<string> workflowDefinitionIds)
{
/// The workflow definition IDs that have been reloaded.
public ICollection<string> WorkflowDefinitionIds { get; set; } = workflowDefinitionIds;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public static class AmbientConsumerScope
private static readonly AsyncLocal<bool> IsRaisedFromConsumerState = new();

/// Gets or sets a value that indicates if the current code is initiated from a consumer.
public static bool IsConsumerExecutionContext
public static bool IsWorkflowDefinitionEventsConsumer
{
get => IsRaisedFromConsumerState.Value;
set => IsRaisedFromConsumerState.Value = value;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Elsa.Abstractions;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Requests;
using Elsa.Workflows.Runtime.Responses;
using JetBrains.Annotations;

namespace Elsa.Workflows.Api.Endpoints.WorkflowDefinitions.Refresh;
Expand All @@ -18,13 +19,13 @@ public override void Configure()

public override async Task HandleAsync(Request request, CancellationToken cancellationToken)
{
await RefreshWorkflowDefinitionsAsync(request.DefinitionIds, cancellationToken);
await SendOkAsync(cancellationToken);
var result = await RefreshWorkflowDefinitionsAsync(request.DefinitionIds, cancellationToken);
await SendOkAsync(new Response(result.Refreshed, result.NotFound), cancellationToken);
}

private async Task RefreshWorkflowDefinitionsAsync(ICollection<string>? definitionIds, CancellationToken cancellationToken)
private async Task<RefreshWorkflowDefinitionsResponse> RefreshWorkflowDefinitionsAsync(ICollection<string>? definitionIds, CancellationToken cancellationToken)
{
var request = new RefreshWorkflowDefinitionsRequest(definitionIds, BatchSize);
await workflowDefinitionsRefresher.RefreshWorkflowDefinitionsAsync(request, cancellationToken);
return await workflowDefinitionsRefresher.RefreshWorkflowDefinitionsAsync(request, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
using System.Text.Json.Serialization;

namespace Elsa.Workflows.Api.Endpoints.WorkflowDefinitions.Refresh;

public class Request
internal class Request
{
public ICollection<string>? DefinitionIds { get; set; }
}

internal class Response(ICollection<string> refreshed, ICollection<string> notFound)
{
[JsonPropertyName("refreshed")] public ICollection<string> Refreshed { get; } = refreshed;
[JsonPropertyName("notFound")] public ICollection<string> NotFound { get; } = notFound;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Elsa.Abstractions;
using Elsa.Workflows.Runtime.Contracts;
using JetBrains.Annotations;

namespace Elsa.Workflows.Api.Endpoints.WorkflowDefinitions.Reload;

[PublicAPI]
internal class Reload(IWorkflowDefinitionsReloader workflowDefinitionsReloader) : ElsaEndpointWithoutRequest
{
private const int BatchSize = 10;

public override void Configure()
{
Post("/actions/workflow-definitions/reload");
ConfigurePermissions("actions:workflow-definitions:reload");
}

public override async Task HandleAsync(CancellationToken cancellationToken)
{
await ReloadWorkflowDefinitionsAsync(cancellationToken);
await SendOkAsync(cancellationToken);
}

private async Task ReloadWorkflowDefinitionsAsync(CancellationToken cancellationToken)
{
await workflowDefinitionsReloader.ReloadWorkflowDefinitionsAsync(cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ public interface IWorkflowDefinitionStorePopulator
/// Populates the <see cref="IWorkflowDefinitionStore"/> with workflow definitions provided from <see cref="IWorkflowProvider"/> implementations.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
Task PopulateStoreAsync(CancellationToken cancellationToken = default);
Task<ICollection<string>> PopulateStoreAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Populates the <see cref="IWorkflowDefinitionStore"/> with workflow definitions provided from <see cref="IWorkflowProvider"/> implementations.
/// </summary>
/// <param name="indexTriggers">Whether to index triggers.</param>
/// <param name="cancellationToken">The cancellation token.</param>
Task PopulateStoreAsync(bool indexTriggers, CancellationToken cancellationToken = default);
Task<ICollection<string>> PopulateStoreAsync(bool indexTriggers, CancellationToken cancellationToken = default);

/// <summary>
/// Adds a workflow definition to the store.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Elsa.Workflows.Runtime.Contracts;

/// Reloads all workflows by re-invoking the populator.
public interface IWorkflowDefinitionsReloader
{
/// Reloads all workflows by re-invoking the populator.
Task ReloadWorkflowDefinitionsAsync(CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public override void Apply()
.AddScoped<IWorkflowDefinitionStorePopulator, DefaultWorkflowDefinitionStorePopulator>()
.AddScoped<IRegistriesPopulator, DefaultRegistriesPopulator>()
.AddScoped<IWorkflowDefinitionsRefresher, WorkflowDefinitionsRefresher>()
.AddScoped<IWorkflowDefinitionsReloader, WorkflowDefinitionsReloader>()
.AddScoped<IWorkflowRegistry, DefaultWorkflowRegistry>()
.AddScoped<ITaskReporter, TaskReporter>()
.AddScoped<SynchronousTaskDispatcher>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,19 @@ namespace Elsa.Workflows.Runtime.Handlers;
/// The cache is invalidated by calling the <c>TriggerTokenAsync</c> method of the <c>ICacheManager</c> passed to the class constructor.
/// </remarks>
[UsedImplicitly]
public class InvalidateTriggersCache(ICacheManager cacheManager) : INotificationHandler<WorkflowDefinitionsRefreshed>
public class InvalidateTriggersCache(ICacheManager cacheManager) :
INotificationHandler<WorkflowDefinitionsRefreshed>,
INotificationHandler<WorkflowDefinitionsReloaded>
{
/// <inheritdoc />
public Task HandleAsync(WorkflowDefinitionsRefreshed notification, CancellationToken cancellationToken)
{
return cacheManager.TriggerTokenAsync(CachingTriggerStore.CacheInvalidationTokenKey, cancellationToken).AsTask();
}

/// <inheritdoc />
public async Task HandleAsync(WorkflowDefinitionsReloaded notification, CancellationToken cancellationToken)
{
await cacheManager.TriggerTokenAsync(CachingTriggerStore.CacheInvalidationTokenKey, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Management.Contracts;
using Elsa.Workflows.Runtime.Notifications;
using JetBrains.Annotations;

namespace Elsa.Workflows.Runtime.Handlers;

/// <summary>
/// A notification handler that invalidates workflows cache when workflow definitions are reloaded.
/// </summary>
/// <remarks>
/// The class implements the <c>INotificationHandler</c> interface and is responsible for handling <c>WorkflowDefinitionsReloaded</c> notifications.
/// When a <c>WorkflowDefinitionsReloaded</c> notification is received, the <c>HandleAsync</c> method is called to invalidate the http definition cache.
/// </remarks>
[UsedImplicitly]
public class InvalidateWorkflowsCache(IWorkflowDefinitionCacheManager workflowDefinitionCacheManager) : INotificationHandler<WorkflowDefinitionsReloaded>
{
/// <inheritdoc />
public async Task HandleAsync(WorkflowDefinitionsReloaded notification, CancellationToken cancellationToken)
{
foreach (var workflowDefinitionId in notification.WorkflowDefinitionIds)
{
await workflowDefinitionCacheManager.EvictWorkflowDefinitionAsync(workflowDefinitionId, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
using Elsa.Mediator.Contracts;

namespace Elsa.Workflows.Runtime.Notifications;

/// Published when workflow definitions have been reloaded.
public record WorkflowDefinitionsReloaded(ICollection<string> WorkflowDefinitionIds) : INotification;
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
namespace Elsa.Workflows.Runtime.Responses;

/// Represents a response to a request to refresh workflow definitions.
public record RefreshWorkflowDefinitionsResponse(ICollection<WorkflowDefinition> WorkflowDefinitions);
public record RefreshWorkflowDefinitionsResponse(ICollection<string> Refreshed, ICollection<string> NotFound);
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,26 @@ public DefaultWorkflowDefinitionStorePopulator(
}

/// <inheritdoc />
public Task PopulateStoreAsync(CancellationToken cancellationToken = default)
public Task<ICollection<string>> PopulateStoreAsync(CancellationToken cancellationToken = default)
{
return PopulateStoreAsync(true, cancellationToken);
}

/// <inheritdoc />
public async Task PopulateStoreAsync(bool indexTriggers, CancellationToken cancellationToken = default)
public async Task<ICollection<string>> PopulateStoreAsync(bool indexTriggers, CancellationToken cancellationToken = default)
{
var providers = _workflowDefinitionProviders();
var workflowDefinitionIds = new List<string>();
foreach (var provider in providers)
{
var results = await provider.GetWorkflowsAsync(cancellationToken).AsTask().ToList();

workflowDefinitionIds.AddRange(results.Select(w => w.Workflow.Id));

foreach (var result in results) await AddAsync(result, indexTriggers, cancellationToken);
}

return workflowDefinitionIds;
}

/// <inheritdoc />
Expand Down Expand Up @@ -131,8 +136,8 @@ private async Task AddOrUpdateCoreAsync(MaterializedWorkflow materializedWorkflo
if (existingDefinitionVersion != null)
{
workflowDefinitionsToSave.Add(existingDefinitionVersion);
if(existingDefinitionVersion.Id != workflow.Identity.Id)

if (existingDefinitionVersion.Id != workflow.Identity.Id)
{
// It's possible that the imported workflow definition has a different ID than the existing one in the store.
// In a future update, we might store this discrepancy in a "troubleshooting" table and provide tooling for managing these, and other, discrepancies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ public async Task<RefreshWorkflowDefinitionsResponse> RefreshWorkflowDefinitions
await IndexWorkflowTriggersAsync(definitions.Items, cancellationToken);
processedWorkflowDefinitions.AddRange(definitions.Items);
currentPage++;

if (definitions.Items.Count < batchSize)
break;
}

var processedWorkflowDefinitionIds = processedWorkflowDefinitions.Select(x => x.Id).ToList();
var processedWorkflowDefinitionIds = processedWorkflowDefinitions.Select(x => x.DefinitionId).ToList();
var notification = new WorkflowDefinitionsRefreshed(processedWorkflowDefinitionIds);
await notificationSender.SendAsync(notification, cancellationToken);
return new RefreshWorkflowDefinitionsResponse(processedWorkflowDefinitions);
return new RefreshWorkflowDefinitionsResponse(processedWorkflowDefinitionIds, request.DefinitionIds?.Except(processedWorkflowDefinitionIds)?.ToList() ?? []);
}

private async Task IndexWorkflowTriggersAsync(IEnumerable<WorkflowDefinition> definitions, CancellationToken cancellationToken)
Expand Down
Loading
Loading