Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new streaming ask endpoint (WIP) #400

Conversation

JonathanVelkeneers
Copy link

@JonathanVelkeneers JonathanVelkeneers commented Apr 12, 2024

Motivation and Context (Why the change? What's the scenario?)

Draft PR for issue #100 .
This draft pr was very briefly discussed in discord. The changes still need work.
currently missing:

  • more examples (serverless, service, curl,...)
  • Get the streaming working in WebClient, it currently still buffers the response (i was not able to find a fix)

High level description (Approach, Design)

Used existing AskAsync method as reference point for most of the code.
The new streaming endpoint does not return a MemoryAnswer, but returns the actual text result in an async enumerable.
If sources are needed the SearchAsync method can still be used to fetch them with minimal extra performance overhead.

@JonathanVelkeneers
Copy link
Author

@microsoft-github-policy-service agree

@JonathanVelkeneers
Copy link
Author

JonathanVelkeneers commented Apr 24, 2024

Also I want to note, the way I have this implemented currently is just returning the whole answer as a stream of strings.
This might not be optimal as the code that consumes these methods needs to call SearchAsync() if they want the sources, and this would result in an extra DB query.

Another implementation i was thinking of was returning a stream of PartialMemoryAnswer(MemoryAnswer with nullable properties), with the first object returning the metadata (such as facts/citations), and the parts after the first one only containing their pieces of text.
This would also mean that the object can be extended upon in the future.

example:

var noAnswerFound = new PartialMemoryAnswer
{
    Question = question,
    NoResult = true,
    Result = this._config.EmptyAnswer,
};

StringBuilder bufferedAnswer = new(); // Buffering result in memory to count chars and check if not empty answer
bool finishedRequiredBuffering = false;
var watch = Stopwatch.StartNew();
await foreach (var x in this.GenerateAnswerAsync(question, facts.ToString()).WithCancellation(cancellationToken).ConfigureAwait(false))
{
    if (x is null || x.Length == 0)
    {
        continue;
    }

    bufferedAnswer.Append(x);
    int currentLength = bufferedAnswer.Length;

    if (!finishedRequiredBuffering)
    {
        if (currentLength <= this._config.EmptyAnswer.Length && ValueIsEquivalentTo(bufferedAnswer.ToString(), this._config.EmptyAnswer))
        {
            this._log.LogTrace("Answer generated in {0} msecs. No relevant memories found", watch.ElapsedMilliseconds);
            noAnswerFound.NoResultReason = "No relevant memories found";
            yield return noAnswerFound;
            yield break;
        }
        else if (currentLength > this._config.EmptyAnswer.Length)
        {
            finishedRequiredBuffering = true;
            yield return new PartialMemoryAnswer
            {
                Question = question,
                NoResult = false,
                Result = bufferedAnswer.ToString(),
                RelevantSources = citationList,
            };
        }
    }

    if (finishedRequiredBuffering)
    {
        yield return new PartialMemoryAnswer
        {
            Result = x
        };
    }

    if (this._log.IsEnabled(LogLevel.Trace) && currentLength >= 30)
    {
        this._log.LogTrace("{0} chars generated", currentLength);
    }
}

@@ -257,4 +257,29 @@ public Task DeleteDocumentAsync(string documentId, string? index = null, Cancell
minRelevance: minRelevance,
cancellationToken: cancellationToken);
}

