From 1cb0922495551d637e2cad78202613228e0df271 Mon Sep 17 00:00:00 2001 From: Chris <66376200+crickman@users.noreply.github.com> Date: Thu, 7 Nov 2024 11:38:50 -0800 Subject: [PATCH] .Net Processes - Fix Serialization of Input Data (#9614) ### Motivation and Context Updated ability to serialize complex types here: https://github.com/microsoft/semantic-kernel/pull/9525 ...but missed the remoting boundary for invoke the process. This change addresses this final gap without altering external patterns. ### Description Convert event to JSON when invoking `ProcessActor` from `DaprKernelProcessContext` ### Contribution Checklist - [X] The code builds clean without any errors or warnings - [X] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [X] All unit tests pass, and I have added new tests where possible - [X] I didn't break anyone :smile: --- .../Process.Runtime.Dapr/Actors/ProcessActor.cs | 10 +++++----- .../DaprKernelProcessContext.cs | 5 +++-- .../Process.Runtime.Dapr/Interfaces/IProcess.cs | 4 ++-- .../KernelProcessEventSerializer.cs | 17 +++++++++++++---- 4 files changed, 23 insertions(+), 13 deletions(-) diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs index 0d033659399d..4416e1488fdb 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs @@ -94,11 +94,11 @@ public Task StartAsync(bool keepAlive) /// /// Required. The to start the process with. /// A - public async Task RunOnceAsync(KernelProcessEvent processEvent) + public async Task RunOnceAsync(string processEvent) { Verify.NotNull(processEvent, nameof(processEvent)); IExternalEventBuffer externalEventQueue = this.ProxyFactory.CreateActorProxy(new ActorId(this.Id.GetId()), nameof(ExternalEventBufferActor)); - await externalEventQueue.EnqueueAsync(processEvent.ToJson()).ConfigureAwait(false); + await externalEventQueue.EnqueueAsync(processEvent).ConfigureAwait(false); await this.StartAsync(keepAlive: false).ConfigureAwait(false); await this._processTask!.JoinAsync().ConfigureAwait(false); } @@ -137,10 +137,10 @@ public async Task StopAsync() /// /// Required. The to start the process with. /// A - public async Task SendMessageAsync(KernelProcessEvent processEvent) + public async Task SendMessageAsync(string processEvent) { Verify.NotNull(processEvent, nameof(processEvent)); - await this._externalEventChannel.Writer.WriteAsync(processEvent).ConfigureAwait(false); + await this._externalEventChannel.Writer.WriteAsync(processEvent.ToKernelProcessEvent()).ConfigureAwait(false); } /// @@ -202,7 +202,7 @@ internal override async Task HandleMessageAsync(ProcessMessage message) KernelProcessEvent nestedEvent = new() { Id = eventId, Data = message.TargetEventData }; // Run the nested process completely within a single superstep. - await this.RunOnceAsync(nestedEvent).ConfigureAwait(false); + await this.RunOnceAsync(nestedEvent.ToJson()).ConfigureAwait(false); } } } diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs index 380558f5a918..fd055003146b 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs @@ -5,6 +5,7 @@ using Dapr.Actors; using Dapr.Actors.Client; using Microsoft.SemanticKernel.Process; +using Microsoft.SemanticKernel.Process.Serialization; namespace Microsoft.SemanticKernel; @@ -40,7 +41,7 @@ internal async Task StartWithEventAsync(KernelProcessEvent initialEvent) { var daprProcess = DaprProcessInfo.FromKernelProcess(this._process); await this._daprProcess.InitializeProcessAsync(daprProcess, null).ConfigureAwait(false); - await this._daprProcess.RunOnceAsync(initialEvent).ConfigureAwait(false); + await this._daprProcess.RunOnceAsync(initialEvent.ToJson()).ConfigureAwait(false); } /// @@ -49,7 +50,7 @@ internal async Task StartWithEventAsync(KernelProcessEvent initialEvent) /// The event to sent to the process. /// A public override async Task SendEventAsync(KernelProcessEvent processEvent) => - await this._daprProcess.SendMessageAsync(processEvent).ConfigureAwait(false); + await this._daprProcess.SendMessageAsync(processEvent.ToJson()).ConfigureAwait(false); /// /// Stops the process. diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IProcess.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IProcess.cs index 12a99606192b..59855e5ad45a 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IProcess.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IProcess.cs @@ -31,7 +31,7 @@ public interface IProcess : IActor, IStep /// /// Required. The to start the process with. /// A - Task RunOnceAsync(KernelProcessEvent processEvent); + Task RunOnceAsync(string processEvent); /// /// Stops a running process. This will cancel the process and wait for it to complete before returning. @@ -45,7 +45,7 @@ public interface IProcess : IActor, IStep /// /// Required. The to start the process with. /// A - Task SendMessageAsync(KernelProcessEvent processEvent); + Task SendMessageAsync(string processEvent); /// /// Gets the process information. diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Serialization/KernelProcessEventSerializer.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Serialization/KernelProcessEventSerializer.cs index 01001f4d79a7..c77becb9f6f1 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Serialization/KernelProcessEventSerializer.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Serialization/KernelProcessEventSerializer.cs @@ -34,11 +34,20 @@ IEnumerable Deserialize() { foreach (string json in jsonEvents) { - EventContainer eventContainer = - JsonSerializer.Deserialize>(json) ?? - throw new KernelException($"Unable to deserialize {nameof(KernelProcessEvent)} queue."); - yield return eventContainer.Payload with { Data = TypeInfo.ConvertValue(eventContainer.DataTypeName, eventContainer.Payload.Data) }; + yield return json.ToKernelProcessEvent(); } } } + + /// + /// Deserialize a list of JSON events into a list of objects. + /// + /// If any event fails deserialization + public static KernelProcessEvent ToKernelProcessEvent(this string jsonEvent) + { + EventContainer eventContainer = + JsonSerializer.Deserialize>(jsonEvent) ?? + throw new KernelException($"Unable to deserialize {nameof(KernelProcessEvent)} queue."); + return eventContainer.Payload with { Data = TypeInfo.ConvertValue(eventContainer.DataTypeName, eventContainer.Payload.Data) }; + } }