Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passive health checks #2888

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -285,6 +285,8 @@ type Config struct {
OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"`
OpenPolicyAgentMaxRequestBodySize int64 `yaml:"open-policy-agent-max-request-body-size"`
OpenPolicyAgentMaxMemoryBodyParsing int64 `yaml:"open-policy-agent-max-memory-body-parsing"`

PassiveHealthCheck mapFlags `yaml:"passive-health-check"`
}

const (
@@ -571,6 +573,9 @@ func NewConfig() *Config {
flag.Var(cfg.LuaModules, "lua-modules", "comma separated list of lua filter modules. Use <module>.<symbol> to selectively enable module symbols, for example: package,base._G,base.print,json")
flag.Var(cfg.LuaSources, "lua-sources", `comma separated list of lua input types for the lua() filter. Valid sources "", "file", "inline", "file,inline" and "none". Use "file" to only allow lua file references in lua filter. Default "" is the same as "file","inline". Use "none" to disable lua filters.`)

// Passive Health Checks
flag.Var(&cfg.PassiveHealthCheck, "passive-health-check", "sets the parameters for passive health check feature")

cfg.flags = flag
return cfg
}
@@ -912,6 +917,8 @@ func (c *Config) ToOptions() skipper.Options {
OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout,
OpenPolicyAgentMaxRequestBodySize: c.OpenPolicyAgentMaxRequestBodySize,
OpenPolicyAgentMaxMemoryBodyParsing: c.OpenPolicyAgentMaxMemoryBodyParsing,

PassiveHealthCheck: c.PassiveHealthCheck.values,
}
for _, rcci := range c.CloneRoute {
eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl)
28 changes: 28 additions & 0 deletions docs/operation/operation.md
Original file line number Diff line number Diff line change
@@ -893,6 +893,34 @@ to get the results paginated or getting all of them at the same time.
curl localhost:9911/routes?offset=200&limit=100
```

## Passive health check (*experimental*)

Skipper has an option to automatically detect and mitigate faulty backend endpoints, this feature is called
Passive Health Check(PHC).

PHC works the following way: the entire uptime is divided in chunks of `period`, per every period Skipper calculates
the total amount of requests and amount of requests failed per every endpoint. While next period is going on,
the Skipper takes a look at previous period and if the amount of requests in the previous period is more than `min-requests`
for the given endpoints then Skipper will send reduced (the more `max-drop-probability`
and failed requests ratio in previous period are, the stronger reduction is)
amount of requests compared to amount sent without PHC.

Having this, we expect less requests to fail because a lot of them would be sent to endpoints that seem to be healthy instead.

To enable this feature, you need to provide `-passive-health-check` option having all forementioned parameters
(`period`, `min-requests`, `max-drop-probability`) defined,
for instance: `-passive-health-check=period=1s,min-requests=10,max-drop-probability=0.9`.

You need to define all parameters on your side, there are no defaults, and skipper will not run if PHC params are passed only partially.

However, Skipper will run without this feature, if no `-passive-health-check` is provided at all.

The parameters of `-passive-health-check` option are:
+ `period=<duration>` - the duration of stats reset period
+ `min-requests=<int>` - the minimum number of requests per `period` per backend endpoint required to activate PHC for this endpoint
+ `max-drop-probabilty=<float more than/equal to 0 and less than/equal to 1>` - the maximum possible probability of unhealthy endpoint being not considered
while choosing the endpoint for the given request

## Memory consumption

While Skipper is generally not memory bound, some features may require
13 changes: 10 additions & 3 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
@@ -236,6 +236,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, _ := createRouting(t, routes, endpointRegistry)

foo := route(rt, "/foo")
@@ -266,6 +267,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, _ := createRouting(t, routes, endpointRegistry)
r := route(rt, "/")
if r != nil {
@@ -278,13 +280,14 @@ func TestPostProcessor(t *testing.T) {
* -> fadeIn("-1m") -> <"http://10.0.0.1:8080">
`

endpointRegisty := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, _ := createRouting(t, routes, endpointRegisty)
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, _ := createRouting(t, routes, endpointRegistry)
r := route(rt, "/")
if r == nil || len(r.LBEndpoints) == 0 {
t.Fatal("failed to ignore negative duration")
}
if endpointRegisty.GetMetrics(r.LBEndpoints[0].Host).DetectedTime().IsZero() {
if endpointRegistry.GetMetrics(r.LBEndpoints[0].Host).DetectedTime().IsZero() {
t.Fatal("failed to ignore negative duration")
}
})
@@ -295,6 +298,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, update := createRouting(t, routes, endpointRegistry)
firstDetected := time.Now()

@@ -327,6 +331,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
rt, update := createRouting(t, initialRoutes, endpointRegistry)
firstDetected := time.Now()

