Skip to content

Commit

Permalink
wip: passive health checks
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <roman.zavodskikh@zalando.de>
  • Loading branch information
Roman Zavodskikh committed Jan 26, 2024
1 parent 457d83b commit a7f8f3e
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 11 deletions.
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ type Config struct {
OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"`
OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"`
OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"`

// Passive Health Checks
EnablePassiveHealthChecks bool `yaml:"enable-passive-health-checks"`
PHCTotalRequestsThreshold float64 `yaml:"phc-total-requests-threshold"`
PHCTimeoutedRequestsRatioThreshold float64 `yaml:"phc-timeouted-requests-ratio-threshold"`
}

const (
Expand Down Expand Up @@ -567,6 +572,11 @@ func NewConfig() *Config {
flag.Var(cfg.LuaModules, "lua-modules", "comma separated list of lua filter modules. Use <module>.<symbol> to selectively enable module symbols, for example: package,base._G,base.print,json")
flag.Var(cfg.LuaSources, "lua-sources", `comma separated list of lua input types for the lua() filter. Valid sources "", "file", "inline", "file,inline" and "none". Use "file" to only allow lua file references in lua filter. Default "" is the same as "file","inline". Use "none" to disable lua filters.`)

// Passive Health Checks
flag.BoolVar(&cfg.EnablePassiveHealthChecks, "enable-passive-health-check", false, "enables passive health check")
flag.Float64Var(&cfg.PHCTotalRequestsThreshold, "phc-total-requests-threshold", 10.0, "sets the total requests threshold for passive health check")
flag.Float64Var(&cfg.PHCTimeoutedRequestsRatioThreshold, "phc-timeouted-requests-ratio-threshold", 0.1, "sets the timeouted requests ratio threshold for passive health check")

cfg.flags = flag
return cfg
}
Expand Down Expand Up @@ -906,6 +916,11 @@ func (c *Config) ToOptions() skipper.Options {
OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata,
OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval,
OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout,

// Passive Health Checks
PHCEnabled: c.EnablePassiveHealthChecks,
PHCTotalRequestsThreshold: c.PHCTotalRequestsThreshold,
PHCTimeoutedRequestsRatioThreshold: c.PHCTimeoutedRequestsRatioThreshold,
}
for _, rcci := range c.CloneRoute {
eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl)
Expand Down
3 changes: 3 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ func defaultConfig() *Config {
LuaSources: commaListFlag(),
OpenPolicyAgentCleanerInterval: 10 * time.Second,
OpenPolicyAgentStartupTimeout: 30 * time.Second,
EnablePassiveHealthChecks: false,
PHCTotalRequestsThreshold: 10,
PHCTimeoutedRequestsRatioThreshold: 0.1,
}
}

Expand Down
6 changes: 5 additions & 1 deletion proxy/fadein_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}

proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}}
proxy := &Proxy{
registry: registry,
fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry},
heathlyEndpoints: &healthyEndpoints{enabled: false},
}
return route, proxy, eps
}

Expand Down
35 changes: 35 additions & 0 deletions proxy/healthy_endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package proxy

import "github.com/zalando/skipper/routing"

type healthyEndpoints struct {
enabled bool
timeoutedRequestsRatioThreshold float64
totalRequestsThreshold float64
}

func (h *healthyEndpoints) healthyEndpoint(endpoint routing.LBEndpoint) bool {
timeoutedRequests := float64(endpoint.Metrics.TimeoutedRequests())
totalRequests := float64(endpoint.Metrics.TotalRequests())
failedRequestsRatio := timeoutedRequests / totalRequests

return failedRequestsRatio < h.timeoutedRequestsRatioThreshold || totalRequests < h.totalRequestsThreshold
}

func (h *healthyEndpoints) filterHealthyEndpoints(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint {
if !h.enabled {
return endpoints
}

filtered := make([]routing.LBEndpoint, 0, len(endpoints))
for _, e := range endpoints {
if h.healthyEndpoint(e) {
filtered = append(filtered, e)
}
}

if len(filtered) == 0 {
return endpoints
}
return filtered
}
104 changes: 104 additions & 0 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package proxy

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/loadbalancer"
"github.com/zalando/skipper/routing"
)

