Skip to content

Commit

Permalink
.Net Processes - Fix Serialization of Input Data (#9614)
Browse files Browse the repository at this point in the history
### Motivation and Context
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->

Updated ability to serialize complex types here:
#9525

...but missed the remoting boundary for invoke the process. This change
addresses this final gap without altering external patterns.

### Description
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

Convert event to JSON when invoking `ProcessActor` from
`DaprKernelProcessContext`

### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->

- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone 😄
  • Loading branch information
crickman authored Nov 7, 2024
1 parent 7404931 commit 1cb0922
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ public Task StartAsync(bool keepAlive)
/// </summary>
/// <param name="processEvent">Required. The <see cref="KernelProcessEvent"/> to start the process with.</param>
/// <returns>A <see cref="Task"/></returns>
public async Task RunOnceAsync(KernelProcessEvent processEvent)
public async Task RunOnceAsync(string processEvent)
{
Verify.NotNull(processEvent, nameof(processEvent));
IExternalEventBuffer externalEventQueue = this.ProxyFactory.CreateActorProxy<IExternalEventBuffer>(new ActorId(this.Id.GetId()), nameof(ExternalEventBufferActor));
await externalEventQueue.EnqueueAsync(processEvent.ToJson()).ConfigureAwait(false);
await externalEventQueue.EnqueueAsync(processEvent).ConfigureAwait(false);
await this.StartAsync(keepAlive: false).ConfigureAwait(false);
await this._processTask!.JoinAsync().ConfigureAwait(false);
}
Expand Down Expand Up @@ -137,10 +137,10 @@ public async Task StopAsync()
/// </summary>
/// <param name="processEvent">Required. The <see cref="KernelProcessEvent"/> to start the process with.</param>
/// <returns>A <see cref="Task"/></returns>
public async Task SendMessageAsync(KernelProcessEvent processEvent)
public async Task SendMessageAsync(string processEvent)
{
Verify.NotNull(processEvent, nameof(processEvent));
await this._externalEventChannel.Writer.WriteAsync(processEvent).ConfigureAwait(false);
await this._externalEventChannel.Writer.WriteAsync(processEvent.ToKernelProcessEvent()).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -202,7 +202,7 @@ internal override async Task HandleMessageAsync(ProcessMessage message)
KernelProcessEvent nestedEvent = new() { Id = eventId, Data = message.TargetEventData };

// Run the nested process completely within a single superstep.
await this.RunOnceAsync(nestedEvent).ConfigureAwait(false);
await this.RunOnceAsync(nestedEvent.ToJson()).ConfigureAwait(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Dapr.Actors;
using Dapr.Actors.Client;
using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Serialization;

namespace Microsoft.SemanticKernel;

Expand Down Expand Up @@ -40,7 +41,7 @@ internal async Task StartWithEventAsync(KernelProcessEvent initialEvent)
{
var daprProcess = DaprProcessInfo.FromKernelProcess(this._process);
await this._daprProcess.InitializeProcessAsync(daprProcess, null).ConfigureAwait(false);
await this._daprProcess.RunOnceAsync(initialEvent).ConfigureAwait(false);
await this._daprProcess.RunOnceAsync(initialEvent.ToJson()).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -49,7 +50,7 @@ internal async Task StartWithEventAsync(KernelProcessEvent initialEvent)
/// <param name="processEvent">The event to sent to the process.</param>
/// <returns>A <see cref="Task"/></returns>
public override async Task SendEventAsync(KernelProcessEvent processEvent) =>
await this._daprProcess.SendMessageAsync(processEvent).ConfigureAwait(false);
await this._daprProcess.SendMessageAsync(processEvent.ToJson()).ConfigureAwait(false);

/// <summary>
/// Stops the process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface IProcess : IActor, IStep
/// </summary>
/// <param name="processEvent">Required. The <see cref="KernelProcessEvent"/> to start the process with.</param>
/// <returns>A <see cref="Task"/></returns>
Task RunOnceAsync(KernelProcessEvent processEvent);
Task RunOnceAsync(string processEvent);

/// <summary>
/// Stops a running process. This will cancel the process and wait for it to complete before returning.
Expand All @@ -45,7 +45,7 @@ public interface IProcess : IActor, IStep
/// </summary>
/// <param name="processEvent">Required. The <see cref="KernelProcessEvent"/> to start the process with.</param>
/// <returns>A <see cref="Task"/></returns>
Task SendMessageAsync(KernelProcessEvent processEvent);
Task SendMessageAsync(string processEvent);

/// <summary>
/// Gets the process information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,20 @@ IEnumerable<KernelProcessEvent> Deserialize()
{
foreach (string json in jsonEvents)
{
EventContainer<KernelProcessEvent> eventContainer =
JsonSerializer.Deserialize<EventContainer<KernelProcessEvent>>(json) ??
throw new KernelException($"Unable to deserialize {nameof(KernelProcessEvent)} queue.");
yield return eventContainer.Payload with { Data = TypeInfo.ConvertValue(eventContainer.DataTypeName, eventContainer.Payload.Data) };
yield return json.ToKernelProcessEvent();
}
}
}

/// <summary>
/// Deserialize a list of JSON events into a list of <see cref="KernelProcessEvent"/> objects.
/// </summary>
/// <exception cref="KernelException">If any event fails deserialization</exception>
public static KernelProcessEvent ToKernelProcessEvent(this string jsonEvent)
{
EventContainer<KernelProcessEvent> eventContainer =
JsonSerializer.Deserialize<EventContainer<KernelProcessEvent>>(jsonEvent) ??
throw new KernelException($"Unable to deserialize {nameof(KernelProcessEvent)} queue.");
return eventContainer.Payload with { Data = TypeInfo.ConvertValue(eventContainer.DataTypeName, eventContainer.Payload.Data) };
}
}

0 comments on commit 1cb0922

Please sign in to comment.