Skip to content

Commit

Permalink
Make sse buffer size configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
wkok committed Nov 24, 2023
1 parent badf935 commit eb7937e
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions src/wkok/openai_clojure/sse.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
[hato.client :as http]
[clojure.core.async :as a]
[clojure.string :as string]
[cheshire.core :as json])
(:import (java.io InputStream)))
[cheshire.core :as json]
[clojure.core.async.impl.protocols :as impl])
(:import (java.io InputStream)
(clojure.lang Counted)
(java.util LinkedList)))

(def event-mask (re-pattern (str "(?s).+?\n\n")))

Expand All @@ -26,13 +29,39 @@
(-> (subs raw-event data-idx)
(json/parse-string true)))))

(deftype InfiniteBuffer [^LinkedList buf]
impl/UnblockingBuffer
impl/Buffer
(full? [_this]
false)
(remove! [_this]
(.removeLast buf))
(add!* [this itm]
(.addFirst buf itm)
this)
(close-buf! [_this])
Counted
(count [_this]
(.size buf)))

(defn infinite-buffer []
(InfiniteBuffer. (LinkedList.)))

(defn calc-buffer-size
"Buffer size should be at least equal to max_tokens
or 16 (the default in openai as of 2023-02-19)
plus the [DONE] terminator"
[{:keys [max_tokens]
:or {max_tokens 16}}]
(inc max_tokens))
"- Use stream_buffer_len if provided.
- Otherwise, buffer size should be at least equal to max_tokens
plus the [DONE] terminator if it is provided.
- Else fallbacks on ##Inf and use an infinite-buffer instead"
[{:keys [stream_buffer_len max_tokens]}]
(or stream_buffer_len
(when max_tokens (inc max_tokens))
##Inf))

(defn make-buffer [params]
(let [size (calc-buffer-size params)]
(if (= size ##Inf)
(infinite-buffer)
(a/sliding-buffer size))))

(defn sse-events
"Returns a core.async channel with events as clojure data structures.
Expand All @@ -41,8 +70,7 @@
(let [event-stream ^InputStream (:body (http/request (merge request
params
{:as :stream})))
buffer-size (calc-buffer-size params)
events (a/chan (a/sliding-buffer buffer-size) (map parse-event))]
events (a/chan (make-buffer params) (map parse-event))]
(a/thread
(loop [byte-coll []]
(let [byte-arr (byte-array (max 1 (.available event-stream)))
Expand Down

0 comments on commit eb7937e

Please sign in to comment.