From cd7f90e55f43551d51de4e487ca3b200331321fe Mon Sep 17 00:00:00 2001
From: Sam Stoelinga
Date: Sat, 4 Nov 2023 18:56:54 -0700
Subject: [PATCH] Add comments and increase default queue size to 100 (#13)

---
 autoscaler.go         | 21 ++++++++++-----------
 deployment_manager.go | 39 ++++++++++++++++++++++-----------------
 docs/development.md   | 16 ++++++++++++----
 endpoint_manager.go   |  5 ++++-
 fifo_queue.go         | 13 +++----------
 fifo_queue_manager.go | 27 ++++++++++++++++++---------
 main.go               |  2 +-
 7 files changed, 70 insertions(+), 53 deletions(-)

diff --git a/autoscaler.go b/autoscaler.go
index be5515cb..f414ec0b 100644
--- a/autoscaler.go
+++ b/autoscaler.go
@@ -13,6 +13,7 @@ func NewAutoscaler() *Autoscaler {
 
 // Autoscaler is responsible for making continuous adjustments to
 // the scale of the backend. It is not responsible for scale-from-zero.
+// Each deployment has its own autoscaler.
 type Autoscaler struct {
 	Interval time.Duration
 	AverageCount int
@@ -27,27 +28,25 @@ type Autoscaler struct {
 func (a *Autoscaler) Start() {
 	for range time.Tick(a.Interval) {
 		log.Println("Calculating scales for all")
-		for model, waitCount := range a.FIFO.WaitCounts() {
-			if model == "proxy-controller" {
-				// TODO: Remove this after selecting models based on labels/annotations.
-				continue
-			}
-			avg := a.getMovingAvgQueueSize(model)
+		// TODO: Might need to account for the queue size now being 100;
+		// we could probably just divide the average by the queue size.
+		for deploymentName, waitCount := range a.FIFO.WaitCounts() {
+			avg := a.getMovingAvgQueueSize(deploymentName)
 			avg.Next(float64(waitCount))
 			flt := avg.Calculate()
 			ceil := math.Ceil(flt)
-			log.Printf("Average for model: %s: %v (ceil: %v), current wait count: %v", model, flt, ceil, waitCount)
-			a.Scaler.SetDesiredScale(model, int32(ceil))
+			log.Printf("Average for deployment: %s: %v (ceil: %v), current wait count: %v", deploymentName, flt, ceil, waitCount)
+			a.Scaler.SetDesiredScale(deploymentName, int32(ceil))
 		}
 	}
 }
 
-func (r *Autoscaler) getMovingAvgQueueSize(model string) *movingAvg {
+func (r *Autoscaler) getMovingAvgQueueSize(deploymentName string) *movingAvg {
 	r.movingAvgQueueSizeMtx.Lock()
-	a, ok := r.movingAvgQueueSize[model]
+	a, ok := r.movingAvgQueueSize[deploymentName]
 	if !ok {
 		a = newSimpleMovingAvg(make([]float64, r.AverageCount))
-		r.movingAvgQueueSize[model] = a
+		r.movingAvgQueueSize[deploymentName] = a
 	}
 	r.movingAvgQueueSizeMtx.Unlock()
 	return a
diff --git a/deployment_manager.go b/deployment_manager.go
index dabaa28f..739fce87 100644
--- a/deployment_manager.go
+++ b/deployment_manager.go
@@ -38,10 +38,15 @@ type DeploymentManager struct {
 	ScaleDownPeriod time.Duration
 
 	scalersMtx sync.Mutex
-	scalers map[string]*scaler
+
+	// scalers maps deployment names to scalers
+	scalers map[string]*scaler
 
 	modelToDeploymentMtx sync.RWMutex
-	modelToDeployment map[string]string
+
+	// modelToDeployment maps model names to deployment names. A single deployment
+	// can serve multiple models.
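+	// The mapping appears to be derived from the lingo.substratus.ai/models
+	// annotation on each Deployment (see docs/development.md).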
+	modelToDeployment map[string]string
 }
 
 func (r *DeploymentManager) SetupWithManager(mgr ctrl.Manager) error {
@@ -50,12 +55,12 @@ func (r *DeploymentManager) SetupWithManager(mgr ctrl.Manager) error {
 		Complete(r)
 }
 
-func (r *DeploymentManager) AtLeastOne(model string) {
-	r.getScaler(model).AtLeastOne()
+func (r *DeploymentManager) AtLeastOne(deploymentName string) {
+	r.getScaler(deploymentName).AtLeastOne()
 }
 
-func (r *DeploymentManager) SetDesiredScale(model string, n int32) {
-	r.getScaler(model).SetDesiredScale(n)
+func (r *DeploymentManager) SetDesiredScale(deploymentName string, n int32) {
+	r.getScaler(deploymentName).SetDesiredScale(n)
 }
 
 func (r *DeploymentManager) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -83,8 +88,8 @@ func (r *DeploymentManager) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		return ctrl.Result{}, fmt.Errorf("get scale: %w", err)
 	}
 
-	model := req.Name
-	r.getScaler(model).UpdateState(
+	deploymentName := req.Name
+	r.getScaler(deploymentName).UpdateState(
 		scale.Spec.Replicas,
 		getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/min-replicas", 0),
 		getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/max-replicas", 3),
@@ -93,22 +98,22 @@ func (r *DeploymentManager) Reconcile(ctx context.Context, req ctrl.Request) (ct
 	return ctrl.Result{}, nil
 }
 
-func (r *DeploymentManager) getScaler(model string) *scaler {
+func (r *DeploymentManager) getScaler(deploymentName string) *scaler {
 	r.scalersMtx.Lock()
-	b, ok := r.scalers[model]
+	b, ok := r.scalers[deploymentName]
 	if !ok {
-		b = newScaler(r.ScaleDownPeriod, r.scaleFunc(context.TODO(), model))
-		r.scalers[model] = b
+		b = newScaler(r.ScaleDownPeriod, r.scaleFunc(context.TODO(), deploymentName))
+		r.scalers[deploymentName] = b
 	}
 	r.scalersMtx.Unlock()
 	return b
 }
 
-func (r *DeploymentManager) scaleFunc(ctx context.Context, model string) func(int32) error {
+func (r *DeploymentManager) scaleFunc(ctx context.Context, deploymentName string) func(int32) error {
 	return func(n int32) error {
-		log.Printf("Scaling model %q: %v", model, n)
+		log.Printf("Scaling deployment %q: %v", deploymentName, n)
 
-		req := types.NamespacedName{Namespace: r.Namespace, Name: model}
+		req := types.NamespacedName{Namespace: r.Namespace, Name: deploymentName}
 		var d appsv1.Deployment
 		if err := r.Get(ctx, req, &d); err != nil {
 			return fmt.Errorf("get: %w", err)
@@ -131,9 +136,9 @@ func (r *DeploymentManager) scaleFunc(ctx context.Context, model string) func(in
 	}
 }
 
-func (r *DeploymentManager) setModelMapping(model, deployment string) {
+func (r *DeploymentManager) setModelMapping(modelName, deploymentName string) {
 	r.modelToDeploymentMtx.Lock()
-	r.modelToDeployment[model] = deployment
+	r.modelToDeployment[modelName] = deploymentName
 	r.modelToDeploymentMtx.Unlock()
 }
 
diff --git a/docs/development.md b/docs/development.md
index 5abfd3fd..c2bcd31f 100644
--- a/docs/development.md
+++ b/docs/development.md
@@ -6,12 +6,16 @@ kind create cluster
 # Install STAPI
 helm repo add substratusai https://substratusai.github.io/helm
 helm repo update
-helm install stapi-minilm-l6-v2 substratusai/stapi \
-  --set model=all-MiniLM-L6-v2 \
-  --set deploymentAnnotations.lingo-models=text-embedding-ada-002
+helm upgrade --install stapi-minilm-l6-v2 substratusai/stapi -f - << EOF
+model: all-mpnet-base-v2
+replicaCount: 0
+deploymentAnnotations:
+  lingo.substratus.ai/models: text-embedding-ada-002
+EOF
+
 
 # Deploy
-skaffold run
+skaffold dev
 
 # In another terminal...
 kubectl port-forward svc/proxy-controller 8080:80
@@ -25,4 +29,8 @@ curl http://localhost:8080/v1/embeddings \
     "input": "Your text string goes here",
     "model": "text-embedding-ada-002"
   }'
+
+# Install vLLM with facebook opt 125
+
+
 ```
diff --git a/endpoint_manager.go b/endpoint_manager.go
index 410d2b88..40cc7844 100644
--- a/endpoint_manager.go
+++ b/endpoint_manager.go
@@ -25,7 +25,7 @@ func NewEndpointsManager(mgr ctrl.Manager) (*EndpointsManager, error) {
 type EndpointsManager struct {
 	client.Client
 
-	EndpointSizeCallback func(model string, size int)
+	EndpointSizeCallback func(deploymentName string, size int)
 
 	endpointsMtx sync.Mutex
 	endpoints map[string]*endpointGroup
@@ -78,6 +78,9 @@ func (r *EndpointsManager) Reconcile(ctx context.Context, req ctrl.Request) (ctr
 	r.getEndpoints(serviceName).setIPs(ips, port)
 
 	if priorLen != len(ips) {
+		// TODO: Currently the Service name needs to match the Deployment name;
+		// this shouldn't be required. We should be able to reference deployment
+		// replicas by something else.
 		r.EndpointSizeCallback(serviceName, len(ips))
 	}
 
diff --git a/fifo_queue.go b/fifo_queue.go
index 04d31b49..a0986a47 100644
--- a/fifo_queue.go
+++ b/fifo_queue.go
@@ -1,13 +1,13 @@
 package main
 
 import (
-	"fmt"
 	"log"
 	"sync"
 	"sync/atomic"
 )
 
 type fifoQueue struct {
+	// queued is a channel of channels that are waiting to be processed.
 	queued chan chan struct{}
 	completed chan struct{}
 	waiting atomic.Int64
@@ -26,25 +26,18 @@ func newFifoQueue(size, queueCapacity int) *fifoQueue {
 
 func (q *fifoQueue) start() {
 	log.Println("Starting new fifo queue")
-	var inProgress int
+	var inProgress int = 0
 	for {
-		fmt.Println("1")
-		fmt.Println("inProgress", inProgress, "getSize()", q.getSize())
+		log.Println("inProgress", inProgress, "getSize()", q.getSize())
 		if inProgress >= q.getSize() {
-			fmt.Println("2")
 			<-q.completed
-			fmt.Println("3")
 			inProgress--
-			fmt.Println("4")
 			continue
 		}
-		fmt.Println("5")
 		inProgress++
 		c := <-q.queued
-		fmt.Println("6")
 		close(c)
-		fmt.Println("7")
 	}
 }
 
 func (q *fifoQueue) enqueue() func() {
diff --git a/fifo_queue_manager.go b/fifo_queue_manager.go
index 8d181372..848f46e2 100644
--- a/fifo_queue_manager.go
+++ b/fifo_queue_manager.go
@@ -13,14 +13,19 @@ func NewFIFOQueueManager(size int, totalCapacity int) *FIFOQueueManager {
 	return m
 }
 
+// FIFOQueueManager manages a FIFO queue for each deployment.
 type FIFOQueueManager struct {
-	size int
+	// The default size of each queue for each deployment replica.
+	size int
+
+	// The default total capacity of the queue for a deployment.
 	totalCapacity int
 
 	mtx sync.Mutex
 	queues map[string]*fifoQueue
 }
 
+// WaitCounts returns the number of pending or in-progress requests for each deployment name.
 func (m *FIFOQueueManager) WaitCounts() map[string]int64 {
 	m.mtx.Lock()
 	sizes := make(map[string]int64, len(m.queues))
@@ -31,23 +36,27 @@ func (m *FIFOQueueManager) WaitCounts() map[string]int64 {
 	return sizes
 }
 
-func (m *FIFOQueueManager) Enqueue(model string) func() {
-	return m.getQueue(model).enqueue()
+// Enqueue adds a request to the queue for the given deployment name.
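+// The returned function should be called when the request completes, so that
+// its slot in the queue is freed for the next request.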
+func (m *FIFOQueueManager) Enqueue(deploymentName string) func() {
+	return m.getQueue(deploymentName).enqueue()
 }
 
-func (m *FIFOQueueManager) UpdateQueueSize(model string, replicas int) {
+// UpdateQueueSize updates the queue size for the given deployment name.
+func (m *FIFOQueueManager) UpdateQueueSize(deploymentName string, replicas int) {
 	newSize := replicas * m.size
-	log.Printf("Updating queue size: model: %v, replicas: %v, newSize: %v", model, replicas, newSize)
-	m.getQueue(model).resize(newSize)
+	log.Printf("Updating queue size: deployment: %v, replicas: %v, newSize: %v", deploymentName, replicas, newSize)
+	m.getQueue(deploymentName).resize(newSize)
 }
 
-func (m *FIFOQueueManager) getQueue(model string) *fifoQueue {
+// getQueue returns the queue for the given deployment name.
+// If the deployment does not have a queue yet, a new one is created.
+func (m *FIFOQueueManager) getQueue(deploymentName string) *fifoQueue {
 	m.mtx.Lock()
-	q, ok := m.queues[model]
+	q, ok := m.queues[deploymentName]
 	if !ok {
 		q = newFifoQueue(m.size, m.totalCapacity)
 		go q.start()
-		m.queues[model] = q
+		m.queues[deploymentName] = q
 	}
 	m.mtx.Unlock()
 	return q
diff --git a/main.go b/main.go
index fb5ea4dc..f1b31344 100644
--- a/main.go
+++ b/main.go
@@ -75,7 +75,7 @@ func run() error {
 		return fmt.Errorf("starting manager: %w", err)
 	}
 
-	fifo := NewFIFOQueueManager(1, 1000)
+	fifo := NewFIFOQueueManager(100, 60000)
 
 	endpoints, err := NewEndpointsManager(mgr)
 	if err != nil {
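
For reference, the adjustment hinted at by the new TODO in autoscaler.go could
look roughly like the sketch below: divide the moving-average wait count by the
per-replica queue size (100 after this patch) before taking the ceiling.
desiredReplicas is a hypothetical helper, not part of this patch.

package main

import (
	"fmt"
	"math"
)

// desiredReplicas converts a moving-average wait count into a desired
// replica count, assuming each replica can queue perReplicaQueueSize
// requests (100 after this patch). Hypothetical; for illustration only.
func desiredReplicas(avgWaitCount float64, perReplicaQueueSize int) int32 {
	return int32(math.Ceil(avgWaitCount / float64(perReplicaQueueSize)))
}

func main() {
	// An average of 250 waiting requests with a per-replica queue size
	// of 100 yields a desired scale of ceil(2.5) = 3 replicas.
	fmt.Println(desiredReplicas(250, 100))
}

With this adjustment the autoscaler would request one replica per queue-full of
waiting requests instead of one replica per waiting request.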