Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add caching to workflow runtime and workflow management stores #5174

Merged
merged 63 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
854283d
Add caching to workflow runtime and workflow management stores
sfmskywalker Apr 2, 2024
bcd8c07
Refactor WorkflowsMiddleware for HTTP Endpoint bookmarks and triggers
sfmskywalker Apr 3, 2024
cd87898
Update HTTP endpoint authorization to use Workflow context
sfmskywalker Apr 3, 2024
0a1814c
Add FindAsync methods to trigger and bookmark stores
sfmskywalker Apr 3, 2024
28edb41
Refactor WorkflowsMiddleware for improved workflow handling
sfmskywalker Apr 3, 2024
92864f6
Add caching functionality to WorkflowsMiddleware
sfmskywalker Apr 3, 2024
6652ade
Implement dynamic cache duration for workflows
sfmskywalker Apr 3, 2024
168b36d
Add HttpWorkflowsCacheManager for caching HTTP workflows
sfmskywalker Apr 3, 2024
97b57f0
Refactor workflow trigger handling and caching
sfmskywalker Apr 3, 2024
f4553f2
Add summary to IndexedWorkflowTriggers
sfmskywalker Apr 3, 2024
4aff973
Refactor memory caching feature into separate module
sfmskywalker Apr 3, 2024
c90c94c
Add distributed caching and update async methods
sfmskywalker Apr 3, 2024
13b59cb
Add distributed caching with MassTransit support
sfmskywalker Apr 3, 2024
7c2ce36
Refactor queue naming and scope of MassTransitChangeTokenSignalPublisher
sfmskywalker Apr 3, 2024
6f1eebc
Add caching capabilities to workflow definition service
sfmskywalker Apr 3, 2024
3d2d065
Refactor caching mechanism in workflow definition
sfmskywalker Apr 3, 2024
d84d362
Merge remote-tracking branch 'origin/main' into issue/5135
sfmskywalker Apr 3, 2024
601509e
Update MassTransitBroker and enable RealTimeWorkflows and SignalRHubs
sfmskywalker Apr 3, 2024
ef25c50
Reformat variable types in HttpFeature
sfmskywalker Apr 3, 2024
f8af20c
Update HTTP workflows cache invalidation handler XML comment
sfmskywalker Apr 3, 2024
318dace
Remove unnecessary using directives
sfmskywalker Apr 3, 2024
9470834
Refactor HttpWorkflowsMiddleware constructor
sfmskywalker Apr 3, 2024
8d617af
Simplify workflow retrieval in HttpWorkflowsMiddleware
sfmskywalker Apr 3, 2024
1532025
Refactor workflow retrieval in HttpBookmarkProcessor
sfmskywalker Apr 3, 2024
9455da1
Optimize FindWorkflowAsync method in HttpWorkflowsCacheManager
sfmskywalker Apr 3, 2024
974dfa3
Refactor Endpoint.cs for workflow retrieval
sfmskywalker Apr 3, 2024
17772a2
Refactor InputFunctionsDefinitionProvider constructor
sfmskywalker Apr 3, 2024
5844111
Refactor WorkflowInstance with improved state handling
sfmskywalker Apr 3, 2024
d151fa8
Remove unused IBookmarkManager and update workflow functions
sfmskywalker Apr 3, 2024
0882bc7
Remove unused ReSharper directive
sfmskywalker Apr 3, 2024
df99515
Update activity invocation in workflow runtime
sfmskywalker Apr 3, 2024
cac88c5
Refactor code to simplify workflow definition loading
sfmskywalker Apr 3, 2024
a59bd49
Refactor WorkflowHostFactory to streamline workflow creation
sfmskywalker Apr 3, 2024
85feb33
Refactor workflow retrieval in WorkflowInstance.cs
sfmskywalker Apr 3, 2024
a3c9d1b
Remove unnecessary whitespace in WorkflowInstance.cs
sfmskywalker Apr 3, 2024
51ee271
Remove unnecessary comment in ProtoActorWorkflowRuntime
sfmskywalker Apr 3, 2024
a67cf7c
Update workflow management features and handlers
sfmskywalker Apr 4, 2024
175da9c
Add multiple log record support to workflow execution log stores
sfmskywalker Apr 4, 2024
3ef082f
Remove redundant workflow definition check
sfmskywalker Apr 4, 2024
ec372ca
Improve cancellation token usage in workflow execution
sfmskywalker Apr 5, 2024
324b5e5
Add PersistStateAsync method to WorkflowHost
sfmskywalker Apr 6, 2024
8ec925a
Refactor DefaultAlterationRunner service
sfmskywalker Apr 6, 2024
3af63cd
Refine wording in IWorkflowHost interface documentation
sfmskywalker Apr 6, 2024
613a1ad
Remove Redis from DistributedCachingTransport
sfmskywalker Apr 9, 2024
d8dcb1e
Remove 'useDistributedCaching' constant
sfmskywalker Apr 9, 2024
cdd826b
Update package tags in MassTransit project file
sfmskywalker Apr 9, 2024
b6927a2
Refactor distributed caching implementation
sfmskywalker Apr 9, 2024
7cbcd65
Refactor order of parameters in GetOrCreateAsync method
sfmskywalker Apr 9, 2024
2e1ea33
Refactor cache retrieval in Workflow service
sfmskywalker Apr 9, 2024
0899b2d
Update method descriptions and fix comments formatting
sfmskywalker Apr 9, 2024
2561b0a
Remove unused caching methods in ModuleExtensions
sfmskywalker Apr 9, 2024
4a82bdd
Remove redundant PrimaryKeyName in DapperWorkflowExecutionLogStore
sfmskywalker Apr 9, 2024
5827734
Refactor SaveAsync methods in Elsa.Dapper Store
sfmskywalker Apr 9, 2024
29dca47
Refactor store initialization in Elsa.Dapper modules
sfmskywalker Apr 9, 2024
8663670
Refactor UserStore in Elsa.Dapper module
sfmskywalker Apr 9, 2024
79a3f95
Refactor constructor arguments in MongoDb module
sfmskywalker Apr 9, 2024
9994e19
Fix comment syntax in IWorkflowInstanceStore
sfmskywalker Apr 9, 2024
5ad638b
Remove ComputeBookmarkHash from IHttpWorkflowsCacheManager
sfmskywalker Apr 9, 2024
be8301b
Add logging to HttpWorkflowsMiddleware
sfmskywalker Apr 9, 2024
1f30e01
Update consumer configuration in MassTransitFeature
sfmskywalker Apr 9, 2024
69af482
Change default MassTransitBroker to Memory
sfmskywalker Apr 9, 2024
e97e818
Remove Datadog.Trace package from Directory.Packages.props
sfmskywalker Apr 10, 2024
6731380
Merge remote-tracking branch 'origin/main' into issue/5135
sfmskywalker Apr 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
<PackageVersion Include="Quartz.Serialization.Json" Version="3.8.0"/>
<PackageVersion Include="Refit" Version="7.0.0"/>
<PackageVersion Include="Refit.HttpClientFactory" Version="7.0.0"/>
<PackageVersion Include="Scrutor" Version="4.2.2"/>
<PackageVersion Include="ShortGuid" Version="2.0.1"/>
<PackageVersion Include="StackExchange.Redis" Version="2.7.4"/>
<PackageVersion Include="System.CommandLine" Version="2.0.0-beta4.22272.1"/>
Expand Down
17 changes: 17 additions & 0 deletions Elsa.sln
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Workflows.Runtime.Unit
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Server.LoadBalancer", "src\bundles\Elsa.Server.LoadBalancer\Elsa.Server.LoadBalancer.csproj", "{7C1E6DD9-C7DE-4686-AA73-D67A15D005A1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Caching", "src\modules\Elsa.Caching\Elsa.Caching.csproj", "{A1FE2CA0-8EF3-4EDB-9055-DD282E374E16}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "caching", "caching", "{EB3A7401-0DE3-476F-9E6F-057F1F4590FB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Caching.Distributed.MassTransit", "src\modules\Elsa.Caching.Distributed.MassTransit\Elsa.Caching.Distributed.MassTransit.csproj", "{BF4CDE28-F0CC-482D-8392-C1C9BB4C368F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -804,6 +810,14 @@ Global
{7C1E6DD9-C7DE-4686-AA73-D67A15D005A1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7C1E6DD9-C7DE-4686-AA73-D67A15D005A1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7C1E6DD9-C7DE-4686-AA73-D67A15D005A1}.Release|Any CPU.Build.0 = Release|Any CPU
{A1FE2CA0-8EF3-4EDB-9055-DD282E374E16}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A1FE2CA0-8EF3-4EDB-9055-DD282E374E16}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A1FE2CA0-8EF3-4EDB-9055-DD282E374E16}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A1FE2CA0-8EF3-4EDB-9055-DD282E374E16}.Release|Any CPU.Build.0 = Release|Any CPU
{BF4CDE28-F0CC-482D-8392-C1C9BB4C368F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BF4CDE28-F0CC-482D-8392-C1C9BB4C368F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BF4CDE28-F0CC-482D-8392-C1C9BB4C368F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BF4CDE28-F0CC-482D-8392-C1C9BB4C368F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -945,6 +959,9 @@ Global
{086A7E2A-6CE7-41DA-927B-C033638060B1} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5}
{B0946844-DE0E-4C59-9391-C0EB649DBF4E} = {18453B51-25EB-4317-A4B3-B10518252E92}
{7C1E6DD9-C7DE-4686-AA73-D67A15D005A1} = {F06B9573-DF68-4606-866C-A7546A10A05A}
{EB3A7401-0DE3-476F-9E6F-057F1F4590FB} = {5BA4A8FA-F7F4-45B3-AEC8-8886D35AAC79}
{A1FE2CA0-8EF3-4EDB-9055-DD282E374E16} = {EB3A7401-0DE3-476F-9E6F-057F1F4590FB}
{BF4CDE28-F0CC-482D-8392-C1C9BB4C368F} = {EB3A7401-0DE3-476F-9E6F-057F1F4590FB}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D4B5CEAA-7D70-4FCB-A68E-B03FBE5E0E5E}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class ParentWorkflow : WorkflowBase
{
protected override void Build(IWorkflowBuilder builder)
{
builder.WithDefinitionId("CustomId");
var childOutput = builder.WithVariable<IDictionary<string, object>>();

builder.Root = new Sequence
Expand Down
2 changes: 1 addition & 1 deletion src/bundles/Elsa.Server.Web/Activities/SlowActivity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class SlowActivity : CodeActivity, IActivityPropertyDefaultValueProvider
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var delay = Delay.Get(context);
await Task.Delay(delay);
await Task.Delay(delay, context.CancellationToken);
}

object IActivityPropertyDefaultValueProvider.GetDefaultValue(PropertyInfo property)
Expand Down
1 change: 1 addition & 0 deletions src/bundles/Elsa.Server.Web/Elsa.Server.Web.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\modules\Elsa.Caching.Distributed.MassTransit\Elsa.Caching.Distributed.MassTransit.csproj" />
<ProjectReference Include="..\..\modules\Elsa.EntityFrameworkCore.PostgreSql\Elsa.EntityFrameworkCore.PostgreSql.csproj"/>
<ProjectReference Include="..\..\modules\Elsa.MassTransit.AzureServiceBus\Elsa.MassTransit.AzureServiceBus.csproj"/>
<ProjectReference Include="..\Elsa\Elsa.csproj"/>
Expand Down
11 changes: 11 additions & 0 deletions src/bundles/Elsa.Server.Web/Enums/DistributedCachingTransport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Elsa.Server.Web;

/// <summary>
/// Represents the transport options for distributed caching.
/// </summary>
public enum DistributedCachingTransport
{
Memory,
Redis,
sfmskywalker marked this conversation as resolved.
Show resolved Hide resolved
MassTransit
}
25 changes: 20 additions & 5 deletions src/bundles/Elsa.Server.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
using Elsa.MongoDb.Modules.Runtime;
using Elsa.Server.Web;
using Elsa.Workflows.Management.Compression;
using Elsa.Workflows.Management.Services;
using Elsa.Workflows.Management.Stores;
using Elsa.Workflows.Runtime.Stores;
using Medallion.Threading.FileSystem;
using Medallion.Threading.Postgres;
Expand All @@ -39,9 +39,12 @@
const bool useQuartz = true;
const bool useMassTransit = true;
const bool useZipCompression = true;
const MassTransitBroker useMassTransitBroker = MassTransitBroker.Memory;
const bool runEFCoreMigrations = true;
const bool useMemoryStores = false;
const bool useMemoryStores = true;
const bool useCachingStores = true;
const bool useDistributedCaching = true;
const DistributedCachingTransport distributedCachingTransport = DistributedCachingTransport.MassTransit;
sfmskywalker marked this conversation as resolved.
Show resolved Hide resolved
const MassTransitBroker useMassTransitBroker = MassTransitBroker.Memory;

var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;
Expand Down Expand Up @@ -149,6 +152,9 @@

if (useMassTransit)
management.UseMassTransitDispatcher();

if (useCachingStores)
management.UseCachingStores();
})
.UseWorkflowRuntime(runtime =>
{
Expand Down Expand Up @@ -183,9 +189,7 @@
}

if (useMassTransit)
{
runtime.UseMassTransitDispatcher();
}

runtime.WorkflowInboxCleanupOptions = options => configuration.GetSection("Runtime:WorkflowInboxCleanup").Bind(options);
runtime.WorkflowDispatcherOptions = options => configuration.GetSection("Runtime:WorkflowDispatcher").Bind(options);
Expand All @@ -197,6 +201,9 @@
runtime.WorkflowInboxStore = sp => sp.GetRequiredService<MemoryWorkflowInboxMessageStore>();
}

