add python openai multithread system test and fix hang in completeFunc #21

Merged · 25 commits · Nov 13, 2023

Commits (the diff below shows changes from all commits):
747ea89
add python openai multithread system test
samos123 Nov 11, 2023
c33fecb
update tests
samos123 Nov 11, 2023
18ee353
increase to 1000 requests to get to 2+ replicas
samos123 Nov 11, 2023
7633c22
change back to 500 requests since 1k caused error
samos123 Nov 11, 2023
c6587f3
allow rerunning test without tearing down kind cluster
samos123 Nov 12, 2023
65e10b0
add helm repo if not exist
samos123 Nov 12, 2023
ffb0436
add configurable timeout
samos123 Nov 12, 2023
a4fc117
add more info to logs in handler
samos123 Nov 12, 2023
29a316e
switch fmt to log
samos123 Nov 12, 2023
da8df8d
Track all active requests in queue rather than just pending in queue
nstogner Nov 12, 2023
3d21eb5
remove port-forward from tests
samos123 Nov 12, 2023
9c8ca6a
reuse client across threads
samos123 Nov 12, 2023
df85ae3
increase MaxConns for DefaultTransport
samos123 Nov 12, 2023
d01d63d
add RetryAble ReverseProxy
samos123 Nov 12, 2023
e9a5c3c
add log to queue.completeFunc
samos123 Nov 13, 2023
0e26c2b
completeFunc fix sending to queue.completed
samos123 Nov 13, 2023
3ff4f2f
add logging statement to handler.go
samos123 Nov 13, 2023
9384ad2
update tests
samos123 Nov 13, 2023
a3fa49b
create client per thread
samos123 Nov 13, 2023
6a77205
try to fix integration tests
samos123 Nov 13, 2023
9c2ec25
Add in-progress counter to queue
nstogner Nov 13, 2023
61b2b22
Move inProgress increment
nstogner Nov 13, 2023
03e7134
remove retry logic
samos123 Nov 13, 2023
55ebe94
revert integration_test.go changes
samos123 Nov 13, 2023
bbcb0c9
better revert
samos123 Nov 13, 2023
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
 bin
+.venv
2 changes: 1 addition & 1 deletion autoscaler.go
@@ -32,7 +32,7 @@ type Autoscaler struct {
 func (a *Autoscaler) Start() {
 	for range time.Tick(a.Interval) {
 		log.Println("Calculating scales for all")
-		for deploymentName, waitCount := range a.FIFO.WaitCounts() {
+		for deploymentName, waitCount := range a.FIFO.TotalCounts() {
 			avg := a.getMovingAvgQueueSize(deploymentName)
 			avg.Next(float64(waitCount))
 			flt := avg.Calculate()
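
The switch from WaitCounts() to TotalCounts() is the autoscaler half of this PR's fix: items leave the queue's list the moment they are dequeued, so a wait-only count drops to zero as soon as the dispatcher drains the queue, and the moving average would drive replicas down while requests are still being served. Counting everything enqueued-but-not-completed keeps in-flight work visible. A minimal illustration of the difference (not code from this PR):

package main

import "fmt"

func main() {
	// One replica serving at its concurrency limit, queue just drained.
	waiting := 0    // nothing left waiting in the queue
	inProgress := 4 // four requests still being served

	fmt.Println("wait count:", waiting)             // 0: looks idle, would scale down
	fmt.Println("total count:", waiting+inProgress) // 4: in-flight work stays visible
}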
2 changes: 2 additions & 0 deletions deploy/proxy_service.yaml
@@ -9,3 +9,5 @@ spec:
   - protocol: TCP
     port: 80
     targetPort: 8080
+    nodePort: 30080
+  type: NodePort
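
Exposing the proxy as a NodePort on 30080 pairs with the kind extraPortMappings added in tests/system-test-kind.sh below: the test cluster maps container port 30080 to 127.0.0.1:30080 on the host, so the system test can reach the proxy at a stable local address and the kubectl port-forward step can be removed.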
9 changes: 5 additions & 4 deletions handler.go
@@ -48,17 +48,18 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

 	h.Deployments.AtLeastOne(deploy)

-	log.Println("Entering queue")
+	log.Println("Entering queue", id)
 	complete := h.FIFO.EnqueueAndWait(r.Context(), deploy, id)
-	log.Println("Admitted into queue")
+	log.Println("Admitted into queue", id)
 	defer complete()

-	log.Println("Waiting for IPs")
+	log.Println("Waiting for IPs", id)
 	host := h.Endpoints.GetHost(r.Context(), deploy)
-	log.Printf("Got host: %v", host)
+	log.Printf("Got host: %v, id: %v\n", host, id)

 	// TODO: Avoid creating new reverse proxies for each request.
 	// TODO: Consider implementing a round robin scheme.
+	log.Printf("Proxying request to host %v: %v\n", host, id)
 	newReverseProxy(host).ServeHTTP(w, proxyRequest)
 }
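
The handler changes are purely diagnostic: the request id is threaded through every log line (enqueue, admission, host lookup, proxying), so one request's lifecycle can be followed through the interleaved logs of concurrent requests. That visibility is what makes a stall like the completeFunc hang fixed in pkg/queue/queue.go below traceable.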
11 changes: 7 additions & 4 deletions integration_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"fmt"
+	"log"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -32,6 +33,7 @@ func TestIntegration(t *testing.T) {

 	backendRequests := &atomic.Int32{}
 	testBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		log.Println("Serving request from testBackend")
 		backendRequests.Add(1)
 		<-backendComplete
 		w.WriteHeader(200)
@@ -55,22 +57,23 @@ func TestIntegration(t *testing.T) {
 	// Send request number 1
 	var wg sync.WaitGroup
 	sendRequests(t, &wg, modelName, 1)

 	requireDeploymentReplicas(t, deploy, 1)
 	require.Equal(t, int32(1), backendRequests.Load(), "ensure the request made its way to the backend")
 	completeRequests(backendComplete, 1)

 	// Ensure the deployment scaled scaled past 1.
-	// 1/3 should be admitted
-	// 2/3 should remain in queue --> replicas should equal 2
-	sendRequests(t, &wg, modelName, 3)
+	// 1/2 should be admitted
+	// 1/2 should remain in queue --> replicas should equal 2
+	sendRequests(t, &wg, modelName, 2)
 	requireDeploymentReplicas(t, deploy, 2)

 	// Make sure deployment will not be scaled past default max (3).
 	sendRequests(t, &wg, modelName, 2)
 	requireDeploymentReplicas(t, deploy, 3)

-	completeRequests(backendComplete, 5)
+	// Have the mock backend respond to the remaining 4 requests.
+	completeRequests(backendComplete, 4)

 	// Ensure scale-down.
 	requireDeploymentReplicas(t, deploy, 0)
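
The arithmetic follows from the smaller batch: the test now sends 1 + 2 + 2 = 5 requests instead of 1 + 3 + 2 = 6, and with the first request already completed, 4 remain for the mock backend, hence completeRequests(backendComplete, 4).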
5 changes: 5 additions & 0 deletions main.go
@@ -102,6 +102,11 @@ func run() error {
 	autoscaler.FIFO = fifo
 	go autoscaler.Start()

+	// Change the global defaults and remove limits on max conns
+	defaultTransport := http.DefaultTransport.(*http.Transport)
+	defaultTransport.MaxIdleConns = 0
+	defaultTransport.MaxIdleConnsPerHost = 0
+	defaultTransport.MaxConnsPerHost = 0
 	handler := &Handler{
 		Deployments: scaler,
 		Endpoints:   endpoints,
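
One subtlety in this hunk is worth spelling out: for http.Transport, MaxIdleConns = 0 and MaxConnsPerHost = 0 genuinely mean "no limit", but MaxIdleConnsPerHost = 0 means "use DefaultMaxIdleConnsPerHost", which is 2. So this change lifts the total and per-host connection caps while resetting the per-host idle pool to its small default. A sketch of an alternative that avoids mutating the shared global transport (illustrative only; the newTunedTransport helper is hypothetical, not part of this PR):

package main

import (
	"fmt"
	"net/http"
)

// newTunedTransport is a hypothetical helper: it clones the default
// transport and relaxes its connection limits instead of mutating the
// process-wide http.DefaultTransport.
func newTunedTransport() *http.Transport {
	t := http.DefaultTransport.(*http.Transport).Clone()
	t.MaxIdleConns = 0          // 0 = no limit across all hosts
	t.MaxConnsPerHost = 0       // 0 = no limit per host
	t.MaxIdleConnsPerHost = 100 // 0 would fall back to the default of 2
	return t
}

func main() {
	client := &http.Client{Transport: newTunedTransport()}
	fmt.Printf("%+v\n", client.Transport)
}

Mutating the global default, as the PR does, is simpler and affects every consumer of http.DefaultTransport in the process, which is presumably the intent here since the reverse proxy relies on the default transport.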
57 changes: 45 additions & 12 deletions pkg/queue/queue.go
@@ -3,8 +3,9 @@ package queue
 import (
 	"container/list"
 	"context"
-	"fmt"
+	"log"
 	"sync"
+	"sync/atomic"
 	"time"
 )
@@ -30,6 +31,14 @@ type FIFOQueue struct {

 	// completed signals when a item that has been dequeued has completed.
 	completed chan struct{}
+
+	// totalCount is the number of requests that have been enqueued
+	// but not yet completed.
+	totalCount atomic.Int64
+
+	// inProgressCount is the number of requests that have been dequeued
+	// but not yet completed.
+	inProgressCount atomic.Int64
 }

 type item struct {
@@ -46,6 +55,9 @@ type item struct {
 }

 func (q *FIFOQueue) dequeue(itm *item, inProgress bool) {
+	if inProgress {
+		q.inProgressCount.Add(1)
+	}
 	q.listMtx.Lock()
 	itm.inProgress = inProgress
 	q.list.Remove(itm.e)
@@ -60,6 +72,7 @@ func (q *FIFOQueue) dequeue(itm *item, inProgress bool) {
 // It returns a function that should be called after all work has completed.
 // The id parameter is only used for tracking/debugging purposes.
 func (q *FIFOQueue) EnqueueAndWait(ctx context.Context, id string) func() {
+	q.totalCount.Add(1)
 	itm := &item{
 		id: id,
 		dequeued: make(chan struct{}),
@@ -87,27 +100,43 @@ func (q *FIFOQueue) EnqueueAndWait(ctx context.Context, id string) func() {

 func (q *FIFOQueue) completeFunc(itm *item) func() {
 	return func() {
+		log.Println("Running completeFunc: ", itm.id)
+		q.totalCount.Add(-1)
+
+		log.Println("Locking queue.list: ", itm.id)
 		q.listMtx.Lock()
 		if !itm.closed {
+			log.Println("Closing item.dequeued: ", itm.id)
 			close(itm.dequeued)
 			itm.closed = true
 		}
+
 		inProgress := itm.inProgress
+		log.Printf("Item %v inProgress: %v\n", itm.id, inProgress)
 		q.listMtx.Unlock()

 		if inProgress {
+			q.inProgressCount.Add(-1)
+
 			// Make sure we only send a message on the completed channel if the
 			// item was counted as inProgress.
-			q.completed <- struct{}{}
+			select {
+			case q.completed <- struct{}{}:
+				log.Println("Sent completed message: ", itm.id)
+			default:
+				log.Println("Did not send completed message: ", itm.id)
+			}
 		}
+
+		log.Println("Finished completeFunc: ", itm.id)
 	}
 }

 func (q *FIFOQueue) Start() {
-	var inProgress int
 	for {
-		if inProgress >= q.GetConcurrency() {
+		if q.inProgressCount.Load() >= int64(q.GetConcurrency()) {
+			log.Println("Waiting for requests to complete")
 			<-q.completed
-			inProgress--
 			continue
 		}
@@ -121,11 +150,9 @@
 			continue
 		}

-		inProgress++
-
 		itm := e.Value.(*item)
 		q.dequeue(itm, true)
-		fmt.Println("Dequeued: ", itm.id)
+		log.Println("Dequeued: ", itm.id)

 		time.Sleep(time.Second / 100)
 	}
@@ -143,8 +170,14 @@
 	q.concurrencyMtx.Unlock()
 }

-func (q *FIFOQueue) Size() int {
-	q.listMtx.Lock()
-	defer q.listMtx.Unlock()
-	return q.list.Len()
+// TotalCount returns all requests that have made a call to EnqueueAndWait()
+// but have not yet completed.
+func (q *FIFOQueue) TotalCount() int64 {
+	return q.totalCount.Load()
+}
+
+// InProgressCount returns all requests that have been dequeued
+// but have not yet completed.
+func (q *FIFOQueue) InProgressCount() int64 {
+	return q.inProgressCount.Load()
 }
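
This file is the hang fix named in the PR title. Previously, completeFunc ended with a plain blocking send, q.completed <- struct{}{}, on an unbuffered channel, but Start() only receives from that channel while the queue sits at its concurrency limit. A request that completed at any other moment left its caller blocked in complete() forever. The fix has two parts: the in-progress count moves from a local variable inside Start() into an atomic counter that completeFunc decrements directly, and the send becomes a non-blocking select with a default branch, demoting the channel to a best-effort wake-up hint. A minimal sketch of the resulting pattern (illustrative, not the PR's code):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// notifier sketches the pattern: the atomic counter is the source of
// truth, and the channel only wakes a sleeping dispatcher, so senders
// never block.
type notifier struct {
	inFlight atomic.Int64
	wake     chan struct{}
}

func (n *notifier) done() {
	n.inFlight.Add(-1)
	select {
	case n.wake <- struct{}{}: // dispatcher was listening: wake it
	default: // nobody listening: drop the hint, the counter already changed
	}
}

func main() {
	n := &notifier{wake: make(chan struct{})}
	n.inFlight.Store(1)

	go n.done() // completes even if no one is receiving yet

	for n.inFlight.Load() > 0 {
		select {
		case <-n.wake:
		case <-time.After(10 * time.Millisecond): // re-check periodically
		}
	}
	fmt.Println("all requests completed")
}

Because every completion updates the counter before attempting the send, a dropped wake-up can no longer strand the goroutine running complete(), which was the hang.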
8 changes: 4 additions & 4 deletions pkg/queuemanager/manager.go
@@ -24,12 +24,12 @@ type FIFOQueueManager struct {
 	queues map[string]*queue.FIFOQueue
 }

-// WaitCounts returns the number of pending or in-progress requests for each deployment name
-func (m *FIFOQueueManager) WaitCounts() map[string]int {
+// TotalCounts returns the number of pending or in-progress requests for each deployment name
+func (m *FIFOQueueManager) TotalCounts() map[string]int64 {
 	m.mtx.Lock()
-	sizes := make(map[string]int, len(m.queues))
+	sizes := make(map[string]int64, len(m.queues))
 	for name, q := range m.queues {
-		sizes[name] = q.Size()
+		sizes[name] = q.TotalCount()
 	}
 	m.mtx.Unlock()
 	return sizes
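
TotalCounts() is what the autoscaler polls (see autoscaler.go above): per deployment, it now reports every request between EnqueueAndWait() and completion via the new FIFOQueue.TotalCount(), rather than just the queued backlog that Size() used to return.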
109 changes: 90 additions & 19 deletions tests/system-test-kind.sh
@@ -1,40 +1,111 @@
 #!/usr/bin/env bash

-set -e
+set -xe

-kind create cluster --name=substratus-test
-trap "kind delete cluster --name=substratus-test" EXIT
+DELETE_CLUSTER=${DELETE_CLUSTER:-true}
+# This is possible because of kind extraPortMappings
+HOST=127.0.0.1
+PORT=30080
+BASE_URL="http://$HOST:$PORT/v1"
+
+
+if kind get clusters | grep -q substratus-test; then
+  echo "Cluster substratus-tests already exists.. reusing it"
+else
+  kind create cluster --config - << EOF
+kind: Cluster
+apiVersion: kind.x-k8s.io/v1alpha4
+name: substratus-test
+nodes:
+- role: control-plane
+  # port forward 80 on the host to 80 on this node
+  extraPortMappings:
+  - containerPort: 30080
+    hostPort: 30080
+    listenAddress: "127.0.0.1"
+EOF
+  if [ "$DELETE_CLUSTER" = true ]; then
+    echo "Going to delete cluster substratus-test on exit"
+    trap "kind delete cluster --name=substratus-test" EXIT
+  fi
+fi

-skaffold run
+if ! kubectl get deployment proxy-controller; then
+  skaffold run
+fi

 kubectl wait --for=condition=available --timeout=30s deployment/proxy-controller

-kubectl port-forward svc/proxy-controller 8080:80 &
+if ! helm repo list | grep -q substratusai; then
+  helm repo add substratusai https://substratusai.github.io/helm/
+fi
+helm repo update
+helm upgrade --install stapi-minilm-l6-v2 substratusai/stapi -f - << EOF
+model: all-MiniLM-L6-v2
+replicaCount: 0
+deploymentAnnotations:
+  lingo.substratus.ai/models: text-embedding-ada-002
+EOF

-# need to wait for a bit for the port-forward to be ready
-sleep 5
-
-replicas=$(kubectl get deployment backend -o jsonpath='{.spec.replicas}')
+replicas=$(kubectl get deployment stapi-minilm-l6-v2 -o jsonpath='{.spec.replicas}')
 if [ "$replicas" -ne 0 ]; then
   echo "Expected 0 replica before sending requests, got $replicas"
   exit 1
 fi

-echo "Sending 60 requests to model named backend"
-for i in {1..60}; do
-  curl -s -o /dev/null http://localhost:8080/delay/10 \
-    -H "Content-Type: application/json" \
-    -d '{
-      "text": "Your text string goes here",
-      "model": "backend"
-    }' &
-done
+SCRIPT_DIR=$(dirname "$0")
+VENV_DIR=$SCRIPT_DIR/.venv

-sleep 10
+if [ ! -d "$VENV_DIR" ]; then
+  python3 -m venv "$VENV_DIR"
+fi
+source "$VENV_DIR/bin/activate"
+pip3 install openai==1.2.3

-replicas=$(kubectl get deployment backend -o jsonpath='{.spec.replicas}')
+# Send 60 requests in parallel to stapi backend using openai python client and threading
+python3 $SCRIPT_DIR/test_openai_embedding.py \
+  --requests 60 --timeout 300 --base-url "${BASE_URL}" \
+  --model text-embedding-ada-002

-if [ "$replicas" -ne 1 ]; then
-  echo "Expected 1 replica after sending less than 100 requests, got $replicas"
+# Ensure replicas has been scaled up to 1 after sending 60 requests
+replicas=$(kubectl get deployment stapi-minilm-l6-v2 -o jsonpath='{.spec.replicas}')
+if [ "$replicas" -eq 1 ]; then
+  echo "Test passed: Expected 1 replica after sending requests 60 requests"
+else
+  echo "Test failed: Expected 1 replica after sending requests 60 requests, got $replicas"
   exit 1
 fi
+
+echo "Waiting for deployment to scale down back to 0 within 2 minutes"
+for i in {1..15}; do
+  if [ "$i" -eq 15 ]; then
+    echo "Test failed: Expected 0 replica after not having requests for more than 1 minute, got $replicas"
+    exit 1
+  fi
+  replicas=$(kubectl get deployment stapi-minilm-l6-v2 -o jsonpath='{.spec.replicas}')
+  if [ "$replicas" -eq 0 ]; then
+    echo "Test passed: Expected 0 replica after not having requests for more than 1 minute"
+    break
+  fi
+  sleep 8
+done
+
+# Scale up again after scaling to 0 is broken right now
+# requests=500
+# echo "Send $requests requests in parallel to stapi backend using openai python client and threading"
+# python3 $SCRIPT_DIR/test_openai_embedding.py \
+#   --requests $requests --timeout 600 --base-url "${BASE_URL}" \
+#   --model text-embedding-ada-002
+#
+# replicas=$(kubectl get deployment stapi-minilm-l6-v2 -o jsonpath='{.spec.replicas}')
+# if [ "$replicas" -ge 2 ]; then
+#   echo "Test passed: Expected 2 or more replicas after sending more than $requests requests, got $replicas"
+# else
+#   echo "Test failed: Expected 2 or more replicas after sending more than $requests requests, got $replicas"
+#   exit 1
+# fi
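
The load generation itself is delegated to test_openai_embedding.py (added by this PR alongside this script), which uses the openai Python client with threads to fire 60 concurrent embedding requests through the proxy at BASE_URL. For readers following the Go codebase, a rough equivalent of that load pattern (illustrative only, not the PR's test; the URL and payload mirror the script above):

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"sync"
)

func main() {
	const url = "http://127.0.0.1:30080/v1/embeddings"
	const requests = 60
	payload := []byte(`{"model": "text-embedding-ada-002", "input": "Your text string goes here"}`)

	var wg sync.WaitGroup
	for i := 0; i < requests; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each goroutine posts one embedding request through the proxy.
			resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
			if err != nil {
				fmt.Println("request failed:", err)
				return
			}
			resp.Body.Close()
			fmt.Println("status:", resp.StatusCode)
		}()
	}
	wg.Wait()
}

This burst of parallel connections is also the kind of workload behind the MaxConns changes to http.DefaultTransport in main.go above.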