func initiliaze(hosts []string) (*routing.EndpointRegistry, *routing.Route) {
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
eskipRoute := eskip.Route{BackendType: eskip.LBBackend, LBAlgorithm: loadbalancer.PowerOfRandomNChoices.String()}
for i := range hosts {
eskipRoute.LBEndpoints = append(eskipRoute.LBEndpoints, fmt.Sprintf("http://%s", hosts[i]))
}

route := &routing.Route{
Route: eskipRoute,
LBEndpoints: []routing.LBEndpoint{},
}
rt := loadbalancer.NewAlgorithmProvider().Do([]*routing.Route{route})
route = rt[0]
endpointRegistry.Do([]*routing.Route{route})

return endpointRegistry, route
}

func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreHealthy(t *testing.T) {
hosts := []string{"host1:80", "host2:80", "host3:80"}
endpointRegistry, route := initiliaze(hosts)

for i := range hosts {
metrics := endpointRegistry.GetMetrics(hosts[i])
for j := 0; j < 50; j++ {
metrics.IncTotalRequests()
}
}

proxy := &Proxy{
registry: endpointRegistry,
heathlyEndpoints: &healthyEndpoints{enabled: true, timeoutedRequestsRatioThreshold: 0.1, totalRequestsThreshold: 10},
}
endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route)

assert.Equal(t, 3, len(endpoints))
assert.Equal(t, hosts[0], endpoints[0].Host)
assert.Equal(t, hosts[1], endpoints[1].Host)
assert.Equal(t, hosts[2], endpoints[2].Host)
}

func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreUnhealthy(t *testing.T) {
hosts := []string{"host1:80", "host2:80", "host3:80"}
endpointRegistry, route := initiliaze(hosts)

for i := range hosts {
metrics := endpointRegistry.GetMetrics(hosts[i])
for j := 0; j < 50; j++ {
metrics.IncTotalRequests()
}
for j := 0; j < 30; j++ {
metrics.IncTimeoutedRequests()
}
}

proxy := &Proxy{
registry: endpointRegistry,
heathlyEndpoints: &healthyEndpoints{enabled: true, timeoutedRequestsRatioThreshold: 0.1, totalRequestsThreshold: 10},
}
endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route)

assert.Equal(t, 3, len(endpoints))
assert.Equal(t, hosts[0], endpoints[0].Host)
assert.Equal(t, hosts[1], endpoints[1].Host)
assert.Equal(t, hosts[2], endpoints[2].Host)
}

func TestFilterHealthyEndpointsFiltersUnhealthyEndpoints(t *testing.T) {
hosts := []string{"host1:80", "host2:80", "host3:80"}
endpointRegistry, route := initiliaze(hosts)

for i := range hosts {
metrics := endpointRegistry.GetMetrics(hosts[i])
for j := 0; j < 50; j++ {
metrics.IncTotalRequests()
}
}
for i := 0; i < 30; i++ {
metrics := endpointRegistry.GetMetrics(hosts[0])
metrics.IncTimeoutedRequests()
}

proxy := &Proxy{
registry: endpointRegistry,
heathlyEndpoints: &healthyEndpoints{enabled: true, timeoutedRequestsRatioThreshold: 0.1, totalRequestsThreshold: 10},
}
endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route)

assert.Equal(t, 2, len(endpoints))
assert.Equal(t, hosts[1], endpoints[0].Host)
assert.Equal(t, hosts[2], endpoints[1].Host)
}
52 changes: 49 additions & 3 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ const (
// DefaultExpectContinueTimeout, the default timeout to expect
// a response for a 100 Continue request
DefaultExpectContinueTimeout = 30 * time.Second

// DefaultHealthyEndpointsTotalRequestsThreshold, the default minimum amount of
// requests to the single endpoint for healthy endpoints checker to considering
// opting endpoint out
DefaultHealthyEndpointsTotalRequestsThreshold = 10

// DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold, the default minimum ratio of
// timeouted requests to total requests to the single endpoint for healthy endpoints
// checker to considering opting endpoint out
DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold = 0.1
)

