Skip to content

Commit

Permalink
wip: passive health checks
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <roman.zavodskikh@zalando.de>
  • Loading branch information
Roman Zavodskikh committed Feb 19, 2024
1 parent 37c474a commit 519bddf
Show file tree
Hide file tree
Showing 16 changed files with 547 additions and 92 deletions.
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ type Config struct {
OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"`
OpenPolicyAgentMaxRequestBodySize int64 `yaml:"open-policy-agent-max-request-body-size"`
OpenPolicyAgentMaxMemoryBodyParsing int64 `yaml:"open-policy-agent-max-memory-body-parsing"`

PassiveHealthCheck mapFlags `yaml:"passive-health-check"`
}

const (
Expand Down Expand Up @@ -571,6 +573,9 @@ func NewConfig() *Config {
flag.Var(cfg.LuaModules, "lua-modules", "comma separated list of lua filter modules. Use <module>.<symbol> to selectively enable module symbols, for example: package,base._G,base.print,json")
flag.Var(cfg.LuaSources, "lua-sources", `comma separated list of lua input types for the lua() filter. Valid sources "", "file", "inline", "file,inline" and "none". Use "file" to only allow lua file references in lua filter. Default "" is the same as "file","inline". Use "none" to disable lua filters.`)

// Passive Health Checks
flag.Var(&cfg.PassiveHealthCheck, "passive-health-check", "sets the parameters for passive health check feature")

cfg.flags = flag
return cfg
}
Expand Down Expand Up @@ -912,6 +917,8 @@ func (c *Config) ToOptions() skipper.Options {
OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout,
OpenPolicyAgentMaxRequestBodySize: c.OpenPolicyAgentMaxRequestBodySize,
OpenPolicyAgentMaxMemoryBodyParsing: c.OpenPolicyAgentMaxMemoryBodyParsing,

PassiveHealthCheck: c.PassiveHealthCheck.values,
}
for _, rcci := range c.CloneRoute {
eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl)
Expand Down
32 changes: 32 additions & 0 deletions docs/operation/operation.md
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,38 @@ to get the results paginated or getting all of them at the same time.
curl localhost:9911/routes?offset=200&limit=100
```

## Passive health check (*experimental*)

Skipper has an option to automatically detect and mitigate faulty backend endpoints, this feature is called
Passive Health Check(PHC).

PHC works the following way: the entire uptime is divided in chunks of `period`, per every period Skipper calculates
the total amount of requests and amount of requests failed per every endpoint. While next period is going on,
the Skipper takes a look at previous period and if for the given endpoint:
+ the amount of requests in the previous period is more than `min-requests`
AND
+ the ratio of failed requests is more than `min-failed`
then Skipper will send reduced (the more `requests-discount` and failed requests ratio in previous period are, the stronger reduction is,
but it never reduces lower than `min-requests`) amount of requests compared to amount sent without PHC.

Having this, we expect less requests to fail because a lot of them would be sent to endpoints that seem to be healthy instead.

To enable this feature, you need to provide `-passive-health-check` option having all forementioned parameters
(`period`, `min-requests`, `min-failed`, `requests-discount`) defined,
for instance: `-passive-health-check=period=1m,min-requests=10,min-failed=0.2,requests-discount=0.9`.

You need to define all parameters on your side, there are no defaults, and skipper will not run if PHC params are passed only partially.

However, Skipper will run without this feature, if no `-passive-health-check` is provided at all.

The parameters of `-passive-health-check` option are:
+ `period=<duration>` - the duration of stats reset period
+ `min-requests=<int>` - the minimum number of requests per minute per backend endpoint required to activate PHC for this endpoint
+ `min-failed=<float more than 0 and less than 1>` - the minimum ratio of failed requests for every endpoint to make PHC
reduce amount of requests to this endpoint
+ `requests-discount=<float more than 0 and less than 1>` - the discount multiplier which defines how huge is reduction of requests
sent to endpoints considered unhealthy

## Memory consumption

While Skipper is generally not memory bound, some features may require
Expand Down
13 changes: 10 additions & 3 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, _ := createRouting(t, routes, endpointRegistry)

foo := route(rt, "/foo")
Expand Down Expand Up @@ -266,6 +267,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, _ := createRouting(t, routes, endpointRegistry)
r := route(rt, "/")
if r != nil {
Expand All @@ -278,13 +280,14 @@ func TestPostProcessor(t *testing.T) {
* -> fadeIn("-1m") -> <"http://10.0.0.1:8080">
`

endpointRegisty := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, _ := createRouting(t, routes, endpointRegisty)
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, _ := createRouting(t, routes, endpointRegistry)
r := route(rt, "/")
if r == nil || len(r.LBEndpoints) == 0 {
t.Fatal("failed to ignore negative duration")
}
if endpointRegisty.GetMetrics(r.LBEndpoints[0].Host).DetectedTime().IsZero() {
if endpointRegistry.GetMetrics(r.LBEndpoints[0].Host).DetectedTime().IsZero() {
t.Fatal("failed to ignore negative duration")
}
})
Expand All @@ -295,6 +298,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, update := createRouting(t, routes, endpointRegistry)
firstDetected := time.Now()