@@ -362,6 +367,7 @@ func TestPostProcessor(t *testing.T) {

const lastSeenTimeout = 2 * time.Second
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{LastSeenTimeout: lastSeenTimeout})
defer endpointRegistry.Close()
rt, update := createRouting(t, initialRoutes, endpointRegistry)
firstDetected := time.Now()

@@ -397,6 +403,7 @@ func TestPostProcessor(t *testing.T) {
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
routes := fmt.Sprintf(routesFmt, nows(t))
rt, update := createRouting(t, routes, endpointRegistry)
firstDetected := time.Now()
9 changes: 9 additions & 0 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with default algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
@@ -59,6 +60,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with explicit round-robin algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
@@ -88,6 +90,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with explicit consistentHash algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
@@ -117,6 +120,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with explicit random algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
@@ -146,6 +150,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Run("LB route with explicit powerOfRandomNChoices algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
@@ -260,6 +265,7 @@ func TestApply(t *testing.T) {
req, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
@@ -293,6 +299,7 @@ func TestConsistentHashSearch(t *testing.T) {
apply := func(key string, endpoints []string) string {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
@@ -349,6 +356,7 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25},
}
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
endpointRegistry.Do([]*routing.Route{route})
noLoad := ch.Apply(ctx)
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}})
@@ -429,6 +437,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor},
}
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
defer endpointRegistry.Close()
endpointRegistry.Do([]*routing.Route{route})

for i := 0; i < 100; i++ {
5 changes: 5 additions & 0 deletions metricsinit_test.go
Original file line number Diff line number Diff line change
@@ -53,6 +53,11 @@ func TestInitOrderAndDefault(t *testing.T) {
SwarmRedisURLs: []string{fmt.Sprintf("localhost:%d", redisPort)},
EnableRatelimiters: true,
SwarmRedisConnMetricsInterval: ringMetricsUpdatePeriod,
PassiveHealthCheck: map[string]string{
"period": "1m",
"min-requests": "10",
"max-drop-probability": "0.9",
},
}

tornDown := make(chan struct{})
7 changes: 6 additions & 1 deletion proxy/fadein_internal_test.go
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}

proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}}
proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}, quit: make(chan struct{})}
return route, proxy, eps
}

@@ -103,6 +103,7 @@ func calculateFadeInDuration(t *testing.T, algorithmName string, endpointAges []
const precalculateRatio = 10

route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDuration)
defer proxy.Close()
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

t.Log("preemulation start", time.Now())
@@ -125,6 +126,7 @@ func testFadeInMonotony(
t.Run(name, func(t *testing.T) {
fadeInDuration := calculateFadeInDuration(t, algorithmName, endpointAges)
route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, fadeInDuration)
defer proxy.Close()

t.Log("test start", time.Now())
var stats []string
@@ -273,6 +275,7 @@ func testFadeInLoadBetweenOldAndNewEps(
}

route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge)
defer proxy.Close()
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
nReqs := map[string]int{}

@@ -330,6 +333,7 @@ func testSelectEndpointEndsWhenAllEndpointsAreFading(
// Initialize every endpoint with zero: every endpoint is new
endpointAges := make([]float64, nEndpoints)
route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge)
defer proxy.Close()
applied := make(chan struct{})

go func() {
@@ -364,6 +368,7 @@ func benchmarkFadeIn(
) {
b.Run(name, func(b *testing.B) {
route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge)
defer proxy.Close()
var wg sync.WaitGroup

// Emulate the load balancer loop, sending requests to it with random hash keys
34 changes: 34 additions & 0 deletions proxy/healthy_endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package proxy

import (
"math/rand"

"github.com/zalando/skipper/routing"
)

type healthyEndpoints struct {
rnd *rand.Rand
endpointRegistry *routing.EndpointRegistry
}

func (h *healthyEndpoints) filterHealthyEndpoints(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint {
if h == nil {
return endpoints
}
MustafaSaber marked this conversation as resolved.
Show resolved Hide resolved

p := h.rnd.Float64()

filtered := make([]routing.LBEndpoint, 0, len(endpoints))
for _, e := range endpoints {
if p < e.Metrics.HealthCheckDropProbability() {
/* drop */
} else {
filtered = append(filtered, e)
}
Comment on lines +23 to +27
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not

if p >= e.Metrics.HealthCheckDropProbability() {
     filtered = append(filtered, e)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to be clear about the decision that if p < e.Metrics.HealthCheckDropProbability() holds true we want to skip the endpoint.

}

if len(filtered) == 0 {
return endpoints
RomanZavodskikh marked this conversation as resolved.
Show resolved Hide resolved
}
return filtered
}
Loading
Loading