Skip to content

Commit

Permalink
Refactor proto actor runtime with snapshot support (#3283)
Browse files Browse the repository at this point in the history
* Initial working grain

* Incremental work on actor bookmark management & resuming wokflows

* Upgrade Proto Actor package references

* Fix lifetime of actors

* Implement bookmark and resumption

* Update Program.cs

* Update bookmark event publishing

* Refactor proto actor features API

* Implement snapshotting

* Update snapshotting

* Add in-memory provider and use it by default

* Update server bundle to use proto actor sqlite persistence by default
  • Loading branch information
sfmskywalker authored Sep 2, 2022
1 parent 6f5a029 commit e2ffafc
Show file tree
Hide file tree
Showing 98 changed files with 1,510 additions and 1,487 deletions.
2 changes: 1 addition & 1 deletion common.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<LangVersion>latest</LangVersion>
<Authors>Elsa Contributors</Authors>
<Authors>Elsa Workflows Team</Authors>
<Copyright>2022</Copyright>
<PackageProjectUrl>https://github.com/elsa-workflows/elsa-core</PackageProjectUrl>
<RepositoryUrl>https://github.com/elsa-workflows/elsa-core</RepositoryUrl>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@
<ProjectReference Include="..\..\modules\Elsa.Workflows.Persistence.EntityFrameworkCore\Elsa.Workflows.Persistence.EntityFrameworkCore.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Proto.Persistence.Sqlite" Version="0.33.0" />
</ItemGroup>

</Project>
10 changes: 7 additions & 3 deletions src/bundles/Elsa.WorkflowServer.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Elsa.Labels.EntityFrameworkCore.Sqlite;
using Elsa.Labels.Extensions;
using Elsa.Liquid.Extensions;
using Elsa.ProtoActor.Extensions;
using Elsa.Scheduling.Extensions;
using Elsa.WorkflowContexts.Extensions;
using Elsa.Workflows.Api.Extensions;
Expand All @@ -31,10 +32,14 @@
using FastEndpoints;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Data.Sqlite;
using Proto.Persistence.Sqlite;
using Event = Elsa.Workflows.Core.Activities.Event;

var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;
var configuration = builder.Configuration;
var dbConnectionString = configuration.GetConnectionString("Sqlite");
var identityOptions = new IdentityOptions();
var identitySection = configuration.GetSection("Identity");
identitySection.Bind(identityOptions);
Expand Down Expand Up @@ -63,9 +68,10 @@
identity.CreateDefaultUser = true;
identity.IdentityOptions = options => identitySection.Bind(options);
})
.UseRuntime(runtime => runtime.UseProtoActor(proto => proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(dbConnectionString))))
.UseJobActivities()
.UseScheduling()
.UseWorkflowPersistence(p => p.UseEntityFrameworkCore(ef => ef.UseSqlite()))
.UseWorkflowPersistence(p => p.UseEntityFrameworkCore(ef => ef.UseSqlite(dbConnectionString)))
.UseWorkflowApiEndpoints()
.UseJavaScript()
.UseLiquid()
Expand Down Expand Up @@ -103,8 +109,6 @@
serviceProvider.ConfigureDefaultWorkflowExecutionPipeline(pipeline =>
pipeline
.UseWorkflowExecutionEvents()
.UseWorkflowExecutionLogPersistence()
.UsePersistence()
.UseWorkflowContexts()
.UseStackBasedActivityScheduler()
);
Expand Down
3 changes: 3 additions & 0 deletions src/bundles/Elsa.WorkflowServer.Web/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
}
},
"AllowedHosts": "*",
"ConnectionStrings": {
"Sqlite": "Data Source=elsa.sqlite.db;Cache=Shared;"
},
"Identity": {
"SigningKey": "secret-signing-key"
}
Expand Down
8 changes: 4 additions & 4 deletions src/common/Elsa.Api.Common/Abstractions/Endpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ protected void ConfigurePermissions(params string[] permissions)
if (!EndpointSecurityOptions.SecurityIsEnabled)
AllowAnonymous();
else
Permissions((new[] { PermissionNames.All }).Concat(permissions).ToArray());
Permissions(new[] { PermissionNames.All }.Concat(permissions).ToArray());
}
}

