diff --git a/OpenAI.Playground/TestHelpers/AssistantHelpers/RunTestHelper.cs b/OpenAI.Playground/TestHelpers/AssistantHelpers/RunTestHelper.cs
index 1e6ec2ed..d0f7b0fd 100644
--- a/OpenAI.Playground/TestHelpers/AssistantHelpers/RunTestHelper.cs
+++ b/OpenAI.Playground/TestHelpers/AssistantHelpers/RunTestHelper.cs
@@ -165,19 +165,58 @@ public static async Task CreateRunAsStreamTest(IOpenAIService openAI)
         var result = openAI.Beta.Runs.RunCreateAsStream(CreatedThreadId, new()
         {
             AssistantId = assistantResult.Id
-        });
+        }, justDataMode: false);

         await foreach (var run in result)
         {
             if (run.Successful)
             {
-                if (string.IsNullOrEmpty(run.Status))
+                Console.WriteLine($"Event:{run.StreamEvent}");
+                if (run is RunResponse runResponse)
                 {
-                    Console.Write(".");
+                    if (string.IsNullOrEmpty(runResponse.Status))
+                    {
+                        Console.Write(".");
+                    }
+                    else
+                    {
+                        ConsoleExtensions.WriteLine($"Run Id: {runResponse.Id}, Status: {runResponse.Status}");
+                    }
+                }
+
+                else if (run is RunStepResponse runStepResponse)
+                {
+                    if (string.IsNullOrEmpty(runStepResponse.Status))
+                    {
+                        Console.Write(".");
+                    }
+                    else
+                    {
+                        ConsoleExtensions.WriteLine($"Run Step Id: {runStepResponse.Id}, Status: {runStepResponse.Status}");
+                    }
+                }
+
+                else if (run is MessageResponse messageResponse)
+                {
+                    if (string.IsNullOrEmpty(messageResponse.Id))
+                    {
+                        Console.Write(".");
+                    }
+                    else
+                    {
+                        ConsoleExtensions.WriteLine($"Message Id: {messageResponse.Id}, Message: {messageResponse.Content?.FirstOrDefault()?.Text?.Value}");
+                    }
                 }
                 else
                 {
-                    ConsoleExtensions.WriteLine($"Run Id: {run.Id}, Status: {run.Status}");
+                    if (run.StreamEvent != null)
+                    {
+                        Console.WriteLine(run.StreamEvent);
+                    }
+                    else
+                    {
+                        Console.Write(".");
+                    }
                 }
             }
             else
@@ -450,13 +489,52 @@ public static async Task SubmitToolOutputsAsStreamToRunTest(IOpenAIService openA
         {
             if (run.Successful)
             {
-                if (string.IsNullOrEmpty(run.Status))
+                Console.WriteLine($"Event:{run.StreamEvent}");
+                if (run is RunResponse runResponse)
                 {
-                    Console.Write(".");
+                    if (string.IsNullOrEmpty(runResponse.Status))
+                    {
+                        Console.Write(".");
+                    }
+                    else
+                    {
+                        ConsoleExtensions.WriteLine($"Run Id: {runResponse.Id}, Status: {runResponse.Status}");
+                    }
+                }
+
+                else if (run is RunStepResponse runStepResponse)
+                {
+                    if (string.IsNullOrEmpty(runStepResponse.Status))
+                    {
+                        Console.Write(".");
+                    }
+                    else
+                    {
+                        ConsoleExtensions.WriteLine($"Run Step Id: {runStepResponse.Id}, Status: {runStepResponse.Status}");
+                    }
+                }
+
+                else if (run is MessageResponse messageResponse)
+                {
+                    if (string.IsNullOrEmpty(messageResponse.Id))
+                    {
+                        Console.Write(".");
+                    }
+                    else
+                    {
+                        ConsoleExtensions.WriteLine($"Message Id: {messageResponse.Id}, Message: {messageResponse.Content?.FirstOrDefault()?.Text?.Value}");
+                    }
                 }
                 else
                 {
-                    ConsoleExtensions.WriteLine($"Run Id: {run.Id}, Status: {run.Status}");
+                    if (run.StreamEvent != null)
+                    {
+                        Console.WriteLine(run.StreamEvent);
+                    }
+                    else
+                    {
+                        Console.Write(".");
+                    }
                 }
             }
             else
@@ -642,13 +720,52 @@ public static async Task CreateThreadAndRunAsStream(IOpenAIService sdk)
         {
             if (run.Successful)
             {
-                if (string.IsNullOrEmpty(run.Status))
+                Console.WriteLine($"Event:{run.StreamEvent}");
+                if (run is RunResponse runResponse)
                 {
-                    Console.Write(".");
+                    if (string.IsNullOrEmpty(runResponse.Status))
+                    {
+                        Console.Write(".");
+                    }
+                    else
+                    {
+                        ConsoleExtensions.WriteLine($"Run Id: {runResponse.Id}, Status: {runResponse.Status}");
+                    }
+                }
+
+                else if (run is RunStepResponse runStepResponse)
+                {
+                    if (string.IsNullOrEmpty(runStepResponse.Status))
+                    {
+                        Console.Write(".");
+                    }
+                    else
+                    {
+                        ConsoleExtensions.WriteLine($"Run Step Id: {runStepResponse.Id}, Status: {runStepResponse.Status}");
+                    }
+                }
+
+                else if (run is MessageResponse messageResponse)
+                {
+                    if (string.IsNullOrEmpty(messageResponse.Id))
+                    {
+                        Console.Write(".");
+                    }
+                    else
+                    {
+                        ConsoleExtensions.WriteLine($"Message Id: {messageResponse.Id}, Message: {messageResponse.Content?.FirstOrDefault()?.Text?.Value}");
+                    }
                 }
                 else
                 {
-                    ConsoleExtensions.WriteLine($"Run Id: {run.Id}, Status: {run.Status}");
+                    if (run.StreamEvent != null)
+                    {
+                        Console.WriteLine(run.StreamEvent);
+                    }
+                    else
+                    {
+                        Console.Write(".");
+                    }
                 }
             }
             else
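For reference, a trimmed-down consumer sketch (not part of the patch) that assumes the same playground fields shown above (`openAI`, `CreatedThreadId`, `assistantResult`): it ignores run and run-step progress and only accumulates streamed message-delta text.

```csharp
// Sketch only: assumes the surrounding CreateRunAsStreamTest setup from the playground.
var text = new StringBuilder();
await foreach (var block in openAI.Beta.Runs.RunCreateAsStream(CreatedThreadId, new()
               {
                   AssistantId = assistantResult.Id
               }, justDataMode: false))
{
    // "thread.message.delta" frames are routed to MessageResponse (see the new
    // JsonToObjectRouterExtension below); append only their text fragments.
    if (block is MessageResponse message && message.ObjectTypeName == "thread.message.delta")
    {
        text.Append(message.Content?.FirstOrDefault()?.Text?.Value);
    }
}

ConsoleExtensions.WriteLine(text.ToString());
```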
diff --git a/OpenAI.SDK/Extensions/JsonToObjectRouterExtension.cs b/OpenAI.SDK/Extensions/JsonToObjectRouterExtension.cs
new file mode 100644
index 00000000..fda64928
--- /dev/null
+++ b/OpenAI.SDK/Extensions/JsonToObjectRouterExtension.cs
@@ -0,0 +1,22 @@
+using System.Text.Json;
+using OpenAI.ObjectModels.ResponseModels;
+using OpenAI.ObjectModels.SharedModels;
+
+namespace OpenAI.Extensions;
+
+public static class JsonToObjectRouterExtension
+{
+    public static Type Route(string json)
+    {
+        var apiResponse = JsonSerializer.Deserialize<ObjectBaseResponse>(json);
+
+        return apiResponse?.ObjectTypeName switch
+        {
+            "thread.run.step" => typeof(RunStepResponse),
+            "thread.run" => typeof(RunResponse),
+            "thread.message" => typeof(MessageResponse),
+            "thread.message.delta" => typeof(MessageResponse),
+            _ => typeof(BaseResponse)
+        };
+    }
+}
\ No newline at end of file
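A quick illustration (not part of the patch) of what the router does: the `object` discriminator in each streamed `data:` payload picks the concrete response type, and anything unrecognized falls back to `BaseResponse`.

```csharp
using OpenAI.Extensions;
using OpenAI.ObjectModels.ResponseModels;
using OpenAI.ObjectModels.SharedModels;

// Illustrative payload fragment; only the "object" field matters for routing.
var type = JsonToObjectRouterExtension.Route("{\"object\":\"thread.message.delta\"}");
Console.WriteLine(type == typeof(MessageResponse)); // True

// Unknown or missing "object" values (e.g. bare event frames) map to BaseResponse.
Console.WriteLine(JsonToObjectRouterExtension.Route("{}") == typeof(BaseResponse)); // True
```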
diff --git a/OpenAI.SDK/Extensions/StreamHandleExtension.cs b/OpenAI.SDK/Extensions/StreamHandleExtension.cs
index 60c68a5c..3344bda4 100644
--- a/OpenAI.SDK/Extensions/StreamHandleExtension.cs
+++ b/OpenAI.SDK/Extensions/StreamHandleExtension.cs
@@ -1,4 +1,5 @@
-using System.Runtime.CompilerServices;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
 using System.Text.Json;
 using OpenAI.ObjectModels;
 using OpenAI.ObjectModels.RequestModels;
@@ -8,6 +9,10 @@ namespace OpenAI.Extensions;

 public static class StreamHandleExtension
 {
+    public static async IAsyncEnumerable<BaseResponse> AsStream(this HttpResponseMessage response, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+    {
+        await foreach (var baseResponse in AsStream<BaseResponse>(response, justDataMode, cancellationToken)) yield return baseResponse;
+    }
     public static async IAsyncEnumerable<TResponse> AsStream<TResponse>(this HttpResponseMessage response, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default) where TResponse : BaseResponse, new()
     {

@@ -20,13 +25,15 @@ public static async IAsyncEnumerable<TResponse> AsStream<TResponse>(this HttpRes
         await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
         using var reader = new StreamReader(stream);
-
+        string? tempStreamEvent = null;
+        bool isEventDelta;
         // Continuously read the stream until the end of it
         while (true)
         {
             cancellationToken.ThrowIfCancellationRequested();

             var line = await reader.ReadLineAsync();
+            // Console.WriteLine("---" + line);

             // Break the loop if we have reached the end of the stream
             if (line == null)
             {
@@ -39,11 +46,28 @@ public static async IAsyncEnumerable<TResponse> AsStream<TResponse>(this HttpRes
                 continue;
             }

+            if (line.StartsWith("event: "))
+            {
+                line = line.RemoveIfStartWith("event: ");
+                tempStreamEvent = line;
+                isEventDelta = true;
+            }
+            else
+            {
+                isEventDelta = false;
+            }
+
             if (justDataMode && !line.StartsWith("data: "))
             {
                 continue;
             }

+            if (!justDataMode && isEventDelta)
+            {
+                yield return new() { ObjectTypeName = "base.stream.event", StreamEvent = tempStreamEvent };
+                continue;
+            }
+
             line = line.RemoveIfStartWith("data: ");

             // Exit the loop if the stream is done
@@ -56,7 +80,14 @@ public static async IAsyncEnumerable<TResponse> AsStream<TResponse>(this HttpRes
             try
             {
                 // When the response is good, each line is a serializable CompletionCreateRequest
-                block = JsonSerializer.Deserialize<TResponse>(line);
+                if (typeof(TResponse) == typeof(BaseResponse))
+                {
+                    block = JsonSerializer.Deserialize(line, JsonToObjectRouterExtension.Route(line), new JsonSerializerOptions()) as TResponse;
+                }
+                else
+                {
+                    block = JsonSerializer.Deserialize<TResponse>(line);
+                }
             }
             catch (Exception)
             {
@@ -78,6 +109,8 @@ public static async IAsyncEnumerable<TResponse> AsStream<TResponse>(this HttpRes
             {
                 block.HttpStatusCode = httpStatusCode;
                 block.HeaderValues = headerValues;
+                block.StreamEvent = tempStreamEvent;
+                tempStreamEvent = null;
                 yield return block;
             }
         }
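To make the framing concrete, here is a small sketch (not part of the patch) of how the parser above behaves on a hand-written SSE body; the event name and JSON fields are illustrative, not a captured API response.

```csharp
using System.Net.Http;
using OpenAI.Extensions;
using OpenAI.ObjectModels.ResponseModels;

// Two frames: a bare "event:" line followed by its "data:" payload.
var sse = "event: thread.run.created\n" +
          "data: {\"id\":\"run_123\",\"object\":\"thread.run\",\"status\":\"queued\"}\n";

using var response = new HttpResponseMessage { Content = new StringContent(sse) };

// With justDataMode: false the event line is yielded immediately as a BaseResponse
// tagged "base.stream.event"; the data line is then routed (here to RunResponse)
// and stamped with the same StreamEvent before being yielded.
await foreach (BaseResponse block in response.AsStream(justDataMode: false))
{
    Console.WriteLine($"{block.GetType().Name} / {block.ObjectTypeName} / {block.StreamEvent}");
}
```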
diff --git a/OpenAI.SDK/Interfaces/IRunService.cs b/OpenAI.SDK/Interfaces/IRunService.cs
index 85c5c5dd..dceec99e 100644
--- a/OpenAI.SDK/Interfaces/IRunService.cs
+++ b/OpenAI.SDK/Interfaces/IRunService.cs
@@ -1,5 +1,6 @@
 using System.Runtime.CompilerServices;
 using OpenAI.ObjectModels.RequestModels;
+using OpenAI.ObjectModels.ResponseModels;
 using OpenAI.ObjectModels.SharedModels;

 namespace OpenAI.Interfaces;
@@ -24,8 +25,8 @@ public interface IRunService
     ///
     ///
     ///
-    ///
-    IAsyncEnumerable<RunResponse> RunCreateAsStream(string threadId, RunCreateRequest request, string? modelId = null, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default);
+    /// <returns>also returns <see cref="RunResponse" />,<see cref="RunStepResponse" />,<see cref="MessageResponse" /></returns>
+    IAsyncEnumerable<BaseResponse> RunCreateAsStream(string threadId, RunCreateRequest request, string? modelId = null, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default);
     /// <summary>
     ///     Retrieves a run.
     /// </summary>
@@ -71,9 +72,10 @@ public interface IRunService
     ///
     ///
     ///
+    /// <param name="justDataMode"></param>
     ///
-    ///
-    IAsyncEnumerable<RunResponse> RunSubmitToolOutputsAsStream(string threadId, string runId, SubmitToolOutputsToRunRequest request, [EnumeratorCancellation] CancellationToken cancellationToken = default);
+    /// <returns>also returns <see cref="RunResponse" />,<see cref="RunStepResponse" />,<see cref="MessageResponse" /></returns>
+    IAsyncEnumerable<BaseResponse> RunSubmitToolOutputsAsStream(string threadId, string runId, SubmitToolOutputsToRunRequest request, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default);
     /// <summary>
     ///     Modifies a run.
     /// </summary>
@@ -93,7 +95,8 @@ public interface IRunService
     /// <summary>
     ///     Create a thread and run it in one request as Stream.
     /// </summary>
-    IAsyncEnumerable<RunResponse> CreateThreadAndRunAsStream(CreateThreadAndRunRequest createThreadAndRunRequest, string? modelId = null, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default);
+    /// <returns>also returns <see cref="RunResponse" />,<see cref="RunStepResponse" />,<see cref="MessageResponse" /></returns>
+    IAsyncEnumerable<BaseResponse> CreateThreadAndRunAsStream(CreateThreadAndRunRequest createThreadAndRunRequest, string? modelId = null, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default);

     /// <summary>
     ///     Returns a list of runs belonging to a thread.
diff --git a/OpenAI.SDK/Managers/OpenAIRunService.cs b/OpenAI.SDK/Managers/OpenAIRunService.cs
index 5a7812cd..274b5525 100644
--- a/OpenAI.SDK/Managers/OpenAIRunService.cs
+++ b/OpenAI.SDK/Managers/OpenAIRunService.cs
@@ -2,6 +2,7 @@
 using OpenAI.Extensions;
 using OpenAI.Interfaces;
 using OpenAI.ObjectModels.RequestModels;
+using OpenAI.ObjectModels.ResponseModels;
 using OpenAI.ObjectModels.SharedModels;

 namespace OpenAI.Managers;
@@ -36,7 +37,7 @@ public async Task<RunResponse> RunCreate(string threadId, RunCreateRequest reque
     ///
     ///
     ///
-    public async IAsyncEnumerable<RunResponse> RunCreateAsStream(string threadId, RunCreateRequest request, string? modelId = null, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+    public async IAsyncEnumerable<BaseResponse> RunCreateAsStream(string threadId, RunCreateRequest request, string? modelId = null, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default)
     {
         // Mark the request as streaming
         request.Stream = true;
@@ -52,7 +53,10 @@ public async IAsyncEnumerable<RunResponse> RunCreateAsStream(string threadId, Ru
             yield break;
         }

-        await foreach (var baseResponse in response.AsStream(cancellationToken: cancellationToken)) yield return baseResponse;
+        await foreach (var baseResponse in response.AsStream(justDataMode: justDataMode, cancellationToken: cancellationToken))
+        {
+            yield return baseResponse;
+        }
     }

@@ -142,7 +146,7 @@ public async Task<RunResponse> RunSubmitToolOutputs(string threadId, string runI
         return await _httpClient.PostAndReadAsAsync<RunResponse>(_endpointProvider.RunSubmitToolOutputs(threadId, runId), request, cancellationToken);
     }

-    public async IAsyncEnumerable<RunResponse> RunSubmitToolOutputsAsStream(string threadId, string runId, SubmitToolOutputsToRunRequest request, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+    public async IAsyncEnumerable<BaseResponse> RunSubmitToolOutputsAsStream(string threadId, string runId, SubmitToolOutputsToRunRequest request, bool justDataMode = true, [EnumeratorCancellation] CancellationToken cancellationToken = default)
     {
         // Mark the request as streaming
         request.Stream = true;
@@ -152,11 +156,11 @@ public async IAsyncEnumerable<RunResponse> RunSubmitToolOutputsAsStream(string t

         if (!response.IsSuccessStatusCode)
         {
-            yield return await response.HandleResponseContent<RunResponse>(cancellationToken);
+            yield return await response.HandleResponseContent<BaseResponse>(cancellationToken);
             yield break;
         }

-        await foreach (var baseResponse in response.AsStream(cancellationToken: cancellationToken)) yield return baseResponse;
+        await foreach (var baseResponse in response.AsStream(justDataMode: justDataMode, cancellationToken: cancellationToken)) yield return baseResponse;
     }

     ///
@@ -165,7 +169,7 @@ public async Task<RunResponse> CreateThreadAndRun(CreateThreadAndRunRequest requ
         return await _httpClient.PostAndReadAsAsync<RunResponse>(_endpointProvider.ThreadAndRunCreate(), requestBody, cancellationToken);
     }

-    public async IAsyncEnumerable<RunResponse> CreateThreadAndRunAsStream(CreateThreadAndRunRequest createThreadAndRunRequest, string? modelId = null, bool justDataMode = true,
+    public async IAsyncEnumerable<BaseResponse> CreateThreadAndRunAsStream(CreateThreadAndRunRequest createThreadAndRunRequest, string? modelId = null, bool justDataMode = true,
         [EnumeratorCancellation] CancellationToken cancellationToken = default)
     {
         // Mark the request as streaming
@@ -178,11 +182,11 @@ public async IAsyncEnumerable<RunResponse> CreateThreadAndRunAsStream(CreateThre

         if (!response.IsSuccessStatusCode)
         {
-            yield return await response.HandleResponseContent<RunResponse>(cancellationToken);
+            yield return await response.HandleResponseContent<BaseResponse>(cancellationToken);
             yield break;
         }

-        await foreach (var baseResponse in response.AsStream(cancellationToken: cancellationToken)) yield return baseResponse;
+        await foreach (var baseResponse in response.AsStream(justDataMode: justDataMode, cancellationToken: cancellationToken)) yield return baseResponse;
     }

     ///
diff --git a/OpenAI.SDK/ObjectModels/ResponseModels/BaseResponse.cs b/OpenAI.SDK/ObjectModels/ResponseModels/BaseResponse.cs
index 9e5e0f07..3273e0c1 100644
--- a/OpenAI.SDK/ObjectModels/ResponseModels/BaseResponse.cs
+++ b/OpenAI.SDK/ObjectModels/ResponseModels/BaseResponse.cs
@@ -5,11 +5,15 @@ namespace OpenAI.ObjectModels.ResponseModels;

-public record BaseResponse
+public record ObjectBaseResponse
 {
     [JsonPropertyName("object")]
     public string? ObjectTypeName { get; set; }
-
+}
+public record BaseResponse : ObjectBaseResponse
+{
+    [JsonPropertyName("StreamEvent")]
+    public string? StreamEvent { get; set; }
     public bool Successful => Error == null;

     [JsonPropertyName("error")]
diff --git a/OpenAI.SDK/ObjectModels/SharedModels/MessageResponse.cs b/OpenAI.SDK/ObjectModels/SharedModels/MessageResponse.cs
index 1135538f..09bd682f 100644
--- a/OpenAI.SDK/ObjectModels/SharedModels/MessageResponse.cs
+++ b/OpenAI.SDK/ObjectModels/SharedModels/MessageResponse.cs
@@ -9,6 +9,11 @@ namespace OpenAI.ObjectModels.SharedModels;
 /// </summary>
 public record MessageResponse : BaseResponse, IOpenAiModels.IId, IOpenAiModels.ICreatedAt, IOpenAiModels.IMetaData, IOpenAiModels.IAssistantId
 {
+    [JsonPropertyName("delta")]
+    public MessageResponse Delta
+    {
+        set => Content = value.Content;
+    }
     /// <summary>
     ///     The thread ID that this message belongs to.
     /// </summary>
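Finally, a short sketch (not part of the patch) of why the write-only `Delta` property is enough for streaming consumers: `thread.message.delta` payloads nest their content under `delta`, and the setter lifts it into `Content`, so the same `Content?.FirstOrDefault()?.Text?.Value` access used in the playground works for both full messages and deltas. The JSON below is illustrative, not a captured API payload.

```csharp
using System.Linq;
using System.Text.Json;
using OpenAI.ObjectModels.SharedModels;

// Illustrative delta frame; the "object" value matches the routing added above.
var json = "{\"id\":\"msg_123\",\"object\":\"thread.message.delta\"," +
           "\"delta\":{\"content\":[{\"type\":\"text\",\"text\":{\"value\":\"Hello\"}}]}}";

var message = JsonSerializer.Deserialize<MessageResponse>(json);

// The nested delta content is now exposed through the top-level Content property.
Console.WriteLine(message?.Content?.FirstOrDefault()?.Text?.Value); // Hello
```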