Add queue cancellation on request cancellation #19

Merged · 1 commit · Nov 11, 2023
13 changes: 6 additions & 7 deletions autoscaler.go
```diff
@@ -5,6 +5,8 @@ import (
 	"math"
 	"sync"
 	"time"
+
+	"github.com/substratusai/lingo/pkg/queuemanager"
 )
@@ -19,7 +21,9 @@ type Autoscaler struct {
 	AverageCount int
 
 	Scaler *DeploymentManager
-	FIFO   *FIFOQueueManager
+	FIFO   *queuemanager.FIFOQueueManager
+
+	ConcurrencyPerReplica int
 
 	movingAvgQueueSizeMtx sync.Mutex
 	movingAvgQueueSize    map[string]*movingAvg
@@ -32,12 +36,7 @@ func (a *Autoscaler) Start() {
 		avg := a.getMovingAvgQueueSize(deploymentName)
 		avg.Next(float64(waitCount))
 		flt := avg.Calculate()
-		// TODO fix this to use configurable concurrency setting that's supplied
-		// by the user.
-		// Note this uses the default queue size, not the current queue size.
-		// the current queue size increases and decreases based on replica count
-		concurrencyPerReplica := a.FIFO.size
-		normalized := flt / float64(concurrencyPerReplica)
+		normalized := flt / float64(a.ConcurrencyPerReplica)
		ceil := math.Ceil(normalized)
 		log.Printf("Average for deployment: %s: %v (ceil: %v), current wait count: %v", deploymentName, flt, ceil, waitCount)
 		a.Scaler.SetDesiredScale(deploymentName, int32(ceil))
```
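With the TODO resolved, the scaling rule reduces to: desired replicas = ceil(average queue size / concurrency per replica), where the divisor is now the user-supplied --concurrency value rather than the queue's internal size. A standalone sketch of that arithmetic (desiredReplicas is a name invented here for illustration):

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas mirrors the normalization in Autoscaler.Start:
// the moving average of waiting requests, divided by the configured
// per-replica concurrency, rounded up.
func desiredReplicas(avgWaiting float64, concurrencyPerReplica int) int32 {
	return int32(math.Ceil(avgWaiting / float64(concurrencyPerReplica)))
}

func main() {
	// An average of 250 waiting requests at 100 concurrent requests
	// per replica gives ceil(2.5) = 3 replicas.
	fmt.Println(desiredReplicas(250, 100)) // 3
}
```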
65 changes: 0 additions & 65 deletions fifo_queue.go

This file was deleted.

63 changes: 0 additions & 63 deletions fifo_queue_manager.go

This file was deleted.
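(Both deleted files' queue logic presumably moves into the new pkg/queuemanager package imported throughout this PR; that package's own files are not shown in this diff view.)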

4 changes: 2 additions & 2 deletions go.mod
```diff
@@ -1,4 +1,4 @@
-module golang-proxy
+module github.com/substratusai/lingo
 
 go 1.21.0
 
@@ -29,7 +29,7 @@ require (
 	github.com/google/gnostic-models v0.6.8 // indirect
 	github.com/google/go-cmp v0.5.9 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
-	github.com/google/uuid v1.3.0 // indirect
+	github.com/google/uuid v1.4.0 // indirect
 	github.com/imdario/mergo v0.3.6 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
```
2 changes: 2 additions & 0 deletions go.sum
```diff
@@ -51,6 +51,8 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY
 github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
+github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
 github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
 github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
```
10 changes: 7 additions & 3 deletions handler.go
```diff
@@ -9,17 +9,21 @@ import (
 	"net/http"
 	"net/http/httputil"
 	"net/url"
+
+	"github.com/google/uuid"
+	"github.com/substratusai/lingo/pkg/queuemanager"
 )
 
 // Handler serves http requests.
 // It is also responsible for triggering scale-from-zero.
 type Handler struct {
 	Deployments *DeploymentManager
 	Endpoints   *EndpointsManager
-	FIFO        *FIFOQueueManager
+	FIFO        *queuemanager.FIFOQueueManager
 }
 
 func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	id := uuid.New().String()
 	log.Printf("request: %v", r.URL)
 
 	w.Header().Set("X-Proxy", "lingo")
@@ -45,9 +49,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	h.Deployments.AtLeastOne(deploy)
 
 	log.Println("Entering queue")
-	unqueue := h.FIFO.Enqueue(deploy)
+	complete := h.FIFO.EnqueueAndWait(r.Context(), deploy, id)
 	log.Println("Admitted into queue")
-	defer unqueue()
+	defer complete()
 
 	log.Println("Waiting for IPs")
 	host := h.Endpoints.GetHost(r.Context(), deploy)
```
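The handler now generates a per-request UUID and hands the request context to the queue, so a client disconnect cancels a queued request instead of leaving it to hold a slot. The pkg/queuemanager implementation itself is not part of this diff; the following is a minimal, self-contained sketch of the admission/cancellation contract implied by the call site, with all names other than EnqueueAndWait invented here (the real signature also takes the deployment name, modeled below by constructing one queue per deployment):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Queue stands in for one deployment's queue; slots acts as a
// counting semaphore sized to the deployment's total concurrency.
type Queue struct {
	slots chan struct{}
}

func NewQueue(concurrency int) *Queue {
	return &Queue{slots: make(chan struct{}, concurrency)}
}

// EnqueueAndWait blocks until the request is admitted or its context
// is cancelled. It returns a completion func for the caller to defer;
// on cancellation no slot is held and the returned func is a no-op.
func (q *Queue) EnqueueAndWait(ctx context.Context, id string) func() {
	select {
	case q.slots <- struct{}{}: // admitted: one slot now held
		return func() { <-q.slots } // completion releases the slot
	case <-ctx.Done(): // client gave up while waiting
		return func() {}
	}
}

func main() {
	q := NewQueue(1)

	complete := q.EnqueueAndWait(context.Background(), "req-1")
	defer complete()

	// The single slot is taken, so this request waits; its 50ms
	// deadline fires and it is dropped instead of waiting forever.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	complete2 := q.EnqueueAndWait(ctx, "req-2")
	defer complete2()

	fmt.Println("second request cancelled:", ctx.Err() != nil)
}
```

Blocked senders on a buffered channel are served roughly in arrival order, which is close enough for a sketch; the real FIFOQueueManager presumably maintains explicit FIFO ordering.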
8 changes: 5 additions & 3 deletions integration_test.go
```diff
@@ -60,15 +60,17 @@ func TestIntegration(t *testing.T) {
 	completeRequests(backendComplete, 1)
 
 	// Ensure the deployment scaled scaled past 1.
-	sendRequests(t, &wg, modelName, 2)
+	// 1/3 should be admitted
+	// 2/3 should remain in queue --> replicas should equal 2
+	sendRequests(t, &wg, modelName, 3)
 	requireDeploymentReplicas(t, deploy, 2)
 
 	// Make sure deployment will not be scaled past default max (3).
 	sendRequests(t, &wg, modelName, 2)
 	requireDeploymentReplicas(t, deploy, 3)
 
 	// Have the mock backend respond to the remaining 4 requests.
-	completeRequests(backendComplete, 4)
+	completeRequests(backendComplete, 5)
 
 	// Ensure scale-down.
 	requireDeploymentReplicas(t, deploy, 0)
@@ -82,7 +84,7 @@ func requireDeploymentReplicas(t *testing.T, deploy *appsv1.Deployment, n int32)
 		err := testK8sClient.Get(testCtx, types.NamespacedName{Namespace: deploy.Namespace, Name: deploy.Name}, deploy)
 		assert.NoError(t, err, "getting the deployment")
 		assert.NotNil(t, deploy.Spec.Replicas, "scale-up should have occurred")
-		assert.Equal(t, *deploy.Spec.Replicas, n, "scale-up should have occurred")
+		assert.Equal(t, n, *deploy.Spec.Replicas, "scale-up should have occurred")
 	}, 3*time.Second, time.Second/2, "waiting for the deployment to be scaled up")
 }
```
8 changes: 4 additions & 4 deletions main.go
```diff
@@ -14,6 +14,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
 
+	"github.com/substratusai/lingo/pkg/queuemanager"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -44,15 +45,13 @@ func run() error {
 	var enableLeaderElection bool
 	var probeAddr string
 	var concurrencyPerReplica int
-	var maxQueueSize int
 
 	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8082", "The address the metric endpoint binds to.")
 	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
 	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
 		"Enable leader election for controller manager. "+
 			"Enabling this will ensure there is only one active controller manager.")
 	flag.IntVar(&concurrencyPerReplica, "concurrency", 100, "the number of simultaneous requests that can be processed by each replica")
-	flag.IntVar(&maxQueueSize, "max-queue-size", 60000, "the maximum size of the queue that holds requests")
 	opts := zap.Options{
 		Development: true,
 	}
@@ -80,13 +79,13 @@
 		return fmt.Errorf("starting manager: %w", err)
 	}
 
-	fifo := NewFIFOQueueManager(concurrencyPerReplica, maxQueueSize)
+	fifo := queuemanager.NewFIFOQueueManager(concurrencyPerReplica)
 
 	endpoints, err := NewEndpointsManager(mgr)
 	if err != nil {
 		return fmt.Errorf("setting up endpoint manager: %w", err)
 	}
-	endpoints.EndpointSizeCallback = fifo.UpdateQueueSize
+	endpoints.EndpointSizeCallback = fifo.UpdateQueueSizeForReplicas
 
 	scaler, err := NewDeploymentManager(mgr)
 	if err != nil {
@@ -99,6 +98,7 @@
 	autoscaler.Interval = 3 * time.Second
 	autoscaler.AverageCount = 10 // 10 * 3 seconds = 30 sec avg
 	autoscaler.Scaler = scaler
+	autoscaler.ConcurrencyPerReplica = concurrencyPerReplica
 	autoscaler.FIFO = fifo
 	go autoscaler.Start()
```
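With the old queue gone, the --max-queue-size flag and its plumbing are dropped as well; queue capacity now tracks replica count through fifo.UpdateQueueSizeForReplicas rather than a single global cap.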
7 changes: 5 additions & 2 deletions main_test.go
```diff
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/substratusai/lingo/pkg/queuemanager"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -63,11 +64,12 @@ func TestMain(m *testing.M) {
 	})
 	requireNoError(err)
 
-	fifo := NewFIFOQueueManager(1, 1000)
+	const concurrencyPerReplica = 1
+	fifo := queuemanager.NewFIFOQueueManager(concurrencyPerReplica)
 
 	endpoints, err := NewEndpointsManager(mgr)
 	requireNoError(err)
-	endpoints.EndpointSizeCallback = fifo.UpdateQueueSize
+	endpoints.EndpointSizeCallback = fifo.UpdateQueueSizeForReplicas
 
 	scaler, err := NewDeploymentManager(mgr)
 	requireNoError(err)
@@ -79,6 +81,7 @@
 	autoscaler.AverageCount = 1 // 10 * 3 seconds = 30 sec avg
 	autoscaler.Scaler = scaler
 	autoscaler.FIFO = fifo
+	autoscaler.ConcurrencyPerReplica = concurrencyPerReplica
 	go autoscaler.Start()
 
 	handler := &Handler{
```