
[RFC/experimental] StreamData API for streaming additional data to the client #425

Merged
merged 39 commits into main from max/new-stream-2
Aug 17, 2023

Conversation

MaxLeiter
Member

@MaxLeiter MaxLeiter commented Aug 7, 2023

This is an experimental API and is subject to change. Please leave (non-code-related) feedback! The code is WIP and is mostly for experimentation.

One of the most commonly requested features has been a way to stream additional data to the client. I think we have a pretty good solution now.

There's a lot of duplicate code here because we're going to support both the old protocol and the new one while experimenting. As a result, this functionality is only implemented in ai/react's useChat.

On the server (StreamData)

The StreamData class is a new TransformStream wrapper available on the server from the root ai package.

You instantiate it like any other class:

  const data = new experimental_StreamData()

and use it in your API:

+  const data = new experimental_StreamData()
  const stream = OpenAIStream(response, {
    experimental_onFunctionCall: async (
      { name, arguments: args },
      createFunctionCallMessages
    ) => {
      if (name === 'get_current_weather') {
        // Call a weather API here
        const weatherData = {
          temperature: 20,
          unit: args.format === 'celsius' ? 'C' : 'F'
        }

+        data.append({
+          text: 'Some custom data'
+        })

        return;
      }
    }, 
+   onFinish() {
+     // IMPORTANT! The user must manually close the stream.
+     data.close();
+   },
+   // IMPORTANT! This is only required while we support both protocols.
+   experimental_StreamData: true
  })

+ data.append({ done: true })

+ // IMPORTANT! Behind the scenes, StreamingTextResponse adds a header if `data` is present.
+ // If you aren't using StreamingTextResponse, you need to add the header yourself.
+ return new StreamingTextResponse(stream, {}, data)

On the client

The useChat and useCompletion hooks return a new object, data:

  const { messages, input, handleInputChange, handleSubmit, data } = useChat({

data is the array of JSON values you appended with StreamData. Correlating data to specific messages is not handled by the SDK and must be managed by the user. In the future this will likely change, and we'll maintain both the global data and a per-message message.data.
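
For illustration, here's a minimal client sketch (assuming the hook shape above; zipping data entries to specific messages is left to you, per the note above):

```tsx
'use client';

import { useChat } from 'ai/react';

// Minimal sketch: render the global `data` array alongside the messages.
// Each entry is one JSON value appended on the server via data.append().
export default function Chat() {
  const { messages, input, handleInputChange, handleSubmit, data } = useChat();

  return (
    <div>
      {/* Everything appended with StreamData, in arrival order */}
      <pre>{JSON.stringify(data, null, 2)}</pre>
      {messages.map(m => (
        <div key={m.id}>
          {m.role}: {m.content}
        </div>
      ))}
      <form onSubmit={handleSubmit}>
        <input value={input} onChange={handleInputChange} />
      </form>
    </div>
  );
}
```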

How it works

This requires moving the API from a simple raw text response to a prefixed protocol inspired by React Server Components. Implementation-wise, this can later be migrated to Server-Sent Events.

The current prefix map looks like this, and is exported so it can be used in userspace:

/**
 * The map of prefixes for data in the stream
 *
 * - 0: Text from the LLM response
 * - 1: (OpenAI) function_call responses
 * - 2: custom JSON added by the user using `Data`
 *
 * Example:
 * ```
 * 0:Vercel
 * 0:'s
 * 0: AI
 * 0: SDK
 * 0: is great
 * 0:!
 * 2: { "someJson": "value" }
 * 1: {"function_call": {"name": "get_current_weather", "arguments": "{\\n\\"location\\": \\"Charlottesville, Virginia\\",\\n\\"format\\": \\"celsius\\"\\n}"}}
 *```
 */
export const StreamStringPrefixes = {
  text: 0,
  function_call: 1,
  data: 2
  // user_err: 3?
} as const
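
For non-SDK clients, parsing can be done in userspace. A minimal sketch, assuming each stream line arrives as `<prefix>:<JSON payload>` (the unquoted text payloads in the example above are illustrative; text chunks are JSON-encoded strings on the wire):

```ts
// Sketch of a userspace parser for the prefixed protocol (not the SDK's
// internal parser; the names here are illustrative).
type StreamPart =
  | { type: 'text'; value: string }
  | { type: 'function_call'; value: unknown }
  | { type: 'data'; value: unknown };

function parseStreamLine(line: string): StreamPart | null {
  const colon = line.indexOf(':');
  if (colon === -1) return null;

  const prefix = line.slice(0, colon);
  const payload = JSON.parse(line.slice(colon + 1));

  switch (prefix) {
    case '0':
      return { type: 'text', value: payload };
    case '1':
      return { type: 'function_call', value: payload };
    case '2':
      return { type: 'data', value: payload };
    default:
      return null; // unknown prefix: ignore
  }
}
```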

Caveats

  • Implementation complexity increases. For now, this is only implemented in useChat until it's less experimental/more widely agreed upon.
  • Client-side parsing is no longer as simple as fetching the request. Non-SDK clients will need to manually parse the prefixes and construct messages, but we can provide helpers to make this simpler (see the parsing sketch above).
  • More bandwidth is used, although the number of tokens remains the same, so this isn't a big deal.

What's next?

  • This is basically SSE, but only consumed by us. In the future we can adjust the transport format.
  • Data tied to each message, when both arrive in the same response.

Squashed commit of the following:

commit cfef747ce032669984b69ce60b7e8893171e012a
Author: Max Leiter <max.leiter@vercel.com>
Date:   Mon Aug 7 10:57:32 2023 -0700

    use-chat work

commit 61b19f92739279dba0b33fec689ee8a88ac9b148
Merge: c234597 36571f9
Author: Max Leiter <max.leiter@vercel.com>
Date:   Mon Aug 7 10:20:49 2023 -0700

    Merge remote-tracking branch 'origin/main' into max/new-stream

commit c234597cb120182843a9f1b5f77649fbacf5a133
Merge: 8928799 6a2f9bf
Author: Max Leiter <max.leiter@vercel.com>
Date:   Mon Aug 7 10:20:40 2023 -0700

    Merge remote-tracking branch 'origin/main' into max/new-stream

commit 36571f9
Author: Joshua Lochner <admin@xenova.com>
Date:   Mon Aug 7 18:43:40 2023 +0200

    Update Hugging Face example (#423)

    Minor updates to the HF example application.
    - Remove `openai-edge` dependency
    - Do not require HF tokens with write access

commit 3c81ba5
Author: Ahmed Abdelbaset <A7med3bdulBaset@gmail.com>
Date:   Sun Aug 6 23:42:51 2023 +0300

    fix: a small typo in docs (#420)

    In the Next.js app router guide there is this example:
    ```js
      const response = await openai.createChatCompletion({
        model: 'gpt-4',
        stream: true,
        messages: [
          {
            role: 'system',
            content:
              searchParams['prompt'] ?? 'Give me code for generating a JSX button'
          }
        ]
      })
    ```

    But the role should be `user`

commit 6a2f9bf
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Date:   Fri Aug 4 21:33:13 2023 +0000

    Version Packages (#418)

    This PR was opened by the [Changesets release](https://github.com/changesets/action) GitHub action. When you're ready to do a release, you can merge this and the packages will be published to npm automatically. If you're not ready to do a release yet, that's fine, whenever you add more changesets to main, this PR will be updated.

    # Releases
    ## ai@2.1.32

    ### Patch Changes

    -   5f91427: ai/svelte: fix isLoading return value

commit 89287997e4f8f59c9edd716ae9e068b550a9be64
Author: Max Leiter <max.leiter@vercel.com>
Date:   Wed Aug 2 11:22:18 2023 -0700

    example to undo

commit 78b4bba0e27c9bcad02f025988d16d442c241042
Author: Max Leiter <max.leiter@vercel.com>
Date:   Wed Aug 2 11:22:03 2023 -0700

    progress

commit dde376c9bfae96a37eaaccc9bae9b456230e1aee
Merge: c539031 650b86e
Author: Max Leiter <max.leiter@vercel.com>
Date:   Tue Aug 1 10:11:03 2023 -0700

    Merge remote-tracking branch 'origin/main' into max/new-stream

commit c539031d41d4e8529e5d0f9649c249bcef2345f4
Author: Max Leiter <max.leiter@vercel.com>
Date:   Mon Jul 31 15:57:27 2023 -0700

    progress

commit 363e78a653c029ef48c0f19391708269dff5c5de
Author: Max Leiter <max.leiter@vercel.com>
Date:   Mon Jul 31 09:56:46 2023 -0700

    progress

commit 6dc58ac
Author: Max Leiter <max.leiter@vercel.com>
Date:   Thu Jul 27 10:20:05 2023 -0700

    stream work
@changeset-bot

changeset-bot bot commented Aug 7, 2023

⚠️ No Changeset found

Latest commit: 3c901a7

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types


@MaxLeiter MaxLeiter changed the title [RFC/experimental] StreamData API for streaming arbitrary data to the client [RFC/experimental] StreamData API for streaming additional data to the client Aug 7, 2023
@dnsosebee
Contributor

Thanks for this! This is exactly the API I was imagining to solve #350.

My use case involves per-message data, but I can understand why a global data structure affords more flexibility and will probably cover more use cases. I would just do some zipping for my use case.

@newme616

This is a great idea! Would love to see this in production soon

@MaxLeiter MaxLeiter requested a review from shuding August 15, 2023 19:50
Member

@shuding shuding left a comment


Looks great in general!

packages/core/streams/openai-stream.ts
packages/core/shared/utils.ts
packages/core/streams/stream-data.ts
MaxLeiter and others added 2 commits August 16, 2023 09:16
Co-authored-by: Shu Ding <g@shud.in>
@MaxLeiter MaxLeiter merged commit 84e0cc8 into main Aug 17, 2023
@MaxLeiter MaxLeiter deleted the max/new-stream-2 branch August 17, 2023 21:22
@github-actions github-actions bot mentioned this pull request Aug 17, 2023
@shuding
Member

shuding commented Aug 17, 2023

It only updates the data array when a new message begins to stream.

This is something we can improve in a follow up PR as well, thanks for the feedback!

@rajeshdavidbabu

@MaxLeiter 🔥 it's working now. but the output is coming out strange: Screenshot 2023-08-17 at 1 13 45 PM

If it helps, happy to hop on a quick call.

I am facing the same issue now. I also added COMPLEX_HEADER and it didn't fix it for me.

@rajeshdavidbabu

Screenshot 2023-08-18 at 12 49 50

Am I missing something? I am using the stream from LangChainStream, so I'm not entirely sure where to set experimental_StreamData to true.

@MaxLeiter
Member Author

@rajeshdavidbabu ah, the new parser hasn't been implemented in LangChainStream yet, sorry.

If you'd like to contribute it, I believe it's mostly a case of removing the pipeThrough(createStreamDataTransformer()) line and adding getStreamString() around the writer.write() calls.

https://github.com/vercel/ai/blob/main/packages/core/streams/langchain-stream.ts#L33-L68
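
For reference, a rough sketch of that change (not the merged implementation; the import path and handler context are assumptions based on the linked source):

```ts
import { getStreamString } from 'ai'; // assumption: where the helper is exported from

declare const writer: WritableStreamDefaultWriter<string>; // the stream writer from the linked source

const handlers = {
  handleLLMNewToken: async (token: string) => {
    await writer.ready;
    // was: await writer.write(token), later piped through createStreamDataTransformer()
    await writer.write(getStreamString('text', token));
  },
};
```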

@rajeshdavidbabu

Nice. Can take a look!

@rayli09

rayli09 commented Aug 20, 2023

@MaxLeiter hi there, we're trying the feature but we're seeing weird behavior: on the front end it keeps triggering requests.

We're using next-js@13.4.9, ai@2.2.6, and enabled the feature flags.

Checking the console log in the browser, we did see the custom data being piped, but on the front end the streamed message first completes, then vanishes, and then the UI keeps re-rendering and re-triggering requests with the same user's last input.

  // Convert the response into a friendly text-stream
  const stream = OpenAIStream(response, {
    onStart: async () => {
      // save to db
    },
    onToken: async (_token: string) => {
      // log
    },
    onCompletion: async (completion: string) => {
      // save to db
    },
    onFinal: async (_completion: string) => {
      // close the stream
      await customData.close();
    },
    // experimental flag to stream custom data.
    experimental_streamData: true,
  });
  return new StreamingTextResponse(stream, {}, customData);

@rajeshdavidbabu

Afaik, it's already been implemented: https://github.com/vercel/ai/blob/main/packages/core/streams/stream-data.ts#L111.

So I simply have to pass the experimental_streamData flag as true when I create an instance of LangChainStream:

const { stream, handlers } = LangChainStream({
  experimental_streamData: true,
});

This works for me.

@sebastianhoitz

@MaxLeiter thank you so much, this is really cool! One question: you said you might be able to get per-message data working. I don't see anything locally at the moment, only the global data.

Is this because I'm using a LangChain agent? Or is it actually missing?

@MaxLeiter
Member Author

@sebastianhoitz it's probably only a few line change but it wasn't included in the initial version:

Correlating data to specific messages is not handled by the SDK and will be managed by the user. In the future this will likely change, and we'll maintain the global data and a message.data.

@dnsosebee
Contributor

Thanks for the work on this. I'm going to try out function calling and so may not need to stream extra data.

However I've been thinking about DX and have an idea: I think the core data of RAG apps is not a message log, it's an event log containing user messages, function calls, function responses, and assistant messages. A RAG app needs to present views of that event log to both the user and the assistant; the assistant's view is a message log, and the user's view is whatever JSON drives their UI. The current API (pre-experimental_streamData), which thinks of things as OpenAI-style message logs rather than generic event logs, makes it hard to handle those two different views of the data.

For example, in my RAG app I want my search results to be formatted as a plaintext system message for the OpenAI message array, but want to view those same results as JSON client side, so that I can present a "related research" UI section. Under the current functionCalling API that might involve parsing system messages client side, which is unwieldy. With the current streamData API that would involve interleaving the data array back into my message history as system messages, which is also a bit awkward. Either that or duplicating data as system messages AND streamData.

So I think my ideal API would be to have a generic type called "event" that looks something like this:

type Event<T> = {
  name: string // or generic string literal
  data: T
}

This Event type would be part of message history, and I would set up functions on client and server side to determine how to map that event to a system message for OpenAI, and how to interpret the event data as UI client side.

This is a bit different from message-correlated data, since the data event is a neighboring member in the message/event array rather than an additional field of a message. I think that's more sensible, since fundamentally the data event has its own chronological place in the stream, you can even think of a data event as a message from a 3rd party agent, like a 'search agent' who just sent a message to the stream.
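
For concreteness, the two views this proposal implies might look like the following (hypothetical names and types, sketching the idea rather than a real API):

```ts
// A hypothetical 'search' event and its two views (illustrative only).
type SearchResult = { title: string; url: string };

type Event<T> = {
  name: string; // or generic string literal
  data: T;
};

// The assistant's view: the event rendered as a plaintext system message.
function toSystemMessage(event: Event<SearchResult[]>): { role: 'system'; content: string } {
  return {
    role: 'system',
    content: event.data.map(r => `${r.title} (${r.url})`).join('\n'),
  };
}

// The user's view: the same event as JSON driving a "related research" UI section.
function toRelatedResearch(event: Event<SearchResult[]>): SearchResult[] {
  return event.data;
}
```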

Curious to hear people's thoughts on this!

@GorvGoyl

Client-side parsing is no longer as simple as fetching the request. Non-SDK client's will need to manually parse the prefixes and construct messages, but we can provide helpers to make it simpler.

I'm using Vercel AI streaming response in the Chrome extension and the SDK is not supported. Could you provide the helper to parse data?

@aaron5670

aaron5670 commented Jan 19, 2024

@MaxLeiter 🔥 it's working now. but the output is coming out strange: Screenshot 2023-08-17 at 1 13 45 PM
If it helps, happy to hop on a quick call.

I am facing the same issue now. I also added COMPLEX_HEADER and it didn't fix it for me.

I have the same issue, this is my response:

[
    {
        "content": "Hello",
        "role": "user",
        "createdAt": "2024-01-19T09:07:36.443Z",
        "id": "31kQ0KF"
    },
    {
        "id": "ES6PN2g",
        "createdAt": "2024-01-19T09:07:39.792Z",
        "content": "2:[{\"text\":\"Hello, how are you?\"}]\n0:\"Bonjour\"\n0:\"!\"\n0:\" Comment\"\n0:\" puis\"\n0:\"-\"\n0:\"je\"\n0:\" vous\"\n0:\" aider\"\n0:\" aujourd\"\n0:\"'hui\"\n0:\"?\"\n",
        "role": "assistant"
    }
]

And this is my backend code:

import {experimental_StreamData, OpenAIStream, StreamingTextResponse} from 'ai';
import {ChatCompletionMessage} from 'openai/resources/chat/completions';
import {createOpenAIEmbedding} from "@/helpers/createOpenAIEmbedding";
import {pineconeIndex} from "@/helpers/pinecone";
import {openai} from "@/helpers/openai";

export async function POST(req: Request) {
  if (!process.env.OPENAI_API_KEY) {
    return Response.json({error: "OpenAI is not enabled, check your env variables"}, {status: 400});
  }

  try {
    const body = await req.json();
    const messages: ChatCompletionMessage[] = body.messages;

    const messagesTruncated = messages.slice(-6);

    const embedding = await createOpenAIEmbedding(
      messagesTruncated.map((message) => message.content).join("\n"),
    );

    const vectorQueryResponse = await pineconeIndex.query({
      vector: embedding,
      topK: 4,
      includeMetadata: true,
      filter: {type: 'product'},
    });

    console.log('vectorQueryResponse', vectorQueryResponse.matches.map((document) => document.metadata?.title));

    const systemMessage: ChatCompletionMessage = {
      role: "assistant",
      content:
        `
        You are a helpful assistant of the online e-commerce store.
        It is very important that you always answer in French, you must not deviate from this at any cost!
        Use the following bits of context to answer the question at the end.
        Make sure your answer consists of a maximum of about 45 words.
        If you don't know the answer, just say you don't know, don't go making up answers.
        ----------------
        CONTEXT:
        ${vectorQueryResponse.matches
          .map((document) => {
            return `Title: ${document.metadata?.title}\n\nContent:\n${document.metadata?.text}`
          })
          .join("\n\n")}
         `
    };

    const response = await openai.chat.completions.create({
      model: "gpt-3.5-turbo",
      stream: true,
      messages: [systemMessage, ...messagesTruncated],
    });

    // Instantiate the StreamData. It works with all API providers.
    const data = new experimental_StreamData();

    const stream = OpenAIStream(response, {
      experimental_onFunctionCall: async (
        { name, arguments: args },
        createFunctionCallMessages,
      ) => {
        if (name === 'get_current_weather') {
          // Call a weather API here
          const weatherData = {
            temperature: 20,
            unit: args.format === 'celsius' ? 'C' : 'F',
          };

          data.append({
            text: 'Some custom data',
          });

          const newMessages = createFunctionCallMessages(weatherData);
          return openai.chat.completions.create({
            messages: [...messages, ...newMessages],
            stream: true,
            model: 'gpt-3.5-turbo-0613',
          });
        }
      },
      onCompletion(completion) {
        console.log('completion', completion);
      },
      onFinal(completion) {
        // IMPORTANT! you must close StreamData manually or the response will never finish.
        data.close();
      },
      // IMPORTANT! until this is stable, you must explicitly opt in to supporting streamData.
      experimental_streamData: true,
    });

    data.append({
      text: 'Hello, how are you?',
    });

    // IMPORTANT! If you aren't using StreamingTextResponse, you MUST have the `X-Experimental-Stream-Data: 'true'` header
    // in your response so the client uses the correct parsing logic.
    return new StreamingTextResponse(stream, {}, data);
  } catch (error) {
    console.error(error);
    return Response.json({error: "Internal server error"}, {status: 500});
  }
}

export async function OPTIONS() {
  return Response.json({status: "OK"});
}

What am I doing wrong and how did you solve this? :)

@aaron5670

aaron5670 commented Jan 19, 2024

@MaxLeiter I found the issue.

We currently have a Gatsby app where I want to use this. I tried it with a Next.js app, and there it works as expected.

Next.js

This is my Next.js front-end code, where it works:

"use client";

import {useChat} from "ai/react";

const Page = () => {
  const { data, handleInputChange, handleSubmit, messages } = useChat({
    api: '/api/chat',
  });

  console.log('data', data);
  console.log('messages', messages);

  return (
    <div>
      <form onSubmit={handleSubmit}>
        <input
          type="text"
          name="input"
          onChange={handleInputChange}
        />
        <button type="submit">Send</button>
      </form>
    </div>
  )
}

export default Page

Next.js response:

[
    {
        "content": "Hello",
        "role": "user",
        "createdAt": "2024-01-19T09:40:26.742Z",
        "id": "cJCvzYF"
    },
    {
        "id": "4W01c2l",
        "role": "assistant",
        "content": "Bonjour! Comment puis-je vous aider aujourd'hui ?",
        "createdAt": "2024-01-19T09:40:29.129Z"
    }
]

Gatsby

And this is my Gatsby front-end code, where I have encoding issues:

import React from 'react';
import { useChat } from 'ai/react';

export const Page = () => {
  const {
    data, handleInputChange, handleSubmit, messages,
  } = useChat({
    api: 'http://localhost:3000/api/chat',
  });

  console.log('data', data);
  console.log('messages', messages);

  return (
    <div>
      <form onSubmit={handleSubmit}>
        <input
          type="text"
          name="input"
          onChange={handleInputChange}
        />
        <button type="submit">Send</button>
      </form>
    </div>
  );
};

Gatsby response:

[
    {
        "content": "Hello",
        "role": "user",
        "createdAt": "2024-01-19T09:41:10.737Z",
        "id": "oK7t7Lh"
    },
    {
        "id": "9F2Pg9s",
        "createdAt": "2024-01-19T09:41:12.195Z",
        "content": "0:\"Bonjour\"\n0:\"!\"\n0:\" Comment\"\n0:\" puis\"\n0:\"-\"\n0:\"je\"\n0:\" vous\"\n0:\" aider\"\n0:\" aujourd\"\n0:\"'hui\"\n0:\"?\"\n",
        "role": "assistant"
    }
]

@zandko

zandko commented Feb 5, 2024

I also encountered the same problem. It works normally locally; the problem occurs when I call the remote server.

@aaron5670

I also encountered the same problem. It works normally locally; the problem occurs when I call the remote server.

See #930

@straatrakker

straatrakker commented Mar 26, 2024

I am also encountering the same issue. It works fine locally on Next.js, but it doesn't seem to work when calling the Next.js API from another React/Vite project.

Has anyone found a fix for this?

@Laktus

Laktus commented Apr 25, 2024

How would I send this info not from a Next.js backend but e.g. from a Python backend with FastAPI? What header do I have to add?