Expand All @@ -20,7 +20,7 @@ protected void ConfigurePermissions(params string[] permissions)
if (!EndpointSecurityOptions.SecurityIsEnabled)
AllowAnonymous();
else
Permissions((new[] { PermissionNames.All }).Concat(permissions).ToArray());
Permissions(new[] { PermissionNames.All }.Concat(permissions).ToArray());
}
}

Expand All @@ -31,7 +31,7 @@ protected void ConfigurePermissions(params string[] permissions)
if (!EndpointSecurityOptions.SecurityIsEnabled)
AllowAnonymous();
else
Permissions((new[] { PermissionNames.All }).Concat(permissions).ToArray());
Permissions(new[] { PermissionNames.All }.Concat(permissions).ToArray());
}
}

Expand All @@ -42,6 +42,6 @@ protected void ConfigurePermissions(params string[] permissions)
if (!EndpointSecurityOptions.SecurityIsEnabled)
AllowAnonymous();
else
Permissions((new[] { PermissionNames.All }).Concat(permissions).ToArray());
Permissions(new[] { PermissionNames.All }.Concat(permissions).ToArray());
}
}
18 changes: 11 additions & 7 deletions src/designer/elsa-workflows-designer/src/modules/login/plugin.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import {StudioService, AuthContext, EventBus} from "../../services";