Expand Down Expand Up @@ -327,6 +331,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, update := createRouting(t, initialRoutes, endpointRegistry)
firstDetected := time.Now()

Expand Down Expand Up @@ -362,6 +367,7 @@ func TestPostProcessor(t *testing.T) {

const lastSeenTimeout = 2 * time.Second
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{LastSeenTimeout: lastSeenTimeout})
defer endpointRegistry.Close()
rt, update := createRouting(t, initialRoutes, endpointRegistry)
firstDetected := time.Now()

Expand Down Expand Up @@ -397,6 +403,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
routes := fmt.Sprintf(routesFmt, nows(t))
rt, update := createRouting(t, routes, endpointRegistry)
firstDetected := time.Now()
Expand Down
9 changes: 9 additions & 0 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with default algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand Down Expand Up @@ -59,6 +60,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with explicit round-robin algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand Down Expand Up @@ -88,6 +90,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with explicit consistentHash algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand Down Expand Up @@ -117,6 +120,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with explicit random algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand Down Expand Up @@ -146,6 +150,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with explicit powerOfRandomNChoices algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand Down Expand Up @@ -260,6 +265,7 @@ func TestApply(t *testing.T) {
req, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand Down Expand Up @@ -293,6 +299,7 @@ func TestConsistentHashSearch(t *testing.T) {
apply := func(key string, endpoints []string) string {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand Down Expand Up @@ -349,6 +356,7 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25},
}
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
endpointRegistry.Do([]*routing.Route{route})
noLoad := ch.Apply(ctx)
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}})
Expand Down Expand Up @@ -429,6 +437,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor},
}
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
endpointRegistry.Do([]*routing.Route{route})

for i := 0; i < 100; i++ {
Expand Down
6 changes: 6 additions & 0 deletions metricsinit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func TestInitOrderAndDefault(t *testing.T) {
SwarmRedisURLs: []string{fmt.Sprintf("localhost:%d", redisPort)},
EnableRatelimiters: true,
SwarmRedisConnMetricsInterval: ringMetricsUpdatePeriod,
PassiveHealthCheck: map[string]string{
"period": "1m",
"min-requests": "10",
"min-failed": "0.2",
"requests-discount": "0.9",
},
}

tornDown := make(chan struct{})
Expand Down
7 changes: 6 additions & 1 deletion proxy/fadein_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}

proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}}
proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}, quit: make(chan struct{})}
return route, proxy, eps
}

Expand All @@ -103,6 +103,7 @@ func calculateFadeInDuration(t *testing.T, algorithmName string, endpointAges []
const precalculateRatio = 10

route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDuration)
defer proxy.Close()
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

t.Log("preemulation start", time.Now())
Expand All @@ -125,6 +126,7 @@ func testFadeInMonotony(
t.Run(name, func(t *testing.T) {
fadeInDuration := calculateFadeInDuration(t, algorithmName, endpointAges)
route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, fadeInDuration)
defer proxy.Close()

t.Log("test start", time.Now())
var stats []string
Expand Down Expand Up @@ -273,6 +275,7 @@ func testFadeInLoadBetweenOldAndNewEps(
}

route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge)
defer proxy.Close()
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
nReqs := map[string]int{}

Expand Down Expand Up @@ -330,6 +333,7 @@ func testSelectEndpointEndsWhenAllEndpointsAreFading(
// Initialize every endpoint with zero: every endpoint is new
endpointAges := make([]float64, nEndpoints)
route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge)
defer proxy.Close()
applied := make(chan struct{})

go func() {
Expand Down Expand Up @@ -364,6 +368,7 @@ func benchmarkFadeIn(
) {
b.Run(name, func(b *testing.B) {
route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge)
defer proxy.Close()
var wg sync.WaitGroup

// Emulate the load balancer loop, sending requests to it with random hash keys
Expand Down
34 changes: 34 additions & 0 deletions proxy/healthy_endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package proxy

import (
"math/rand"

"github.com/zalando/skipper/routing"
)

type healthyEndpoints struct {
rnd *rand.Rand
endpointRegistry *routing.EndpointRegistry
}

func (h *healthyEndpoints) filterHealthyEndpoints(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint {
if h == nil {
return endpoints
}

p := h.rnd.Float64()

filtered := make([]routing.LBEndpoint, 0, len(endpoints))
for _, e := range endpoints {
if p < e.Metrics.HealthCheckDropProbability() {
/* drop */
} else {
filtered = append(filtered, e)
}
}

if len(filtered) == 0 {
return endpoints
}
return filtered
}
Loading

0 comments on commit 519bddf

Please sign in to comment.