From 475b6e87378c7b38752d23af1e175b49515dd638 Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Sun, 22 Sep 2024 17:05:23 +0200 Subject: [PATCH] skipper: add server connection keepalive limits Clients may connect to a subset of Skipper fleet which leads to uneven request distribution and increased cpu usage. Autoscaling of Skipper fleet is not effective because clients stay connected to old instances while new instances are underutilized. This change adds ConnManager that tracks creation of new connections and closes connections when their age or number of requests served reaches configured limits. Signed-off-by: Alexander Yastrebov --- config/config.go | 6 ++ docs/operation/operation.md | 10 +++ net/connmanager.go | 95 ++++++++++++++++++++++++++ net/connmanager_test.go | 128 ++++++++++++++++++++++++++++++++++++ skipper.go | 19 +++++- 5 files changed, 255 insertions(+), 3 deletions(-) create mode 100644 net/connmanager.go create mode 100644 net/connmanager_test.go diff --git a/config/config.go b/config/config.go index 20799722e1..16b205fb7f 100644 --- a/config/config.go +++ b/config/config.go @@ -243,6 +243,8 @@ type Config struct { ReadHeaderTimeoutServer time.Duration `yaml:"read-header-timeout-server"` WriteTimeoutServer time.Duration `yaml:"write-timeout-server"` IdleTimeoutServer time.Duration `yaml:"idle-timeout-server"` + KeepaliveServer time.Duration `yaml:"keepalive-server"` + KeepaliveRequestsServer int `yaml:"keepalive-requests-server"` MaxHeaderBytes int `yaml:"max-header-bytes"` EnableConnMetricsServer bool `yaml:"enable-connection-metrics"` TimeoutBackend time.Duration `yaml:"timeout-backend"` @@ -544,6 +546,8 @@ func NewConfig() *Config { flag.DurationVar(&cfg.ReadHeaderTimeoutServer, "read-header-timeout-server", 60*time.Second, "set ReadHeaderTimeout for http server connections") flag.DurationVar(&cfg.WriteTimeoutServer, "write-timeout-server", 60*time.Second, "set WriteTimeout for http server connections") flag.DurationVar(&cfg.IdleTimeoutServer, "idle-timeout-server", 60*time.Second, "set IdleTimeout for http server connections") + flag.DurationVar(&cfg.KeepaliveServer, "keepalive-server", 0*time.Second, "sets maximum age for http server connections. The connection is closed after it existed for this duration. Default is 0 for unlimited.") + flag.IntVar(&cfg.KeepaliveRequestsServer, "keepalive-requests-server", 0, "sets maximum number of requests for http server connections. The connection is closed after serving this number of requests. Default is 0 for unlimited.") flag.IntVar(&cfg.MaxHeaderBytes, "max-header-bytes", http.DefaultMaxHeaderBytes, "set MaxHeaderBytes for http server connections") flag.BoolVar(&cfg.EnableConnMetricsServer, "enable-connection-metrics", false, "enables connection metrics for http server connections") flag.DurationVar(&cfg.TimeoutBackend, "timeout-backend", 60*time.Second, "sets the TCP client connection timeout for backend connections") @@ -879,6 +883,8 @@ func (c *Config) ToOptions() skipper.Options { ReadHeaderTimeoutServer: c.ReadHeaderTimeoutServer, WriteTimeoutServer: c.WriteTimeoutServer, IdleTimeoutServer: c.IdleTimeoutServer, + KeepaliveServer: c.KeepaliveServer, + KeepaliveRequestsServer: c.KeepaliveRequestsServer, MaxHeaderBytes: c.MaxHeaderBytes, EnableConnMetricsServer: c.EnableConnMetricsServer, TimeoutBackend: c.TimeoutBackend, diff --git a/docs/operation/operation.md b/docs/operation/operation.md index c7ee3e9c5a..142254ff7f 100644 --- a/docs/operation/operation.md +++ b/docs/operation/operation.md @@ -124,6 +124,16 @@ combinations of idle timeouts can lead to a few unexpected HTTP 502. -idle-timeout-server duration maximum idle connections per backend host (default 1m0s) +This configures maximum number of requests served by server connections: + + -keepalive-requests-server int + sets maximum number of requests for http server connections. The connection is closed after serving this number of requests. Default is 0 for unlimited. + +This configures maximum age for server connections: + + -keepalive-server duration + sets maximum age for http server connections. The connection is closed after it existed for this duration. Default is 0 for unlimited. + This will set MaxHeaderBytes in [http.Server](https://golang.org/pkg/net/http/#Server) to limit the size of the http header from your clients. diff --git a/net/connmanager.go b/net/connmanager.go new file mode 100644 index 0000000000..fb65f79bf9 --- /dev/null +++ b/net/connmanager.go @@ -0,0 +1,95 @@ +package net + +import ( + "context" + "fmt" + "net" + "net/http" + "time" + + "github.com/zalando/skipper/metrics" +) + +// ConnManager tracks creation of HTTP server connections and +// closes connections when their age or number of requests served reaches configured limits. +// Use [ConnManager.Configure] method to setup ConnManager for an [http.Server]. +type ConnManager struct { + // Metrics is an optional metrics registry to count connection events. + Metrics metrics.Metrics + + // Keepalive is the duration after which server connection is closed. + Keepalive time.Duration + + // KeepaliveRequests is the number of requests after which server connection is closed. + KeepaliveRequests int + + handler http.Handler +} + +type connState struct { + expiresAt time.Time + requests int +} + +type contextKey struct{} + +var connection contextKey + +func (cm *ConnManager) Configure(server *http.Server) { + cm.handler = server.Handler + server.Handler = http.HandlerFunc(cm.serveHTTP) + + if cc := server.ConnContext; cc != nil { + server.ConnContext = func(ctx context.Context, c net.Conn) context.Context { + ctx = cc(ctx, c) + return cm.connContext(ctx, c) + } + } else { + server.ConnContext = cm.connContext + } + + if cs := server.ConnState; cs != nil { + server.ConnState = func(c net.Conn, state http.ConnState) { + cs(c, state) + cm.connState(c, state) + } + } else { + server.ConnState = cm.connState + } +} + +func (cm *ConnManager) serveHTTP(w http.ResponseWriter, r *http.Request) { + state, _ := r.Context().Value(connection).(*connState) + state.requests++ + + if cm.KeepaliveRequests > 0 && state.requests >= cm.KeepaliveRequests { + w.Header().Set("Connection", "close") + + cm.count("lb-conn-closed.keepalive-requests") + } + + if cm.Keepalive > 0 && time.Now().After(state.expiresAt) { + w.Header().Set("Connection", "close") + + cm.count("lb-conn-closed.keepalive") + } + + cm.handler.ServeHTTP(w, r) +} + +func (cm *ConnManager) connContext(ctx context.Context, _ net.Conn) context.Context { + state := &connState{ + expiresAt: time.Now().Add(cm.Keepalive), + } + return context.WithValue(ctx, connection, state) +} + +func (cm *ConnManager) connState(_ net.Conn, state http.ConnState) { + cm.count(fmt.Sprintf("lb-conn-%s", state)) +} + +func (cm *ConnManager) count(name string) { + if cm.Metrics != nil { + cm.Metrics.IncCounter(name) + } +} diff --git a/net/connmanager_test.go b/net/connmanager_test.go new file mode 100644 index 0000000000..0b1aa06624 --- /dev/null +++ b/net/connmanager_test.go @@ -0,0 +1,128 @@ +package net_test + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/zalando/skipper/metrics/metricstest" + snet "github.com/zalando/skipper/net" +) + +func TestConnManager(t *testing.T) { + const ( + keepaliveRequests = 3 + keepalive = 100 * time.Millisecond + + testRequests = keepaliveRequests * 5 + ) + t.Run("does not close connection without limits", func(t *testing.T) { + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + m := &metricstest.MockMetrics{} + cm := &snet.ConnManager{ + Metrics: m, + } + cm.Configure(ts.Config) + + ts.Start() + defer ts.Close() + + for i := 0; i < testRequests; i++ { + resp, err := ts.Client().Get(ts.URL) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.False(t, resp.Close) + } + + time.Sleep(100 * time.Millisecond) // wait for connection state update + + m.WithCounters(func(counters map[string]int64) { + assert.Equal(t, int64(1), counters["lb-conn-new"]) + assert.Equal(t, int64(testRequests), counters["lb-conn-active"]) + assert.Equal(t, int64(testRequests), counters["lb-conn-idle"]) + assert.Equal(t, int64(0), counters["lb-conn-closed"]) + }) + }) + t.Run("closes connection after keepalive requests", func(t *testing.T) { + const keepaliveRequests = 3 + + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + m := &metricstest.MockMetrics{} + cm := &snet.ConnManager{ + Metrics: m, + KeepaliveRequests: keepaliveRequests, + } + cm.Configure(ts.Config) + + ts.Start() + defer ts.Close() + + for i := 1; i < testRequests; i++ { + resp, err := ts.Client().Get(ts.URL) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + if i%keepaliveRequests == 0 { + assert.True(t, resp.Close) + } else { + assert.False(t, resp.Close) + } + } + + time.Sleep(100 * time.Millisecond) // wait for connection state update + + m.WithCounters(func(counters map[string]int64) { + rounds := int64(testRequests / keepaliveRequests) + + assert.Equal(t, rounds, counters["lb-conn-new"]) + assert.Equal(t, rounds-1, counters["lb-conn-closed"]) + assert.Equal(t, rounds-1, counters["lb-conn-closed.keepalive-requests"]) + }) + }) + + t.Run("closes connection after keepalive timeout", func(t *testing.T) { + const keepalive = 100 * time.Millisecond + + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + m := &metricstest.MockMetrics{} + cm := &snet.ConnManager{ + Metrics: m, + Keepalive: keepalive, + } + cm.Configure(ts.Config) + + ts.Start() + defer ts.Close() + + for i := 0; i < testRequests; i++ { + resp, err := ts.Client().Get(ts.URL) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.False(t, resp.Close) + } + + time.Sleep(2 * keepalive) + + resp, err := ts.Client().Get(ts.URL) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.True(t, resp.Close) + + time.Sleep(100 * time.Millisecond) // wait for connection state update + + m.WithCounters(func(counters map[string]int64) { + assert.Equal(t, int64(1), counters["lb-conn-new"]) + assert.Equal(t, int64(1), counters["lb-conn-closed"]) + assert.Equal(t, int64(1), counters["lb-conn-closed.keepalive"]) + }) + }) +} diff --git a/skipper.go b/skipper.go index 83a12a673a..bce6097b9d 100644 --- a/skipper.go +++ b/skipper.go @@ -360,6 +360,14 @@ type Options struct { // Defines IdleTimeout for server http connections. IdleTimeoutServer time.Duration + // KeepaliveServer configures maximum age for server http connections. + // The connection is closed after it existed for this duration. + KeepaliveServer time.Duration + + // KeepaliveRequestsServer configures maximum number of requests for server http connections. + // The connection is closed after serving this number of requests. + KeepaliveRequestsServer int + // Defines MaxHeaderBytes for server http connections. MaxHeaderBytes int @@ -1334,12 +1342,17 @@ func listenAndServeQuit( ErrorLog: newServerErrorLog(), } + cm := &skpnet.ConnManager{ + Keepalive: o.KeepaliveServer, + KeepaliveRequests: o.KeepaliveRequestsServer, + } + if o.EnableConnMetricsServer { - srv.ConnState = func(conn net.Conn, state http.ConnState) { - mtr.IncCounter(fmt.Sprintf("lb-conn-%s", state)) - } + cm.Metrics = mtr } + cm.Configure(srv) + log.Infof("Listen on %v", address) l, err := listen(o, address, mtr)