Skip to content

Commit

Permalink
Merge pull request #60 from jianghoy/main
Browse files Browse the repository at this point in the history
Use buffer instead of sliding buffer and remove LinkedList
  • Loading branch information
wkok authored May 19, 2024
2 parents 06f1826 + a87ad65 commit b6a0a28
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/wkok/openai_clojure/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@
([params]
(create-assistant params nil))
([params options]
(let [opts (assoc-in options [:openai-beta] "assistants=v1")]
(let [opts (assoc-in options [:openai-beta] "assistants=v2")]
(core/response-for :create-assistant params opts))))


Expand Down
51 changes: 14 additions & 37 deletions src/wkok/openai_clojure/sse.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
[hato.middleware :as hm]
[clojure.core.async :as a]
[clojure.string :as string]
[cheshire.core :as json]
[clojure.core.async.impl.protocols :as impl])
(:import (java.io InputStream)
(clojure.lang Counted)
(java.util LinkedList)))
[cheshire.core :as json])
(:import (java.io InputStream)))

(def event-mask (re-pattern (str "(?s).+?\n\n")))

Expand All @@ -31,39 +28,17 @@
(-> (subs raw-event data-idx)
(json/parse-string true)))))

(deftype InfiniteBuffer [^LinkedList buf]
impl/UnblockingBuffer
impl/Buffer
(full? [_this]
false)
(remove! [_this]
(.removeLast buf))
(add!* [this itm]
(.addFirst buf itm)
this)
(close-buf! [_this])
Counted
(count [_this]
(.size buf)))

(defn infinite-buffer []
(InfiniteBuffer. (LinkedList.)))
; Per this discussion: https://community.openai.com/t/clarification-for-max-tokens/19576
; if the max_tokens is not provided, the response will try to use all the available
; tokens to generate response, hence DEFAULT_BUFFER_SIZE should be large enough
(def ^:private DEFAULT_BUFFER_SIZE 100000)

(defn calc-buffer-size
"- Use stream_buffer_len if provided.
- Otherwise, buffer size should be at least equal to max_tokens
plus the [DONE] terminator if it is provided.
- Else fallbacks on ##Inf and use an infinite-buffer instead"
[{:keys [stream_buffer_len max_tokens]}]
(or stream_buffer_len
(when max_tokens (inc max_tokens))
##Inf))

(defn make-buffer [params]
(let [size (calc-buffer-size params)]
(if (= size ##Inf)
(infinite-buffer)
(a/sliding-buffer size))))
"Buffer size should be at least equal to max_tokens
plus the [DONE] terminator"
[{:keys [max_tokens]
:or {max_tokens DEFAULT_BUFFER_SIZE}}]
(inc max_tokens))

(defn sse-events
"Returns a core.async channel with events as clojure data structures.
Expand All @@ -72,7 +47,8 @@
(let [event-stream ^InputStream (:body (http/request (merge request
params
{:as :stream})))
events (a/chan (make-buffer params) (map parse-event))]
buffer-size (calc-buffer-size params)
events (a/chan (a/buffer buffer-size) (map parse-event))]
(a/thread
(loop [byte-coll []]
(let [byte-arr (byte-array (max 1 (.available event-stream)))
Expand Down Expand Up @@ -181,3 +157,4 @@
(assoc ctx :response (if (:stream params)
(sse-request ctx')
(http/request request'))))))})

0 comments on commit b6a0a28

Please sign in to comment.