/// <inheritdoc />
public IAsyncEnumerable<string> AskStreamingAsync(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this new method returns only the answer, missing information about sources, relevance, etc. Why not stream a json object?

Copy link
Author

@JonathanVelkeneers JonathanVelkeneers May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this new method returns only the answer, missing information about sources, relevance, etc. Why not stream a json object?

I did a small write-up here.

In this PR I made it as a stream of strings, because using an object would require making some design choices.
Is the extra information (sources, relevance, etc) sent with every "answer part", or only the first/last one?

In an implementation I did in a personal project I sent all the metadata in the first response part, and only sent messageID + messageChunk with every remaining part.

If you prefer that approach or have another solution I can rewrite these changes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think everyone will want streaming, without giving up the other metadata, in particular the list of sources and their relevance. I would use the same approach used by OpenAI streaming, returning a content_update property within the response object, that includes the token - potentially sending the list of sources on the first response only using the same approach (something like sources_update) to avoid the overhead of sending the same sources with every token.

Something like, pseudo structure (I would reuse the existing class, just change it a bit to support "_update"):

response:

{
    sources_update {
          stream of items.   // sent all at once with the first token, then the list stays empty
   },
   content_update {
          next string token
   }
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made some changes to return MemoryAnswer instead of string.
I'm not sure what you exactly want with MemoryAnswer. New properties for streaming, or modifying the existing ones?

In my latest changes I've used the existing properties for the time being

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just my two cents, that sounds perfect @dluc . We're interested as well in streaming, it's one of the reasons why we have not implemented KM yet. We do indeed need the sources as well.
To send the sources only on the first response makes perfect sense.

Just wondering when this will be merged? Are there any blockers? I could possibly assist if need be.

@dluc
Copy link
Collaborator

dluc commented Jun 9, 2024

some conflicts to address

@dluc dluc added the waiting for author Waiting for author to reply or address comments label Jun 9, 2024
@chaelli
Copy link
Contributor

chaelli commented Jun 9, 2024

some conflicts to address

if it helps - I rebased and fixed some issues => https://github.com/chaelli/kernel-memory/tree/100-streaming-askAsync-response
not sure how to handle this. PR into @JonathanVelkeneers' branch? Or create a separat PR so that the validation tasks can run?
Anyway... probably should do some more testing before that.

@JonathanVelkeneers
Copy link
Author

some conflicts to address

Resolved these

@chaelli
Copy link
Contributor

chaelli commented Jun 12, 2024

any reason this is waiting? do we need to add SSE format for the endpoint? (if so - maybe this works #625 (reply in thread) - did for my quick test in the browser - but my experience with SSE are minimal ;))

@roldengarm
Copy link
Contributor

roldengarm commented Jun 13, 2024

@dluc just checking in, can this be merged soon please? We're about to build a UI using Kernel Memory and streaming is a must-have

@JonathanVelkeneers can you please check as well?
Thanks heaps, would be so great to have streaming capabilities, and I believe we're not the only one :)

@dluc
Copy link
Collaborator

dluc commented Jun 14, 2024

@dluc just checking in, can this be merged soon please? We're about to build a UI using Kernel Memory and streaming is a must-have

@JonathanVelkeneers can you please check as well? Thanks heaps, would be so great to have streaming capabilities, and I believe we're not the only one :)

sorry to keep this on hold but I cannot merge yet. I'd suggest to work with a fork not to be blocked. It's an important feature but there's some other critical work in progress.

@roldengarm
Copy link
Contributor

@dluc thanks for the reply, understood. Do you by any chance have a rough ETA when you will be ready to merge it?

We've already forked the repo so we can auto deploy it to our infra, so the service side is no big deal.
However, in our apps we're using the nuget package Microsoft.KernelMemory.WebClient which we'd then have to build ourselves.

@chaelli
Copy link
Contributor

chaelli commented Jun 18, 2024

@JonathanVelkeneers I played around a bit with SSE and now have a working solution that does "clean" SSE (via get). Basically, I use two endpoints - one that starts the answer generation and returns an id and one that then streams the answer:


public static void AddAskStreamEndpoint(
        this IEndpointRouteBuilder builder, string apiPrefix = "/", IEndpointFilter? authFilter = null)
    {
        RouteGroupBuilder group = builder.MapGroup(apiPrefix);

        // Ask streaming endpoint
        var route = group.MapPost(Constants.HttpAskStreamEndpoint, async Task<IResult> (
                HttpContext context,
                MemoryQuery query,
                IKernelMemory service,
                ILogger<KernelMemoryWebAPI> log,
                CancellationToken cancellationToken) =>
            {
                log.LogTrace("New search request, index '{0}', minRelevance {1}", query.Index, query.MinRelevance);

                var askId = Guid.NewGuid().ToString("N").Substring(0, 8);
                s_askStreams.Add(askId, service.AskStreamingAsync(
                    question: query.Question,
                    index: query.Index,
                    filters: query.Filters,
                    minRelevance: query.MinRelevance,
                    cancellationToken: cancellationToken));

                return Results.Ok(new AskStreamResponse { AskId = askId });
            })
            .Produces<AskStreamResponse>(StatusCodes.Status200OK)
            .Produces<ProblemDetails>(StatusCodes.Status401Unauthorized)
            .Produces<ProblemDetails>(StatusCodes.Status403Forbidden);

        if (authFilter != null) { route.AddEndpointFilter(authFilter); }

        // SSE endpoint
        var sseRoute = group.MapGet($"{Constants.HttpAskStreamEndpoint}/{{askId}}", async Task (
            HttpContext context,
            ILogger<KernelMemoryWebAPI> log,
            CancellationToken cancellationToken,
            string askId) =>
            {
                log.LogTrace("Accessing SSE stream for askId '{0}'", askId);
                context.Response.Headers.Add("Content-Type", "text/event-stream");
                if (s_askStreams.TryGetValue(askId, out IAsyncEnumerable<MemoryAnswer>? stream))
                {
                    s_askStreams.Remove(askId);
                    var jsonOptions = new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingDefault };
                    var response = context.Response;
                    if (stream != null)
                    {
                        await foreach (var ma in stream)
                        {
                            await response.WriteAsync($"data: {JsonSerializer.Serialize(new { message = ma }, jsonOptions)}\n\n", cancellationToken).ConfigureAwait(false);
                            await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
                        }

                        // Send an end-of-stream event if needed
                        await response.WriteAsync("event: end\ndata: End of stream\n\n", cancellationToken).ConfigureAwait(false);
                        await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
                        return;
                    }
                }
                context.Response.StatusCode = StatusCodes.Status404NotFound;
                await context.Response.WriteAsync("Not found", cancellationToken).ConfigureAwait(false);
            });

When requesting the answer from the browser, this is the basic functionality:

  "headers": {
    "accept": "*/*",
    "authorization": "...",
    "cache-control": "no-cache",
    "Content-Type": "application/json"
  },
  "body": "{\"question\":\"was macht viu?\",\"index\":\"viuch-largeembedding\"}",
  "method": "POST",
  "mode": "cors",
  "credentials": "include"
});
const responseData = await response.json();
const askId = responseData.askId;

const sseUrl = `http://localhost:9001/ask/stream/${askId}`;
const eventSource = new EventSource(sseUrl);

let responseText = '';
eventSource.onmessage = function(event) {
    const responeObject = JSON.parse(event.data);
    responseText += responeObject.message.text;
    console.log(responseText);
};

eventSource.addEventListener('end', function(event) {
    console.log('End of stream');
    eventSource.close(); // Close the SSE connection
});

maybe you can add this to your branch?

@chaelli
Copy link
Contributor

chaelli commented Jun 18, 2024

@dluc does it make sense for us to rebase this? or will it take much longer to merge?
And @JonathanVelkeneers if you do rebase (or merge & fix) - I'd suggest you extract the fact generation & token calculation into a separate method - otherwise we'll always need to fix this stuff twice if it changes.
I can certainly help - but I'd rather only invest the time once we know theres a good chance it will be merged

@dluc
Copy link
Collaborator

dluc commented Jun 18, 2024

@dluc does it make sense for us to rebase this? or will it take much longer to merge? And @JonathanVelkeneers if you do rebase (or merge & fix) - I'd suggest you extract the fact generation & token calculation into a separate method - otherwise we'll always need to fix this stuff twice if it changes. I can certainly help - but I'd rather only invest the time once we know theres a good chance it will be merged

no worries I'll take care of the rebase ;-)

@stavroskasidis
Copy link

Is this feature still gonna be implemented?

@dluc
Copy link
Collaborator

dluc commented Oct 10, 2024

Is this feature still gonna be implemented?

Apologies for the delay, I had to put this topic on hold, and we currently have two PRs for the same feature. I’ll need to review and decide the best approach. Unfortunately, no ETA at the moment

@dluc
Copy link
Collaborator

dluc commented Oct 16, 2024

Update: for this feature to be merged, there's a couple of things to do:

  • Check this similar PR Implement Response Streaming #726 and decide which approach to take
  • Support content moderation. The stream of tokens needs to be validated while streamed, on a configurable frequence. If at any point the text moderation fails, the stream needs to be reset, e.g. sending a special token or similar.

@nurkmez2
Copy link

@JonathanVelkeneers
It would be great if you could complete this feature.
Thanks

@dluc
Copy link
Collaborator

dluc commented Dec 1, 2024

@JonathanVelkeneers thanks for helping! - There were two PRs addressing the same feature, and I had to decide between them. The Response Streaming implementation is now merged into main. For more details, please refer to PR #726.

@dluc dluc closed this Dec 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
waiting for author Waiting for author to reply or address comments
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants