From 83a59d4bf798d38507ee086d46f735072d22ce3c Mon Sep 17 00:00:00 2001 From: Chris <66376200+crickman@users.noreply.github.com> Date: Mon, 11 Nov 2024 14:10:59 -0800 Subject: [PATCH] .Net Agents - Fix Function Call Handling for Streaming (#9652) ### Motivation and Context Fixes: https://github.com/microsoft/semantic-kernel/issues/9638 `System.ArgumentException: An item with the same key has already been added. ` - Duplicate key added when processing function result. ### Description The processing loop for assistant streaming is selecting completed steps which is resulting in over-processing that violates the state-tracking. This was due to leveraging the existing utiility method `GetRunStepsAsync`. I removed this method in favor of inline invocation since the processing for Streaming and Non-Streaming have distinct considerations. Also, as the SDK has evolved (paging removed), the utility method isn't adding much value. > Note: Was able to reproduce reported issue by setting `ParallelToolCallsEnabled = false` on `OpenAIAssistant_Streaming` demo and verify fix. Existing approach was able to handle parallel function calls and function calls on different steps adequetly. ### Contribution Checklist - [X] The code builds clean without any errors or warnings - [X] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [X] All unit tests pass, and I have added new tests where possible - [X] I didn't break anyone :smile: --- .../OpenAI/Internal/AssistantThreadActions.cs | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/dotnet/src/Agents/OpenAI/Internal/AssistantThreadActions.cs b/dotnet/src/Agents/OpenAI/Internal/AssistantThreadActions.cs index bce9c3fe827a..a3af1cfb6626 100644 --- a/dotnet/src/Agents/OpenAI/Internal/AssistantThreadActions.cs +++ b/dotnet/src/Agents/OpenAI/Internal/AssistantThreadActions.cs @@ -194,7 +194,7 @@ public static async IAsyncEnumerable GetMessagesAsync(Assist throw new KernelException($"Agent Failure - Run terminated: {run.Status} [{run.Id}]: {run.LastError?.Message ?? "Unknown"}"); } - IReadOnlyList steps = await GetRunStepsAsync(client, run, cancellationToken).ConfigureAwait(false); + RunStep[] steps = await client.GetRunStepsAsync(run.ThreadId, run.Id, cancellationToken: cancellationToken).ToArrayAsync(cancellationToken).ConfigureAwait(false); // Is tool action required? if (run.Status == RunStatus.RequiresAction) @@ -475,11 +475,14 @@ public static async IAsyncEnumerable InvokeStreamin if (run.Status == RunStatus.RequiresAction) { - IReadOnlyList steps = await GetRunStepsAsync(client, run, cancellationToken).ConfigureAwait(false); + RunStep[] activeSteps = + await client.GetRunStepsAsync(run.ThreadId, run.Id, cancellationToken: cancellationToken) + .Where(step => step.Status == RunStepStatus.InProgress) + .ToArrayAsync(cancellationToken).ConfigureAwait(false); // Capture map between the tool call and its associated step Dictionary toolMap = []; - foreach (RunStep step in steps) + foreach (RunStep step in activeSteps) { foreach (RunStepToolCall stepDetails in step.Details.ToolCalls) { @@ -488,7 +491,7 @@ public static async IAsyncEnumerable InvokeStreamin } // Execute functions in parallel and post results at once. - FunctionCallContent[] functionCalls = steps.SelectMany(step => ParseFunctionStep(agent, step)).ToArray(); + FunctionCallContent[] functionCalls = activeSteps.SelectMany(step => ParseFunctionStep(agent, step)).ToArray(); if (functionCalls.Length > 0) { // Emit function-call content @@ -504,7 +507,7 @@ public static async IAsyncEnumerable InvokeStreamin ToolOutput[] toolOutputs = GenerateToolOutputs(functionResults); asyncUpdates = client.SubmitToolOutputsToRunStreamingAsync(run.ThreadId, run.Id, toolOutputs, cancellationToken); - foreach (RunStep step in steps) + foreach (RunStep step in activeSteps) { stepFunctionResults.Add(step.Id, functionResults.Where(result => step.Id == toolMap[result.CallId!]).ToArray()); } @@ -560,18 +563,6 @@ await RetrieveMessageAsync( logger.LogOpenAIAssistantCompletedRun(nameof(InvokeAsync), run?.Id ?? "Failed", threadId); } - private static async Task> GetRunStepsAsync(AssistantClient client, ThreadRun run, CancellationToken cancellationToken) - { - List steps = []; - - await foreach (RunStep step in client.GetRunStepsAsync(run.ThreadId, run.Id, cancellationToken: cancellationToken).ConfigureAwait(false)) - { - steps.Add(step); - } - - return steps; - } - private static ChatMessageContent GenerateMessageContent(string? assistantName, ThreadMessage message, RunStep? completedStep = null) { AuthorRole role = new(message.Role.ToString()); @@ -788,7 +779,7 @@ private static ChatMessageContent GenerateFunctionCallContent(string agentName, return functionCallContent; } - private static ChatMessageContent GenerateFunctionResultContent(string agentName, FunctionResultContent[] functionResults, RunStep completedStep) + private static ChatMessageContent GenerateFunctionResultContent(string agentName, IEnumerable functionResults, RunStep completedStep) { ChatMessageContent functionResultContent = new(AuthorRole.Tool, content: null) {