@Service()
export class LoginPlugin implements Plugin {
private eventBus: EventBus;
private studioService: StudioService;
private readonly eventBus: EventBus;
private readonly studioService: StudioService;

constructor() {
this.eventBus = Container.get(EventBus);
this.studioService = Container.get(StudioService);
this.eventBus.on(EventTypes.HttpClient.ClientCreated, this.onHttpClientCreated);
this.eventBus.on(EventTypes.HttpClient.Unauthorized, this.onUnauthorized)
}

async initialize(): Promise<void> {
Expand All @@ -27,6 +26,7 @@ export class LoginPlugin implements Plugin {

private onHttpClientCreated = async (e) => {
const service: MiddlewareService = e.service;
const studioService = this.studioService;

service.register({
async onRequest(request) {
Expand All @@ -37,12 +37,16 @@ export class LoginPlugin implements Plugin {
request.headers = {...request.headers, 'Authorization': `Bearer ${token}`};

return request;
},

async onResponseError(error) {
debugger;
if (error.response.status !== 401)
return;

studioService.show(() => <elsa-login-page/>);
}
});
};

private onUnauthorized = async () => {
this.studioService.show(() => <elsa-login-page/>);
};

}
9 changes: 4 additions & 5 deletions src/modules/Elsa.AzureServiceBus/Implementations/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ public class Worker : IAsyncDisposable
private static readonly string BookmarkName = TypeNameHelper.GenerateTypeName<MessageReceived>();
private readonly ServiceBusProcessor _processor;
private readonly IHasher _hasher;
private readonly IWorkflowService _workflowService;
//private readonly IWorkflowService _workflowService;
private readonly ILogger _logger;
private int _refCount = 1;

public Worker(string queueOrTopic, string? subscription, ServiceBusClient client, IHasher hasher, IWorkflowService workflowService, ILogger<Worker> logger)
public Worker(string queueOrTopic, string? subscription, ServiceBusClient client, IHasher hasher, ILogger<Worker> logger)
{
QueueOrTopic = queueOrTopic;
Subscription = subscription == "" ? default : subscription;
_hasher = hasher;
_workflowService = workflowService;
_logger = logger;

var options = new ServiceBusProcessorOptions();
Expand Down Expand Up @@ -73,9 +72,9 @@ private async Task InvokeWorkflowsAsync(ServiceBusReceivedMessage message, Cance
var correlationId = message.CorrelationId;
var messageModel = CreateMessageModel(message);
var input = new Dictionary<string, object> { [MessageReceived.InputKey] = messageModel };
var executionResults = (await _workflowService.DispatchStimulusAsync(BookmarkName, payload, input, correlationId, cancellationToken)).ToList();
//var executionResults = (await _workflowService.DispatchStimulusAsync(BookmarkName, payload, input, correlationId, cancellationToken)).ToList();

_logger.LogInformation("Triggered {WorkflowCount} workflows", executionResults.Count);
//_logger.LogInformation("Triggered {WorkflowCount} workflows", executionResults.Count);
}

private ReceivedServiceBusMessageModel CreateMessageModel(ServiceBusReceivedMessage message) =>
Expand Down
9 changes: 5 additions & 4 deletions src/modules/Elsa.Http/Extensions/RouteTableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Elsa.Http.Models;
using Elsa.Http.Services;
using Elsa.Workflows.Core.Helpers;
using Elsa.Workflows.Core.Models;
using Elsa.Workflows.Persistence.Entities;

namespace Elsa.Http.Extensions;
Expand All @@ -17,7 +18,7 @@ public static void AddRoutes(this IRouteTable routeTable, IEnumerable<WorkflowTr
routeTable.AddRange(paths);
}

public static void AddRoutes(this IRouteTable routeTable, IEnumerable<WorkflowBookmark> bookmarks)
public static void AddRoutes(this IRouteTable routeTable, IEnumerable<Bookmark> bookmarks)
{
var paths = Filter(bookmarks).Select(Deserialize).Select(x => x.Path).ToList();
routeTable.AddRange(paths);
Expand All @@ -29,15 +30,15 @@ public static void RemoveRoutes(this IRouteTable routeTable, IEnumerable<Workflo
routeTable.RemoveRange(paths);
}

public static void RemoveRoutes(this IRouteTable routeTable, IEnumerable<WorkflowBookmark> bookmarks)
public static void RemoveRoutes(this IRouteTable routeTable, IEnumerable<Bookmark> bookmarks)
{
var paths = Filter(bookmarks).Select(Deserialize).Select(x => x.Path).ToList();
routeTable.RemoveRange(paths);
}

private static IEnumerable<WorkflowTrigger> Filter(IEnumerable<WorkflowTrigger> triggers) => triggers.Where(x => x.Name == ActivityTypeNameHelper.GenerateTypeName<HttpEndpoint>());
private static IEnumerable<WorkflowBookmark> Filter(IEnumerable<WorkflowBookmark> triggers) => triggers.Where(x => x.Name == ActivityTypeNameHelper.GenerateTypeName<HttpEndpoint>());
private static IEnumerable<Bookmark> Filter(IEnumerable<Bookmark> triggers) => triggers.Where(x => x.Name == ActivityTypeNameHelper.GenerateTypeName<HttpEndpoint>());
private static HttpEndpointBookmarkData Deserialize(WorkflowTrigger trigger) => Deserialize(trigger.Data!);
private static HttpEndpointBookmarkData Deserialize(WorkflowBookmark bookmark) => Deserialize(bookmark.Data!);
private static HttpEndpointBookmarkData Deserialize(Bookmark bookmark) => Deserialize(bookmark.Data!);
private static HttpEndpointBookmarkData Deserialize(string model) => JsonSerializer.Deserialize<HttpEndpointBookmarkData>(model)!;
}
22 changes: 11 additions & 11 deletions src/modules/Elsa.Http/Middleware/HttpTriggerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public HttpTriggerMiddleware(RequestDelegate next, IHasher hasher, IOptions<Http
_options = options.Value;
}

public async Task InvokeAsync(HttpContext httpContext, IWorkflowService workflowService, IRouteMatcher routeMatcher)
public async Task InvokeAsync(HttpContext httpContext, IRouteMatcher routeMatcher)
{
var path = GetPath(httpContext);
var basePath = _options.BasePath;
Expand Down Expand Up @@ -66,16 +66,16 @@ public async Task InvokeAsync(HttpContext httpContext, IWorkflowService workflow
);

var input = new Dictionary<string, object>() { [HttpEndpoint.InputKey] = requestModel };
var stimulus = Stimulus.Standard<HttpEndpoint>(hash, input);
var executionResults = (await workflowService.ExecuteStimulusAsync(stimulus, abortToken)).ToList();

if (!executionResults.Any())
{
await _next(httpContext);
return;
}

await WriteResponseAsync(httpContext, executionResults, abortToken);
// var stimulus = Stimulus.Standard<HttpEndpoint>(hash, input);
// var executionResults = (await workflowService.ExecuteStimulusAsync(stimulus, abortToken)).ToList();
//
// if (!executionResults.Any())
// {
// await _next(httpContext);
// return;
// }

//await WriteResponseAsync(httpContext, executionResults, abortToken);
}

private static async Task WriteResponseAsync(HttpContext httpContext, IEnumerable<ExecuteWorkflowInstructionResult> executionResults, CancellationToken cancellationToken)
Expand Down
14 changes: 7 additions & 7 deletions src/modules/Elsa.Jobs.Activities/Handlers/JobExecutedHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ namespace Elsa.Jobs.Activities.Handlers;

public class JobExecutedHandler : INotificationHandler<JobExecuted>
{
private readonly IWorkflowService _workflowService;

public JobExecutedHandler(IWorkflowService workflowService)
{
_workflowService = workflowService;
}
// private readonly IWorkflowService _workflowService;
//
// public JobExecutedHandler(IWorkflowService workflowService)
// {
// _workflowService = workflowService;
// }

public async Task HandleAsync(JobExecuted notification, CancellationToken cancellationToken)
{
var payload = new EnqueuedJobPayload(notification.Job.Id);
var jobType = notification.Job.GetType();
var jobTypeName = JobTypeNameHelper.GenerateTypeName(jobType);
var bookmarkName = jobTypeName;
await _workflowService.DispatchStimulusAsync(bookmarkName, payload, cancellationToken: cancellationToken);
//await _workflowService.DispatchStimulusAsync(bookmarkName, payload, cancellationToken: cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ public MassTransitDispatchersFeature(IModule module) : base(module)

public override void Configure()
{
Module.Configure<WorkflowRuntimeFeature>(f => f.WorkflowDispatcherFactory = ActivatorUtilities.GetServiceOrCreateInstance<MassTransitWorkflowDispatcher>);
Module.Configure<WorkflowRuntimeFeature>(f => f.WorkflowDispatcher = ActivatorUtilities.GetServiceOrCreateInstance<MassTransitWorkflowDispatcher>);
}
}
28 changes: 16 additions & 12 deletions src/modules/Elsa.ProtoActor.Common/ProtoActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ namespace Elsa.ProtoActor.Common;

public class ProtoActorSystem
{
public IClusterProvider ClusterProvider { get; set; }
public GrpcNetRemoteConfig RemoteConfig { get; set; }
public ActorSystemConfig ActorSystemConfig { get; set; } = ActorSystemConfig.Setup();
public IIdentityLookup IdentityLookup { get; set; }

public ClusterConfigurationSettings ClusterConfigurationSettings { get; set; } = new();
public ProtoActorSystem()
{
}

public string Name { get; set; }

public ProtoActorSystem(IClusterProvider clusterProvider, GrpcNetRemoteConfig remoteConfig, ActorSystemConfig actorSystemConfig, IIdentityLookup identityLookup, string name, ClusterConfigurationSettings clusterConfigurationSettings)
public ProtoActorSystem(
IClusterProvider clusterProvider,
GrpcNetRemoteConfig remoteConfig,
ActorSystemConfig actorSystemConfig,
IIdentityLookup identityLookup,
string name,
ClusterConfigurationSettings clusterConfigurationSettings)
{
ClusterProvider = clusterProvider;
RemoteConfig = remoteConfig;
Expand All @@ -27,7 +28,10 @@ public ProtoActorSystem(IClusterProvider clusterProvider, GrpcNetRemoteConfig re
ClusterConfigurationSettings = clusterConfigurationSettings;
}

public ProtoActorSystem()
{
}
public IClusterProvider ClusterProvider { get; set; }
public GrpcNetRemoteConfig RemoteConfig { get; set; }
public ActorSystemConfig ActorSystemConfig { get; set; } = ActorSystemConfig.Setup();
public IIdentityLookup IdentityLookup { get; set; }
public ClusterConfigurationSettings ClusterConfigurationSettings { get; set; } = new();
public string Name { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
using System;
using Elsa.ProtoActor.Common;
using Elsa.ProtoActor.Configuration;
using Elsa.Features.Extensions;
using Elsa.ProtoActor.Features;

namespace Elsa.ProtoActor.Kubernetes;

public static class DependencyInjectionExtensions
{
public static ProtoActorFeature WithKubernetesProvider(this ProtoActorFeature protoActorFeature, Action<KubernetesProviderOptions> providerOptions)
public static ProtoActorFeature UseKubernetes(this ProtoActorFeature protoActorFeature, Action<KubernetesProtoActorFeature>? kubernetesFeature = default)
{
var options = new KubernetesProviderOptions();
providerOptions?.Invoke(options);

protoActorFeature.ConfigureProtoActorBuilder(sp =>
new ProtoActorBuilder().UseKubernetesProvider(options).Build());
protoActorFeature.Module.Use<KubernetesProtoActorFeature>(feature => kubernetesFeature?.Invoke(feature));
return protoActorFeature;
}
}
Loading

0 comments on commit e2ffafc

Please sign in to comment.