Skip to content

Commit

Permalink
.Net Processes - Support Complex Type Serialization for Dapr Events a…
Browse files Browse the repository at this point in the history
…nd Messages (#9525)

### Motivation and Context
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->

DAPR events and messages result in runtime failure when contain anything
other than the supported primitive types:


https://docs.dapr.io/developing-applications/sdks/dotnet/dotnet-actors/dotnet-actors-serialization/#supported-primitive-types

This means arrays, lists, and objects are not currently able to be
emitted by steps.

### Description
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

Translating `ProcessMessage`, `ProcessEvent`, and `KernelProcessMessage`
into JSON strings prior to data-contract serialization. As part of this,
any relevant type information is included in the json payload.

On deserialization, the type information is utilized to convert any
`JsonElement` data values to the expected type.

**Notes:**
- Added unit tests (DAPR specific project)
- Removed `DataContract` where not needed (and tests)
- Verified DAPR demo
- Attempted to minimize postfix null-supression operator as able.
- Replaced `var` with explicit types for my own clarity...and left them
for posterity.
- Eliminated redundant namespace scoping

### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->

- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone 😄
  • Loading branch information
crickman authored Nov 5, 2024
1 parent 9eae969 commit 6fee23c
Show file tree
Hide file tree
Showing 31 changed files with 809 additions and 297 deletions.
10 changes: 9 additions & 1 deletion dotnet/SK-dotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "FunctionCalling", "Function
ProjectSection(SolutionItems) = preProject
src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallingUtilities.props = src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallingUtilities.props
src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallsProcessor.cs = src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallsProcessor.cs
src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallsProcessorLoggerExtensions.cs = src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallsProcessorLoggerExtensions.cs
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Connectors.Weaviate.UnitTests", "src\Connectors\Connectors.Weaviate.UnitTests\Connectors.Weaviate.UnitTests.csproj", "{E8FC97B0-B417-4A90-993C-B8AA9223B058}"
Expand Down Expand Up @@ -411,6 +410,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Process.Utilities.UnitTests
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GettingStartedWithVectorStores", "samples\GettingStartedWithVectorStores\GettingStartedWithVectorStores.csproj", "{8C3DE41C-E2C8-42B9-8638-574F8946EB0E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Process.Runtime.Dapr.UnitTests", "src\Experimental\Process.Runtime.Dapr.UnitTests\Process.Runtime.Dapr.UnitTests.csproj", "{DB58FDD0-308E-472F-BFF5-508BC64C727E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1078,6 +1079,12 @@ Global
{8C3DE41C-E2C8-42B9-8638-574F8946EB0E}.Publish|Any CPU.Build.0 = Debug|Any CPU
{8C3DE41C-E2C8-42B9-8638-574F8946EB0E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8C3DE41C-E2C8-42B9-8638-574F8946EB0E}.Release|Any CPU.Build.0 = Release|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Publish|Any CPU.ActiveCfg = Debug|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Publish|Any CPU.Build.0 = Debug|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1226,6 +1233,7 @@ Global
{39EAB599-742F-417D-AF80-95F90376BB18} = {831DDCA2-7D2C-4C31-80DB-6BDB3E1F7AE0}
{DAC54048-A39A-4739-8307-EA5A291F2EA0} = {0D8C6358-5DAA-4EA6-A924-C268A9A21BC9}
{8C3DE41C-E2C8-42B9-8638-574F8946EB0E} = {FA3720F1-C99A-49B2-9577-A940257098BF}
{DB58FDD0-308E-472F-BFF5-508BC64C727E} = {0D8C6358-5DAA-4EA6-A924-C268A9A21BC9}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {FBDC56A3-86AD-4323-AA0F-201E59123B83}
Expand Down
34 changes: 20 additions & 14 deletions dotnet/src/Experimental/Process.Abstractions/KernelProcessError.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Runtime.Serialization;

namespace Microsoft.SemanticKernel;

Expand All @@ -10,30 +9,37 @@ namespace Microsoft.SemanticKernel;
/// <remarks>
/// Initializes a new instance of the <see cref="KernelProcessError"/> class.
/// </remarks>
/// <param name="Type">The exception type name</param>
/// <param name="Message">The exception message (<see cref="Exception.Message"/></param>
/// <param name="StackTrace">The exception stack-trace (<see cref="Exception.StackTrace"/></param>
[DataContract]
public sealed record KernelProcessError(
[property:DataMember]
string Type,
[property:DataMember]
string Message,
[property:DataMember]
string? StackTrace)
public sealed record KernelProcessError
{
/// <summary>
///The exception type name.
/// </summary>
public string Type { get; init; } = string.Empty;

/// <summary>
/// The exception message (<see cref="Exception.Message"/>.
/// </summary>
public string Message { get; init; } = string.Empty;

/// <summary>
/// The exception stack-trace (<see cref="Exception.StackTrace"/>.
/// </summary>
public string? StackTrace { get; init; }

/// <summary>
/// The inner failure, when exists, as <see cref="KernelProcessError"/>.
/// </summary>
[DataMember]
public KernelProcessError? InnerError { get; init; }

/// <summary>
/// Factory method to create a <see cref="KernelProcessError"/> from a source <see cref="Exception"/> object.
/// </summary>
public static KernelProcessError FromException(Exception ex) =>
new(ex.GetType().Name, ex.Message, ex.StackTrace)
new()
{
Type = ex.GetType().Name,
Message = ex.Message,
StackTrace = ex.StackTrace,
InnerError = ex.InnerException is not null ? FromException(ex.InnerException) : null
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
namespace Microsoft.SemanticKernel;

/// <summary>
/// An class representing an event that can be emitted from a <see cref="KernelProcessStep"/>. This type is convertible to and from CloudEvents.
/// A class representing an event that can be emitted from a <see cref="KernelProcessStep"/>. This type is convertible to and from CloudEvents.
/// </summary>
public sealed record KernelProcessEvent
{
/// <summary>
/// The unique identifier for the event.
/// </summary>
public string? Id { get; set; }
public string Id { get; init; } = string.Empty;

/// <summary>
/// An optional data payload associated with the event.
/// </summary>
public object? Data { get; set; }
public object? Data { get; init; }

/// <summary>
/// The visibility of the event. Defaults to <see cref="KernelProcessEventVisibility.Internal"/>.
Expand Down
6 changes: 3 additions & 3 deletions dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ internal override async Task HandleMessageAsync(ProcessMessage message)
{
// Create the external event that will be used to start the nested process. Since this event came
// from outside this processes, we set the visibility to internal so that it's not emitted back out again.
var nestedEvent = new KernelProcessEvent() { Id = eventId, Data = message.TargetEventData, Visibility = KernelProcessEventVisibility.Internal };
KernelProcessEvent nestedEvent = new() { Id = eventId, Data = message.TargetEventData, Visibility = KernelProcessEventVisibility.Internal };

// Run the nested process completely within a single superstep.
await this.RunOnceAsync(nestedEvent, this._kernel).ConfigureAwait(false);
Expand Down Expand Up @@ -300,7 +300,7 @@ private void EnqueueExternalMessages(Queue<ProcessMessage> messageChannel)
{
while (this._externalEventChannel.Reader.TryRead(out var externalEvent))
{
if (this._outputEdges!.TryGetValue(externalEvent.Id!, out List<KernelProcessEdge>? edges) && edges is not null)
if (this._outputEdges.TryGetValue(externalEvent.Id, out List<KernelProcessEdge>? edges) && edges is not null)
{
foreach (var edge in edges)
{
Expand Down Expand Up @@ -330,7 +330,7 @@ private void EnqueueStepMessages(LocalStep step, Queue<ProcessMessage> messageCh

// Get the edges for the event and queue up the messages to be sent to the next steps.
bool foundEdge = false;
foreach (KernelProcessEdge edge in step.GetEdgeForEvent(stepEvent.Id))
foreach (KernelProcessEdge edge in step.GetEdgeForEvent(stepEvent.QualifiedId))
{
ProcessMessage message = ProcessMessageFactory.CreateFromEdge(edge, stepEvent.Data);
messageChannel.Enqueue(message);
Expand Down
62 changes: 20 additions & 42 deletions dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,9 @@ internal IEnumerable<KernelProcessEdge> GetEdgeForEvent(string eventId)
/// </summary>
/// <param name="processEvent">The event to emit.</param>
/// <returns>A <see cref="ValueTask"/></returns>
public ValueTask EmitEventAsync(KernelProcessEvent processEvent) => this.EmitEventAsync(processEvent, isError: false);

/// <summary>
/// Emits an event from the step.
/// </summary>
/// <param name="processEvent">The event to emit.</param>
/// <param name="isError">Flag indicating if the event being emitted is in response to a step failure</param>
/// <returns>A <see cref="ValueTask"/></returns>
internal ValueTask EmitEventAsync(KernelProcessEvent processEvent, bool isError)
public ValueTask EmitEventAsync(KernelProcessEvent processEvent)
{
this.EmitEvent(new ProcessEvent(this._eventNamespace, processEvent, isError));
this.EmitEvent(ProcessEvent.Create(processEvent, this._eventNamespace));
return default;
}

Expand Down Expand Up @@ -193,23 +185,25 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message)
try
{
FunctionResult invokeResult = await this.InvokeFunction(function, this._kernel, arguments).ConfigureAwait(false);
await this.EmitEventAsync(
new KernelProcessEvent
this.EmitEvent(
new ProcessEvent
{
Id = $"{targetFunction}.OnResult",
Data = invokeResult.GetValue<object>(),
}).ConfigureAwait(false);
Namespace = this._eventNamespace,
SourceId = $"{targetFunction}.OnResult",
Data = invokeResult.GetValue<object>()
});
}
catch (Exception ex)
{
this._logger.LogError("Error in Step {StepName}: {ErrorMessage}", this.Name, ex.Message);
await this.EmitEventAsync(
new KernelProcessEvent
this._logger.LogError(ex, "Error in Step {StepName}: {ErrorMessage}", this.Name, ex.Message);
this.EmitEvent(
new ProcessEvent
{
Id = $"{targetFunction}.OnError",
Namespace = this._eventNamespace,
SourceId = $"{targetFunction}.OnError",
Data = KernelProcessError.FromException(ex),
},
isError: true).ConfigureAwait(false);
IsError = true
});
}
finally
{
Expand Down Expand Up @@ -250,23 +244,18 @@ protected virtual async ValueTask InitializeStepAsync()
throw new KernelException("The state object for the KernelProcessStep could not be created.").Log(this._logger);
}

MethodInfo? methodInfo = this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.ActivateAsync), [stateType]);

if (methodInfo is null)
{
MethodInfo methodInfo =
this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.ActivateAsync), [stateType]) ??
throw new KernelException("The ActivateAsync method for the KernelProcessStep could not be found.").Log(this._logger);
}

this._stepState = stateObject;

ValueTask? activateTask = (ValueTask?)methodInfo.Invoke(stepInstance, [stateObject]);
if (activateTask == null)
{
ValueTask activateTask =
(ValueTask?)methodInfo.Invoke(stepInstance, [stateObject]) ??
throw new KernelException("The ActivateAsync method failed to complete.").Log(this._logger);
}

await stepInstance.ActivateAsync(stateObject).ConfigureAwait(false);
await activateTask.Value.ConfigureAwait(false);
await activateTask.ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -315,15 +304,4 @@ protected ProcessEvent ScopedEvent(ProcessEvent localEvent)
Verify.NotNull(localEvent, nameof(localEvent));
return localEvent with { Namespace = $"{this.Name}_{this.Id}" };
}

/// <summary>
/// Generates a scoped event for the step.
/// </summary>
/// <param name="processEvent">The event.</param>
/// <returns>A <see cref="ProcessEvent"/> with the correctly scoped namespace.</returns>
protected ProcessEvent ScopedEvent(KernelProcessEvent processEvent)
{
Verify.NotNull(processEvent, nameof(processEvent));
return new ProcessEvent($"{this.Name}_{this.Id}", processEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.IO;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Process.Runtime;
using Microsoft.SemanticKernel.Process.Serialization;
using Xunit;

namespace SemanticKernel.Process.Dapr.Runtime.UnitTests;

/// <summary>
/// Unit tests for the <see cref="ProcessEvent"/> class.
/// </summary>
public class KernelProcessEventSerializationTests
{
/// <summary>
/// Validates that a <see cref="KernelProcessEvent"/> can be serialized and deserialized correctly
/// with out an explicit type definition for <see cref="KernelProcessEvent.Data"/>
/// </summary>
[Fact]
public void VerifySerializeEventSingleTest()
{
// Arrange, Act & Assert
VerifyContainerSerialization([new() { Id = "Test", Data = 3 }]);
VerifyContainerSerialization([new() { Id = "Test", Data = "test" }]);
VerifyContainerSerialization([new() { Id = "Test", Data = Guid.NewGuid() }]);
VerifyContainerSerialization([new() { Id = "Test", Data = new int[] { 1, 2, 3, 4 } }]);
VerifyContainerSerialization([new() { Id = "Test", Data = new ComplexData { Id = "test", Value = 3 } }]);
VerifyContainerSerialization([new() { Id = "testid", Data = KernelProcessError.FromException(new InvalidOperationException()) }]);
}

/// <summary>
/// Validates that a list <see cref="KernelProcessEvent"/> can be serialized and deserialized correctly
/// with out varying types assigned to for <see cref="KernelProcessEvent.Data"/>
/// </summary>
[Fact]
public void VerifySerializeEventMixedTest()
{
// Arrange, Act & Assert
VerifyContainerSerialization(
[
new() { Id = "Test", Data = 3 },
new() { Id = "Test", Data = "test" },
new() { Id = "Test", Data = Guid.NewGuid() },
new() { Id = "Test", Data = new int[] { 1, 2, 3, 4 } },
new() { Id = "Test", Data = new ComplexData { Id = "test", Value = 3 } },
new() { Id = "testid", Data = KernelProcessError.FromException(new InvalidOperationException()) },
]);
}

/// <summary>
/// Validates that a list <see cref="KernelProcessEvent"/> can be serialized and deserialized correctly
/// with out varying types assigned to for <see cref="KernelProcessEvent.Data"/>
/// </summary>
[Fact]
public void VerifyDataContractSerializationTest()
{
// Arrange
KernelProcessEvent[] processEvents =
[
new() { Id = "Test", Data = 3 },
new() { Id = "Test", Data = "test" },
new() { Id = "Test", Data = Guid.NewGuid() },
new() { Id = "Test", Data = new int[] { 1, 2, 3, 4 } },
new() { Id = "Test", Data = new ComplexData { Id = "test", Value = 3 } },
new() { Id = "testid", Data = KernelProcessError.FromException(new InvalidOperationException()) },
];
List<string> jsonEvents = [];
foreach (KernelProcessEvent processEvent in processEvents)
{
jsonEvents.Add(KernelProcessEventSerializer.ToJson(processEvent));
}

// Act
using MemoryStream stream = new();
jsonEvents.Serialize(stream);
stream.Position = 0;

List<string>? copy = stream.Deserialize<List<string>>();

// Assert
Assert.NotNull(copy);

// Act
IList<KernelProcessEvent> copiedEvents = KernelProcessEventSerializer.ToKernelProcessEvents(jsonEvents);

// Assert
Assert.Equivalent(processEvents, copiedEvents);
}

private static void VerifyContainerSerialization(KernelProcessEvent[] processEvents)
{
// Arrange
List<string> jsonEvents = [];
foreach (KernelProcessEvent processEvent in processEvents)
{
jsonEvents.Add(KernelProcessEventSerializer.ToJson(processEvent));
}

// Act
IList<KernelProcessEvent> copiedEvents = KernelProcessEventSerializer.ToKernelProcessEvents(jsonEvents);

// Assert
Assert.Equivalent(processEvents, copiedEvents);
}

internal sealed class ComplexData
{
public string Id { get; init; } = string.Empty;

public int Value { get; init; }
}
}
Loading

0 comments on commit 6fee23c

Please sign in to comment.