// Flags control the behavior of the proxy.
Expand Down Expand Up @@ -250,6 +260,19 @@ type Params struct {
// and returns some metadata about endpoint. Information about the metadata
// returned from the registry could be found in routing.Metrics interface.
EndpointRegistry *routing.EndpointRegistry

// PCHEnabled enables the healthy endpoints checker
PHCEnabled bool

// HealthyEndpointsTotalRequestsThreshold defines the minimum number of total requests
// that should be sent to an endpoint to healthy endpoints checker potentially to
// opt out the endpoint from the list of healthy endpoints
HealthyEndpointsTotalRequestsThreshold float64

// HealthyEndpointsTimeoutedRequestsRatioThreshold defines the minimum ratio of timeouted
// requests to total requests that should be sent to an endpoint to healthy endpoints checker
// to opt out the endpoint from the list of healthy endpoints
HealthyEndpointsTimeoutedRequestsRatioThreshold float64
}

type (
Expand Down Expand Up @@ -330,6 +353,7 @@ type Proxy struct {
routing *routing.Routing
registry *routing.EndpointRegistry
fadein *fadeIn
heathlyEndpoints *healthyEndpoints
roundTripper http.RoundTripper
priorityRoutes []PriorityRoute
flags Flags
Expand Down Expand Up @@ -473,6 +497,7 @@ func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint {
rt := ctx.route
endpoints := rt.LBEndpoints
endpoints = p.fadein.filterFadeIn(endpoints, rt)
endpoints = p.heathlyEndpoints.filterHealthyEndpoints(endpoints, rt)

lbctx := &routing.LBContext{
Request: ctx.request,
Expand Down Expand Up @@ -723,12 +748,28 @@ func WithParams(p Params) *Proxy {
p.EndpointRegistry = routing.NewEndpointRegistry(routing.RegistryOptions{})
}

if p.HealthyEndpointsTotalRequestsThreshold == 0 {
p.HealthyEndpointsTotalRequestsThreshold = DefaultHealthyEndpointsTotalRequestsThreshold
}

if p.HealthyEndpointsTimeoutedRequestsRatioThreshold == 0 {
p.HealthyEndpointsTimeoutedRequestsRatioThreshold = DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold
}

hostname := os.Getenv("HOSTNAME")

return &Proxy{
routing: p.Routing,
registry: p.EndpointRegistry,
fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry},
routing: p.Routing,
registry: p.EndpointRegistry,
fadein: &fadeIn{
rnd: rand.New(loadbalancer.NewLockedSource()),
endpointRegistry: p.EndpointRegistry,
},
heathlyEndpoints: &healthyEndpoints{
enabled: p.PHCEnabled,
timeoutedRequestsRatioThreshold: p.HealthyEndpointsTimeoutedRequestsRatioThreshold,
totalRequestsThreshold: p.HealthyEndpointsTotalRequestsThreshold,
},
roundTripper: p.CustomHttpRoundTripperWrap(tr),
priorityRoutes: p.PriorityRoutes,
flags: p.Flags,
Expand Down Expand Up @@ -857,6 +898,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
if endpointMetrics != nil {
endpointMetrics.IncInflightRequest()
defer endpointMetrics.DecInflightRequest()

endpointMetrics.IncTotalRequests()
}

if p.experimentalUpgrade && isUpgradeRequest(req) {
Expand All @@ -868,6 +911,9 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co

roundTripper, err := p.getRoundTripper(ctx, req)
if err != nil {
if endpointMetrics != nil {
endpointMetrics.IncTimeoutedRequests()
}
return nil, &proxyError{err: fmt.Errorf("failed to get roundtripper: %w", err), code: http.StatusBadGateway}
}

Expand Down
Loading

0 comments on commit a7f8f3e

Please sign in to comment.