Skip to content

Commit

Permalink
Add comments and increase default queue size to 100 (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
samos123 authored Nov 5, 2023
1 parent 4350d67 commit cd7f90e
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 53 deletions.
21 changes: 10 additions & 11 deletions autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func NewAutoscaler() *Autoscaler {

// Autoscaler is responsible for making continuous adjustments to
// the scale of the backend. It is not responsible for scale-from-zero.
// Each deployment has its own autoscaler.
type Autoscaler struct {
Interval time.Duration
AverageCount int
Expand All @@ -27,27 +28,25 @@ type Autoscaler struct {
func (a *Autoscaler) Start() {
for range time.Tick(a.Interval) {
log.Println("Calculating scales for all")
for model, waitCount := range a.FIFO.WaitCounts() {
if model == "proxy-controller" {
// TODO: Remove this after selecting models based on labels/annotations.
continue
}
avg := a.getMovingAvgQueueSize(model)
// TODO: Might need to account for queue size being 100
// probably can simply divide the average by queue size
for deploymentName, waitCount := range a.FIFO.WaitCounts() {
avg := a.getMovingAvgQueueSize(deploymentName)
avg.Next(float64(waitCount))
flt := avg.Calculate()
ceil := math.Ceil(flt)
log.Printf("Average for model: %s: %v (ceil: %v), current wait count: %v", model, flt, ceil, waitCount)
a.Scaler.SetDesiredScale(model, int32(ceil))
log.Printf("Average for deployment: %s: %v (ceil: %v), current wait count: %v", deploymentName, flt, ceil, waitCount)
a.Scaler.SetDesiredScale(deploymentName, int32(ceil))
}
}
}

func (r *Autoscaler) getMovingAvgQueueSize(model string) *movingAvg {
func (r *Autoscaler) getMovingAvgQueueSize(deploymentName string) *movingAvg {
r.movingAvgQueueSizeMtx.Lock()
a, ok := r.movingAvgQueueSize[model]
a, ok := r.movingAvgQueueSize[deploymentName]
if !ok {
a = newSimpleMovingAvg(make([]float64, r.AverageCount))
r.movingAvgQueueSize[model] = a
r.movingAvgQueueSize[deploymentName] = a
}
r.movingAvgQueueSizeMtx.Unlock()
return a
Expand Down
39 changes: 22 additions & 17 deletions deployment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ type DeploymentManager struct {
ScaleDownPeriod time.Duration

scalersMtx sync.Mutex
scalers map[string]*scaler

// scalers maps deployment names to scalers
scalers map[string]*scaler

modelToDeploymentMtx sync.RWMutex
modelToDeployment map[string]string

// modelToDeployment maps model names to deployment names. A single deployment
// can serve multiple models.
modelToDeployment map[string]string
}

func (r *DeploymentManager) SetupWithManager(mgr ctrl.Manager) error {
Expand All @@ -50,12 +55,12 @@ func (r *DeploymentManager) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *DeploymentManager) AtLeastOne(model string) {
r.getScaler(model).AtLeastOne()
func (r *DeploymentManager) AtLeastOne(deploymentName string) {
r.getScaler(deploymentName).AtLeastOne()
}

func (r *DeploymentManager) SetDesiredScale(model string, n int32) {
r.getScaler(model).SetDesiredScale(n)
func (r *DeploymentManager) SetDesiredScale(deploymentName string, n int32) {
r.getScaler(deploymentName).SetDesiredScale(n)
}

func (r *DeploymentManager) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -83,8 +88,8 @@ func (r *DeploymentManager) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, fmt.Errorf("get scale: %w", err)
}

model := req.Name
r.getScaler(model).UpdateState(
deploymentName := req.Name
r.getScaler(deploymentName).UpdateState(
scale.Spec.Replicas,
getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/min-replicas", 0),
getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/max-replicas", 3),
Expand All @@ -93,22 +98,22 @@ func (r *DeploymentManager) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

func (r *DeploymentManager) getScaler(model string) *scaler {
func (r *DeploymentManager) getScaler(deploymentName string) *scaler {
r.scalersMtx.Lock()
b, ok := r.scalers[model]
b, ok := r.scalers[deploymentName]
if !ok {
b = newScaler(r.ScaleDownPeriod, r.scaleFunc(context.TODO(), model))
r.scalers[model] = b
b = newScaler(r.ScaleDownPeriod, r.scaleFunc(context.TODO(), deploymentName))
r.scalers[deploymentName] = b
}
r.scalersMtx.Unlock()
return b
}

func (r *DeploymentManager) scaleFunc(ctx context.Context, model string) func(int32) error {
func (r *DeploymentManager) scaleFunc(ctx context.Context, deploymentName string) func(int32) error {
return func(n int32) error {
log.Printf("Scaling model %q: %v", model, n)
log.Printf("Scaling model %q: %v", deploymentName, n)

req := types.NamespacedName{Namespace: r.Namespace, Name: model}
req := types.NamespacedName{Namespace: r.Namespace, Name: deploymentName}
var d appsv1.Deployment
if err := r.Get(ctx, req, &d); err != nil {
return fmt.Errorf("get: %w", err)
Expand All @@ -131,9 +136,9 @@ func (r *DeploymentManager) scaleFunc(ctx context.Context, model string) func(in
}
}

func (r *DeploymentManager) setModelMapping(model, deployment string) {
func (r *DeploymentManager) setModelMapping(modelName, deploymentName string) {
r.modelToDeploymentMtx.Lock()
r.modelToDeployment[model] = deployment
r.modelToDeployment[modelName] = deploymentName
r.modelToDeploymentMtx.Unlock()
}

Expand Down
16 changes: 12 additions & 4 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ kind create cluster
# Install STAPI
helm repo add substratusai https://substratusai.github.io/helm
helm repo update
helm install stapi-minilm-l6-v2 substratusai/stapi \
--set model=all-MiniLM-L6-v2 \
--set deploymentAnnotations.lingo-models=text-embedding-ada-002
helm upgrade --install stapi-minilm-l6-v2 substratusai/stapi -f - << EOF
model: all-mpnet-base-v2
replicaCount: 0
deploymentAnnotations:
lingo.substratus.ai/models: text-embedding-ada-002
EOF


# Deploy
skaffold run
skaffold dev

# In another terminal...
kubectl port-forward svc/proxy-controller 8080:80
Expand All @@ -25,4 +29,8 @@ curl http://localhost:8080/v1/embeddings \
"input": "Your text string goes here",
"model": "text-embedding-ada-002"
}'

# Install vLLM with facebook opt 125


```
5 changes: 4 additions & 1 deletion endpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewEndpointsManager(mgr ctrl.Manager) (*EndpointsManager, error) {
type EndpointsManager struct {
client.Client

EndpointSizeCallback func(model string, size int)
EndpointSizeCallback func(deploymentName string, size int)

endpointsMtx sync.Mutex
endpoints map[string]*endpointGroup
Expand Down Expand Up @@ -78,6 +78,9 @@ func (r *EndpointsManager) Reconcile(ctx context.Context, req ctrl.Request) (ctr
r.getEndpoints(serviceName).setIPs(ips, port)

if priorLen != len(ips) {
// TODO: Currently Service name needs to match Deployment name, however
// this shouldn't be the case. We should be able to reference deployment
// replicas by something else.
r.EndpointSizeCallback(serviceName, len(ips))
}

Expand Down
13 changes: 3 additions & 10 deletions fifo_queue.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package main

import (
"fmt"
"log"
"sync"
"sync/atomic"
)

type fifoQueue struct {
// queued is a channel of channels that are waiting to be processed.
queued chan chan struct{}
completed chan struct{}
waiting atomic.Int64
Expand All @@ -26,25 +26,18 @@ func newFifoQueue(size, queueCapacity int) *fifoQueue {

func (q *fifoQueue) start() {
log.Println("Starting new fifo queue")
var inProgress int
var inProgress int = 0
for {
fmt.Println("1")
fmt.Println("inProgress", inProgress, "getSize()", q.getSize())
log.Println("inProgress", inProgress, "getSize()", q.getSize())
if inProgress >= q.getSize() {
fmt.Println("2")
<-q.completed
fmt.Println("3")
inProgress--
fmt.Println("4")
continue
}
fmt.Println("5")
inProgress++

c := <-q.queued
fmt.Println("6")
close(c)
fmt.Println("7")
}
}

Expand Down
27 changes: 18 additions & 9 deletions fifo_queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@ func NewFIFOQueueManager(size int, totalCapacity int) *FIFOQueueManager {
return m
}

// FIFOQueueManager manages the queues for each deployment
type FIFOQueueManager struct {
size int
// The default size of each queue for each deployment replica
size int

// The default total capacity of the queue for a deployment
totalCapacity int

mtx sync.Mutex
queues map[string]*fifoQueue
}

// WaitCounts returns the number of pending or in-progress requests for each deployment name
func (m *FIFOQueueManager) WaitCounts() map[string]int64 {
m.mtx.Lock()
sizes := make(map[string]int64, len(m.queues))
Expand All @@ -31,23 +36,27 @@ func (m *FIFOQueueManager) WaitCounts() map[string]int64 {
return sizes
}

func (m *FIFOQueueManager) Enqueue(model string) func() {
return m.getQueue(model).enqueue()
// Enqueue adds a request to the queue for the given deployment name.
func (m *FIFOQueueManager) Enqueue(deploymentName string) func() {
return m.getQueue(deploymentName).enqueue()
}

func (m *FIFOQueueManager) UpdateQueueSize(model string, replicas int) {
// UpdateQueueSize updates the queue size for the given deployment name
func (m *FIFOQueueManager) UpdateQueueSize(deploymentName string, replicas int) {
newSize := replicas * m.size
log.Printf("Updating queue size: model: %v, replicas: %v, newSize: %v", model, replicas, newSize)
m.getQueue(model).resize(newSize)
log.Printf("Updating queue size: deployment: %v, replicas: %v, newSize: %v", deploymentName, replicas, newSize)
m.getQueue(deploymentName).resize(newSize)
}

func (m *FIFOQueueManager) getQueue(model string) *fifoQueue {
// getQueue returns the queue for the given deployment name.
// If the deployment does not have a queue, a new queue is created.
func (m *FIFOQueueManager) getQueue(deploymentName string) *fifoQueue {
m.mtx.Lock()
q, ok := m.queues[model]
q, ok := m.queues[deploymentName]
if !ok {
q = newFifoQueue(m.size, m.totalCapacity)
go q.start()
m.queues[model] = q
m.queues[deploymentName] = q
}
m.mtx.Unlock()
return q
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func run() error {
return fmt.Errorf("starting manager: %w", err)
}

fifo := NewFIFOQueueManager(1, 1000)
fifo := NewFIFOQueueManager(100, 60000)

endpoints, err := NewEndpointsManager(mgr)
if err != nil {
Expand Down

0 comments on commit cd7f90e

Please sign in to comment.