if (useCachingStores)
runtime.UseCachingStores();

runtime.DistributedLockProvider = _ =>
{
switch (distributedLockProviderName)
Expand Down Expand Up @@ -330,6 +337,14 @@
});
}

if (useDistributedCaching)
{
elsa.UseDistributedCache(distributedCaching =>
{
if (distributedCachingTransport == DistributedCachingTransport.MassTransit) distributedCaching.UseMassTransit();
});
}

elsa.InstallDropIns(options => options.DropInRootDirectory = Path.Combine(Directory.GetCurrentDirectory(), "App_Data", "DropIns"));
elsa.AddSwagger();
elsa.AddFastEndpointsAssembly<Program>();
Expand Down
2 changes: 1 addition & 1 deletion src/bundles/Elsa.ServerAndStudio.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
identity.UseConfigurationBasedRoleProvider(options => identitySection.Bind(options));
})
.UseDefaultAuthentication()
.UseInstanceManagement(x => x.HeartbeatOptions = settings => heartbeatSection.Bind(settings))
.UseApplicationCluster(x => x.HeartbeatOptions = settings => heartbeatSection.Bind(settings))
.UseWorkflowManagement(management =>
{
if (useMassTransit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public static async Task<RunWorkflowResult> RunActivityAsync(this IServiceProvid
public static async Task<WorkflowDefinition> GetWorkflowDefinitionAsync(this IServiceProvider services, string workflowDefinitionId, VersionOptions versionOptions, CancellationToken cancellationToken = default)
{
var workflowDefinitionService = services.GetRequiredService<IWorkflowDefinitionService>();
var workflowDefinition = await workflowDefinitionService.FindAsync(workflowDefinitionId, versionOptions);
var workflowDefinition = await workflowDefinitionService.FindWorkflowDefinitionAsync(workflowDefinitionId, versionOptions);
return workflowDefinition!;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ protected override async ValueTask HandleAsync(AlterationContext context, Migrat
var definitionId = context.Workflow.Identity.DefinitionId;
var targetVersion = alteration.TargetVersion;
var cancellationToken = context.CancellationToken;
var targetWorkflowDefinition = await workflowDefinitionService.FindAsync(definitionId, VersionOptions.SpecificVersion(targetVersion), cancellationToken);
var targetWorkflowDefinition = await workflowDefinitionService.FindWorkflowDefinitionAsync(definitionId, VersionOptions.SpecificVersion(targetVersion), cancellationToken);

if (targetWorkflowDefinition == null)
{
Expand Down
11 changes: 3 additions & 8 deletions src/modules/Elsa.Alterations/Services/DefaultAlterationRunner.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
using Elsa.Alterations.Core.Contexts;
using Elsa.Alterations.Core.Contracts;
using Elsa.Alterations.Core.Models;
using Elsa.Alterations.Core.Results;
using Elsa.Alterations.Middleware.Workflows;
using Elsa.Common.Contracts;
using Elsa.Extensions;
using Elsa.Workflows;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Management.Contracts;
Expand Down Expand Up @@ -75,18 +73,15 @@ public async Task<RunAlterationsResult> RunAsync(string workflowInstanceId, IEnu
}

// Load workflow definition.
var workflowDefinition = await _workflowDefinitionService.FindAsync(workflowState.DefinitionVersionId, cancellationToken);
var workflow = await _workflowDefinitionService.FindWorkflowAsync(workflowState.DefinitionVersionId, cancellationToken);

// If the workflow definition is not found, log an error and continue.
if (workflowDefinition == null)
if (workflow == null)
{
log.Add($"Workflow definition with ID '{workflowState.DefinitionVersionId}' not found.", LogLevel.Error);
return result;
}

// Materialize workflow.
var workflow = await _workflowDefinitionService.MaterializeWorkflowAsync(workflowDefinition, cancellationToken);


// Create workflow execution context.
var workflowExecutionContext = await WorkflowExecutionContext.CreateAsync(_serviceProvider, workflow, workflowState, cancellationTokens: cancellationToken);
workflowExecutionContext.TransientProperties.Add(RunAlterationsMiddleware.AlterationsPropertyKey, alterations);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Elsa.Caching.Contracts;
using Elsa.Caching.Distributed.MassTransit.Messages;
using JetBrains.Annotations;
using MassTransit;

namespace Elsa.Caching.Distributed.MassTransit.Consumers;

/// <summary>
/// Consumes <see cref="TriggerChangeTokenSignal"/> messages and triggers the change token signal.
/// </summary>
[UsedImplicitly]
public class TriggerChangeTokenSignalConsumer(IDistributedChangeTokenSignaler changeTokenSignaler) : IConsumer<TriggerChangeTokenSignal>
{
/// <inheritdoc />
public async Task Consume(ConsumeContext<TriggerChangeTokenSignal> context)
{
var message = context.Message;
var cancellationToken = context.CancellationToken;
await changeTokenSignaler.TriggerTokenLocalAsync(message.Key, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>
Provides distributed caching services leveraging MassTransit for transport of signals.
</Description>
<PackageTags>elsa module distributed caching mass-transit</PackageTags>
sfmskywalker marked this conversation as resolved.
Show resolved Hide resolved
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\common\Elsa.Features\Elsa.Features.csproj" />
<ProjectReference Include="..\Elsa.Caching\Elsa.Caching.csproj" />
<ProjectReference Include="..\Elsa.MassTransit\Elsa.MassTransit.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Elsa.Caching.Distributed.MassTransit.Features;
using Elsa.Caching.Features;

// ReSharper disable once CheckNamespace
namespace Elsa.Extensions;

/// <summary>
/// Provides methods to install and configure the distributed caching feature with MassTransit.
/// </summary>
public static class ModuleExtensions
{
/// <summary>
/// Configures the distributed caching feature to use MassTransit.
/// </summary>
public static MassTransitDistributedCacheFeature UseMassTransit(this DistributedCacheFeature distributedCacheFeature, Action<MassTransitDistributedCacheFeature>? configure = default)
{
return distributedCacheFeature.Module.Configure(configure);
}

/// <summary>
/// Configures the memory caching feature with the distributed caching feature that uses MassTransit.
/// </summary>
public static MemoryCacheFeature UseMassTransit(this MemoryCacheFeature memoryCacheFeature, Action<MassTransitDistributedCacheFeature>? configure = default)
{
memoryCacheFeature.Module.Configure(configure);
return memoryCacheFeature;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Elsa.Caching.Distributed.MassTransit.Consumers;
using Elsa.Caching.Distributed.MassTransit.Services;
using Elsa.Caching.Features;
using Elsa.Extensions;
using Elsa.Features.Abstractions;
using Elsa.Features.Attributes;
using Elsa.Features.Services;
using Elsa.MassTransit.Features;
using Microsoft.Extensions.DependencyInjection;

namespace Elsa.Caching.Distributed.MassTransit.Features;

/// <summary>
/// Configures distributed cache management with MassTransit.
/// </summary>
[DependsOn(typeof(DistributedCacheFeature))]
[DependsOn(typeof(MassTransitFeature))]
public class MassTransitDistributedCacheFeature(IModule module) : FeatureBase(module)
{
/// <inheritdoc />
public override void Configure()
{
Module.AddMassTransitConsumer<TriggerChangeTokenSignalConsumer>("elsa-trigger-change-token-signal", true);
Module.Use<DistributedCacheFeature>(feature => feature.WithChangeTokenSignalPublisher(sp => sp.GetRequiredService<MassTransitChangeTokenSignalPublisher>()));
}

/// <inheritdoc />
public override void Apply()
{
Services.AddSingleton<MassTransitChangeTokenSignalPublisher>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait />
</Weavers>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Elsa.Caching.Distributed.MassTransit.Messages;

/// <summary>
/// Represents a message containing a signal to trigger a change token.
/// </summary>
/// <param name="Key">The key of the change token to trigger.</param>
public record TriggerChangeTokenSignal(string Key);
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Elsa.Caching.Contracts;
using Elsa.Caching.Distributed.MassTransit.Messages;
using MassTransit;

namespace Elsa.Caching.Distributed.MassTransit.Services;

/// <summary>
/// Represents a service that publishes change token signals using MassTransit.
/// </summary>
public class MassTransitChangeTokenSignalPublisher(IBus bus) : IChangeTokenSignalPublisher
{
/// <inheritdoc />
public async ValueTask PublishAsync(string key, CancellationToken cancellationToken = default)
{
var message = new TriggerChangeTokenSignal(key);
await bus.Publish(message, cancellationToken);
}
}
12 changes: 12 additions & 0 deletions src/modules/Elsa.Caching/Contracts/IChangeTokenSignalPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Elsa.Caching.Contracts;

/// <summary>
/// Represents a service that can publish change token signals.
/// </summary>
public interface IChangeTokenSignalPublisher
{
/// <summary>
/// Publishes a change token signal for the specified key.
/// </summary>
ValueTask PublishAsync(string key, CancellationToken cancellationToken = default);
}
19 changes: 19 additions & 0 deletions src/modules/Elsa.Caching/Contracts/IChangeTokenSignaler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Microsoft.Extensions.Primitives;

namespace Elsa.Caching.Contracts;

/// <summary>
/// Provides change tokens for memory caches, allowing code to evict cache entries by triggering a signal.
/// </summary>
public interface IChangeTokenSignaler
{
/// <summary>
/// Gets a change token for the specified key.
/// </summary>
IChangeToken GetToken(string key);

/// <summary>
/// Triggers the change token for the specified key.
/// </summary>
ValueTask TriggerTokenAsync(string key, CancellationToken cancellationToken = default);
}
Loading
Loading