Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.Net Processes - Support Complex Type Serialization for Dapr Events and Messages #9525

Merged
merged 36 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
a3091f6
Checkpoint
crickman Nov 1, 2024
850d1f5
Checkpoint - KernelProcessEvent
crickman Nov 4, 2024
dfb1a73
Unit-Tests
crickman Nov 4, 2024
d49ea08
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 4, 2024
aafebb3
Remove cruft
crickman Nov 4, 2024
62f90dc
Record clean-up
crickman Nov 4, 2024
fa53b2c
Clean-up
crickman Nov 4, 2024
8de228c
Namespace
crickman Nov 4, 2024
428e52c
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 4, 2024
bb487ae
Checkpoint - ProcessEvent
crickman Nov 4, 2024
d64cd07
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 4, 2024
f613871
Unit-test checkpoint
crickman Nov 4, 2024
227ac1a
Checkpoint - UnitTests
crickman Nov 4, 2024
e5e9083
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 4, 2024
e2b65dd
Reset IT
crickman Nov 4, 2024
123ce0c
More
crickman Nov 4, 2024
07ba674
Cleanup
crickman Nov 4, 2024
32c6e77
Remove extra
crickman Nov 4, 2024
4ce30f1
Clean-up / more tests
crickman Nov 4, 2024
9c10f67
Formatting
crickman Nov 4, 2024
5652fd2
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 4, 2024
a0cf84e
Checkpoint
crickman Nov 5, 2024
e3703f5
Merge branch 'process-knowntype-upgrade' of https://github.com/micros…
crickman Nov 5, 2024
93ae016
Phew
crickman Nov 5, 2024
d00d09a
Namespace
crickman Nov 5, 2024
2356772
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 5, 2024
e125891
Comments
crickman Nov 5, 2024
ead3c67
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 5, 2024
3ebd474
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 5, 2024
5a30dba
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 5, 2024
30dce52
Ensure deserialization error doesn't result in indeterminate state
crickman Nov 5, 2024
b834d4f
Fix project
crickman Nov 5, 2024
ddf2dd5
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 5, 2024
317db32
Merge branch 'main' into process-knowntype-upgrade
crickman Nov 5, 2024
39b66d8
Fix integration failure
crickman Nov 5, 2024
86beeb5
Merge branch 'process-knowntype-upgrade' of https://github.com/micros…
crickman Nov 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion dotnet/SK-dotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "FunctionCalling", "Function
ProjectSection(SolutionItems) = preProject
src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallingUtilities.props = src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallingUtilities.props
src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallsProcessor.cs = src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallsProcessor.cs
src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallsProcessorLoggerExtensions.cs = src\InternalUtilities\connectors\AI\FunctionCalling\FunctionCallsProcessorLoggerExtensions.cs
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Connectors.Weaviate.UnitTests", "src\Connectors\Connectors.Weaviate.UnitTests\Connectors.Weaviate.UnitTests.csproj", "{E8FC97B0-B417-4A90-993C-B8AA9223B058}"
Expand Down Expand Up @@ -411,6 +410,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Process.Utilities.UnitTests
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GettingStartedWithVectorStores", "samples\GettingStartedWithVectorStores\GettingStartedWithVectorStores.csproj", "{8C3DE41C-E2C8-42B9-8638-574F8946EB0E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Process.Runtime.Dapr.UnitTests", "src\Experimental\Process.Runtime.Dapr.UnitTests\Process.Runtime.Dapr.UnitTests.csproj", "{DB58FDD0-308E-472F-BFF5-508BC64C727E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1078,6 +1079,12 @@ Global
{8C3DE41C-E2C8-42B9-8638-574F8946EB0E}.Publish|Any CPU.Build.0 = Debug|Any CPU
{8C3DE41C-E2C8-42B9-8638-574F8946EB0E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8C3DE41C-E2C8-42B9-8638-574F8946EB0E}.Release|Any CPU.Build.0 = Release|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Publish|Any CPU.ActiveCfg = Debug|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Publish|Any CPU.Build.0 = Debug|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB58FDD0-308E-472F-BFF5-508BC64C727E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1226,6 +1233,7 @@ Global
{39EAB599-742F-417D-AF80-95F90376BB18} = {831DDCA2-7D2C-4C31-80DB-6BDB3E1F7AE0}
{DAC54048-A39A-4739-8307-EA5A291F2EA0} = {0D8C6358-5DAA-4EA6-A924-C268A9A21BC9}
{8C3DE41C-E2C8-42B9-8638-574F8946EB0E} = {FA3720F1-C99A-49B2-9577-A940257098BF}
{DB58FDD0-308E-472F-BFF5-508BC64C727E} = {0D8C6358-5DAA-4EA6-A924-C268A9A21BC9}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {FBDC56A3-86AD-4323-AA0F-201E59123B83}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Runtime.Serialization;

namespace Microsoft.SemanticKernel;

Expand All @@ -10,30 +9,37 @@ namespace Microsoft.SemanticKernel;
/// <remarks>
/// Initializes a new instance of the <see cref="KernelProcessError"/> class.
/// </remarks>
/// <param name="Type">The exception type name</param>
/// <param name="Message">The exception message (<see cref="Exception.Message"/></param>
/// <param name="StackTrace">The exception stack-trace (<see cref="Exception.StackTrace"/></param>
[DataContract]
public sealed record KernelProcessError(
[property:DataMember]
string Type,
[property:DataMember]
string Message,
[property:DataMember]
string? StackTrace)
public sealed record KernelProcessError
{
/// <summary>
///The exception type name.
/// </summary>
public string Type { get; init; } = string.Empty;

/// <summary>
/// The exception message (<see cref="Exception.Message"/>.
/// </summary>
public string Message { get; init; } = string.Empty;

/// <summary>
/// The exception stack-trace (<see cref="Exception.StackTrace"/>.
/// </summary>
public string? StackTrace { get; init; }

/// <summary>
/// The inner failure, when exists, as <see cref="KernelProcessError"/>.
/// </summary>
[DataMember]
public KernelProcessError? InnerError { get; init; }

/// <summary>
/// Factory method to create a <see cref="KernelProcessError"/> from a source <see cref="Exception"/> object.
/// </summary>
public static KernelProcessError FromException(Exception ex) =>
new(ex.GetType().Name, ex.Message, ex.StackTrace)
new()
{
Type = ex.GetType().Name,
Message = ex.Message,
StackTrace = ex.StackTrace,
InnerError = ex.InnerException is not null ? FromException(ex.InnerException) : null
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
namespace Microsoft.SemanticKernel;

/// <summary>
/// An class representing an event that can be emitted from a <see cref="KernelProcessStep"/>. This type is convertible to and from CloudEvents.
/// A class representing an event that can be emitted from a <see cref="KernelProcessStep"/>. This type is convertible to and from CloudEvents.
/// </summary>
public sealed record KernelProcessEvent
{
/// <summary>
/// The unique identifier for the event.
/// </summary>
public string? Id { get; set; }
public string Id { get; init; } = string.Empty;

/// <summary>
/// An optional data payload associated with the event.
/// </summary>
public object? Data { get; set; }
public object? Data { get; init; }

/// <summary>
/// The visibility of the event. Defaults to <see cref="KernelProcessEventVisibility.Internal"/>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ internal override async Task HandleMessageAsync(ProcessMessage message)
{
// Create the external event that will be used to start the nested process. Since this event came
// from outside this processes, we set the visibility to internal so that it's not emitted back out again.
var nestedEvent = new KernelProcessEvent() { Id = eventId, Data = message.TargetEventData, Visibility = KernelProcessEventVisibility.Internal };
KernelProcessEvent nestedEvent = new() { Id = eventId, Data = message.TargetEventData, Visibility = KernelProcessEventVisibility.Internal };

// Run the nested process completely within a single superstep.
await this.RunOnceAsync(nestedEvent, this._kernel).ConfigureAwait(false);
Expand Down Expand Up @@ -300,7 +300,7 @@ private void EnqueueExternalMessages(Queue<ProcessMessage> messageChannel)
{
while (this._externalEventChannel.Reader.TryRead(out var externalEvent))
{
if (this._outputEdges!.TryGetValue(externalEvent.Id!, out List<KernelProcessEdge>? edges) && edges is not null)
if (this._outputEdges.TryGetValue(externalEvent.Id, out List<KernelProcessEdge>? edges) && edges is not null)
{
foreach (var edge in edges)
{
Expand Down Expand Up @@ -330,7 +330,7 @@ private void EnqueueStepMessages(LocalStep step, Queue<ProcessMessage> messageCh

// Get the edges for the event and queue up the messages to be sent to the next steps.
bool foundEdge = false;
foreach (KernelProcessEdge edge in step.GetEdgeForEvent(stepEvent.Id))
foreach (KernelProcessEdge edge in step.GetEdgeForEvent(stepEvent.QualifiedId))
{
ProcessMessage message = ProcessMessageFactory.CreateFromEdge(edge, stepEvent.Data);
messageChannel.Enqueue(message);
Expand Down
76 changes: 21 additions & 55 deletions dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,9 @@ internal IEnumerable<KernelProcessEdge> GetEdgeForEvent(string eventId)
/// </summary>
/// <param name="processEvent">The event to emit.</param>
/// <returns>A <see cref="ValueTask"/></returns>
public ValueTask EmitEventAsync(KernelProcessEvent processEvent) => this.EmitEventAsync(processEvent, isError: false);

/// <summary>
/// Emits an event from the step.
/// </summary>
/// <param name="processEvent">The event to emit.</param>
/// <param name="isError">Flag indicating if the event being emitted is in response to a step failure</param>
/// <returns>A <see cref="ValueTask"/></returns>
internal ValueTask EmitEventAsync(KernelProcessEvent processEvent, bool isError)
public ValueTask EmitEventAsync(KernelProcessEvent processEvent)
{
this.EmitEvent(new ProcessEvent(this._eventNamespace, processEvent, isError));
this.EmitEvent(ProcessEvent.Create(processEvent, this._eventNamespace));
return default;
}

Expand Down Expand Up @@ -193,23 +185,25 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message)
try
{
FunctionResult invokeResult = await this.InvokeFunction(function, this._kernel, arguments).ConfigureAwait(false);
await this.EmitEventAsync(
new KernelProcessEvent
this.EmitEvent(
new ProcessEvent
{
Id = $"{targetFunction}.OnResult",
Data = invokeResult.GetValue<object>(),
}).ConfigureAwait(false);
Namespace = this._eventNamespace,
SourceId = $"{targetFunction}.OnResult",
Data = invokeResult.GetValue<object>()
});
}
catch (Exception ex)
{
this._logger.LogError("Error in Step {StepName}: {ErrorMessage}", this.Name, ex.Message);
await this.EmitEventAsync(
new KernelProcessEvent
this._logger.LogError(ex, "Error in Step {StepName}: {ErrorMessage}", this.Name, ex.Message);
this.EmitEvent(
new ProcessEvent
{
Id = $"{targetFunction}.OnError",
Namespace = this._eventNamespace,
SourceId = $"{targetFunction}.OnError",
Data = KernelProcessError.FromException(ex),
},
isError: true).ConfigureAwait(false);
IsError = true
});
}
finally
{
Expand Down Expand Up @@ -250,23 +244,18 @@ protected virtual async ValueTask InitializeStepAsync()
throw new KernelException("The state object for the KernelProcessStep could not be created.").Log(this._logger);
}

MethodInfo? methodInfo = this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.ActivateAsync), [stateType]);

if (methodInfo is null)
{
MethodInfo methodInfo =
this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.ActivateAsync), [stateType]) ??
throw new KernelException("The ActivateAsync method for the KernelProcessStep could not be found.").Log(this._logger);
}

this._stepState = stateObject;

ValueTask? activateTask = (ValueTask?)methodInfo.Invoke(stepInstance, [stateObject]);
if (activateTask == null)
{
ValueTask activateTask =
(ValueTask?)methodInfo.Invoke(stepInstance, [stateObject]) ??
throw new KernelException("The ActivateAsync method failed to complete.").Log(this._logger);
}

await stepInstance.ActivateAsync(stateObject).ConfigureAwait(false);
await activateTask.Value.ConfigureAwait(false);
await activateTask.ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -301,29 +290,6 @@ internal virtual async Task<KernelProcessStepInfo> ToKernelProcessStepInfoAsync(
/// <param name="localEvent">The event to emit.</param>
protected void EmitEvent(ProcessEvent localEvent)
{
var scopedEvent = this.ScopedEvent(localEvent);
this._outgoingEventQueue.Enqueue(scopedEvent);
}

/// <summary>
/// Generates a scoped event for the step.
/// </summary>
/// <param name="localEvent">The event.</param>
/// <returns>A <see cref="ProcessEvent"/> with the correctly scoped namespace.</returns>
protected ProcessEvent ScopedEvent(ProcessEvent localEvent)
{
Verify.NotNull(localEvent, nameof(localEvent));
return localEvent with { Namespace = $"{this.Name}_{this.Id}" };
}

/// <summary>
/// Generates a scoped event for the step.
/// </summary>
/// <param name="processEvent">The event.</param>
/// <returns>A <see cref="ProcessEvent"/> with the correctly scoped namespace.</returns>
protected ProcessEvent ScopedEvent(KernelProcessEvent processEvent)
{
Verify.NotNull(processEvent, nameof(processEvent));
return new ProcessEvent($"{this.Name}_{this.Id}", processEvent);
this._outgoingEventQueue.Enqueue(localEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.IO;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Process.Runtime;
using Microsoft.SemanticKernel.Process.Serialization;
using Xunit;

namespace SemanticKernel.Process.Dapr.Runtime.UnitTests;

/// <summary>
/// Unit tests for the <see cref="ProcessEvent"/> class.
/// </summary>
public class KernelProcessEventSerializationTests
{
/// <summary>
/// Validates that a <see cref="KernelProcessEvent"/> can be serialized and deserialized correctly
/// with out an explicit type definition for <see cref="KernelProcessEvent.Data"/>
/// </summary>
[Fact]
public void VerifySerializeEventSingleTest()
{
// Arrange, Act & Assert
VerifyContainerSerialization([new() { Id = "Test", Data = 3 }]);
VerifyContainerSerialization([new() { Id = "Test", Data = "test" }]);
VerifyContainerSerialization([new() { Id = "Test", Data = Guid.NewGuid() }]);
VerifyContainerSerialization([new() { Id = "Test", Data = new int[] { 1, 2, 3, 4 } }]);
VerifyContainerSerialization([new() { Id = "Test", Data = new ComplexData { Id = "test", Value = 3 } }]);
VerifyContainerSerialization([new() { Id = "testid", Data = KernelProcessError.FromException(new InvalidOperationException()) }]);
}

/// <summary>
/// Validates that a list <see cref="KernelProcessEvent"/> can be serialized and deserialized correctly
/// with out varying types assigned to for <see cref="KernelProcessEvent.Data"/>
/// </summary>
[Fact]
public void VerifySerializeEventMixedTest()
{
// Arrange, Act & Assert
VerifyContainerSerialization(
[
new() { Id = "Test", Data = 3 },
new() { Id = "Test", Data = "test" },
new() { Id = "Test", Data = Guid.NewGuid() },
new() { Id = "Test", Data = new int[] { 1, 2, 3, 4 } },
new() { Id = "Test", Data = new ComplexData { Id = "test", Value = 3 } },
new() { Id = "testid", Data = KernelProcessError.FromException(new InvalidOperationException()) },
]);
}

/// <summary>
/// Validates that a list <see cref="KernelProcessEvent"/> can be serialized and deserialized correctly
/// with out varying types assigned to for <see cref="KernelProcessEvent.Data"/>
/// </summary>
[Fact]
public void VerifyDataContractSerializationTest()
{
// Arrange
KernelProcessEvent[] processEvents =
[
new() { Id = "Test", Data = 3 },
new() { Id = "Test", Data = "test" },
new() { Id = "Test", Data = Guid.NewGuid() },
new() { Id = "Test", Data = new int[] { 1, 2, 3, 4 } },
new() { Id = "Test", Data = new ComplexData { Id = "test", Value = 3 } },
new() { Id = "testid", Data = KernelProcessError.FromException(new InvalidOperationException()) },
];
List<string> jsonEvents = [];
foreach (KernelProcessEvent processEvent in processEvents)
{
jsonEvents.Add(KernelProcessEventSerializer.ToJson(processEvent));
}

// Act
using MemoryStream stream = new();
jsonEvents.Serialize(stream);
stream.Position = 0;

List<string>? copy = stream.Deserialize<List<string>>();

// Assert
Assert.NotNull(copy);

// Act
IList<KernelProcessEvent> copiedEvents = KernelProcessEventSerializer.ToKernelProcessEvents(jsonEvents);

// Assert
Assert.Equivalent(processEvents, copiedEvents);
}

private static void VerifyContainerSerialization(KernelProcessEvent[] processEvents)
{
// Arrange
List<string> jsonEvents = [];
foreach (KernelProcessEvent processEvent in processEvents)
{
jsonEvents.Add(KernelProcessEventSerializer.ToJson(processEvent));
}

// Act
IList<KernelProcessEvent> copiedEvents = KernelProcessEventSerializer.ToKernelProcessEvents(jsonEvents);

// Assert
Assert.Equivalent(processEvents, copiedEvents);
}

internal sealed class ComplexData
{
public string Id { get; init; } = string.Empty;

public int Value { get; init; }
}
}
Loading
Loading