diff --git a/config/config.go b/config/config.go index 63db7d2977..71b5c22035 100644 --- a/config/config.go +++ b/config/config.go @@ -283,6 +283,10 @@ type Config struct { OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"` OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"` OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"` + + // Passive Health Checks + EnablePassiveHealthCheck bool `yaml:"enable-passive-health-check"` + PassiveHealthCheckParams mapFlags `yaml:"passive-health-check-params"` } const ( @@ -567,6 +571,14 @@ func NewConfig() *Config { flag.Var(cfg.LuaModules, "lua-modules", "comma separated list of lua filter modules. Use . to selectively enable module symbols, for example: package,base._G,base.print,json") flag.Var(cfg.LuaSources, "lua-sources", `comma separated list of lua input types for the lua() filter. Valid sources "", "file", "inline", "file,inline" and "none". Use "file" to only allow lua file references in lua filter. Default "" is the same as "file","inline". Use "none" to disable lua filters.`) + // Passive Health Checks + flag.BoolVar(&cfg.EnablePassiveHealthCheck, "enable-passive-health-check", false, "enables passive health check") + cfg.PassiveHealthCheckParams = mapFlags{values: map[string]string{ + "total-requests-threshold": "10", + "timed-out-requests-ratio-threshold": "0.1", + }} + flag.Var(&cfg.PassiveHealthCheckParams, "passive-health-check-params", "sets the parameters for passive health check feature") + cfg.flags = flag return cfg } @@ -906,6 +918,10 @@ func (c *Config) ToOptions() skipper.Options { OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata, OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval, OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout, + + // Passive Health Checks + EnablePassiveHealthCheck: c.EnablePassiveHealthCheck, + PassiveHealthCheckParams: c.PassiveHealthCheckParams.values, } for _, rcci := range c.CloneRoute { eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl) diff --git a/config/config_test.go b/config/config_test.go index c875bc6565..d08440355f 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -162,6 +162,10 @@ func defaultConfig() *Config { LuaSources: commaListFlag(), OpenPolicyAgentCleanerInterval: 10 * time.Second, OpenPolicyAgentStartupTimeout: 30 * time.Second, + PassiveHealthCheckParams: mapFlags{values: map[string]string{ + "total-requests-threshold": "10", + "timed-out-requests-ratio-threshold": "0.1", + }}, } } diff --git a/docs/operation/operation.md b/docs/operation/operation.md index cf327d2fb5..bcbdfe64ef 100644 --- a/docs/operation/operation.md +++ b/docs/operation/operation.md @@ -893,6 +893,19 @@ to get the results paginated or getting all of them at the same time. curl localhost:9911/routes?offset=200&limit=100 ``` +## Passive health check + +Skipper has an option to automatically detect and mitigate faulty backend enpoints, this feature is called +Passive Health Check(PHC). To enable this feature, you need to provide `-enable-passive-health-check` command line flag to Skipper. +Given only this flag, Skipper will try to provide PHC functionality with default parameters. +You can configure those parameters with `-passive-health-check-params` command line flag, +like this: `-passive-health-check-params=total-requests-threshold=10,timed-out-requests-ratio-threshold=0.1`. + +The possible sub-options for `-passive-health-check-params` are: ++ `total-requests-threshold=` - the minimum number of requests per minute per backend pod required to activate PHC for this pod ++ `timed-out-requests-ratio-threshold=` - the minimum ratio of failed requests for every backend pod to make PHC avoid +sending requests to this pod + ## Memory consumption While Skipper is generally not memory bound, some features may require diff --git a/metricsinit_test.go b/metricsinit_test.go index d2a94e8223..8deaa87799 100644 --- a/metricsinit_test.go +++ b/metricsinit_test.go @@ -53,6 +53,10 @@ func TestInitOrderAndDefault(t *testing.T) { SwarmRedisURLs: []string{fmt.Sprintf("localhost:%d", redisPort)}, EnableRatelimiters: true, SwarmRedisConnMetricsInterval: ringMetricsUpdatePeriod, + PassiveHealthCheckParams: map[string]string{ + "total-requests-threshold": "10", + "timed-out-requests-ratio-threshold": "0.1", + }, } tornDown := make(chan struct{}) diff --git a/proxy/fadein_internal_test.go b/proxy/fadein_internal_test.go index 8b073b715a..93fafbbb5c 100644 --- a/proxy/fadein_internal_test.go +++ b/proxy/fadein_internal_test.go @@ -92,7 +92,11 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i]) } - proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}} + proxy := &Proxy{ + registry: registry, + fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}, + heathlyEndpoints: &healthyEndpoints{enabled: false}, + } return route, proxy, eps } diff --git a/proxy/healthy_endpoints.go b/proxy/healthy_endpoints.go new file mode 100644 index 0000000000..bc3cdb7551 --- /dev/null +++ b/proxy/healthy_endpoints.go @@ -0,0 +1,35 @@ +package proxy + +import "github.com/zalando/skipper/routing" + +type healthyEndpoints struct { + enabled bool + timeoutedRequestsRatioThreshold float64 + totalRequestsThreshold float64 +} + +func (h *healthyEndpoints) healthyEndpoint(endpoint routing.LBEndpoint) bool { + timeoutedRequests := float64(endpoint.Metrics.TimeoutedRequests()) + totalRequests := float64(endpoint.Metrics.TotalRequests()) + failedRequestsRatio := timeoutedRequests / totalRequests + + return failedRequestsRatio < h.timeoutedRequestsRatioThreshold || totalRequests < h.totalRequestsThreshold +} + +func (h *healthyEndpoints) filterHealthyEndpoints(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint { + if !h.enabled { + return endpoints + } + + filtered := make([]routing.LBEndpoint, 0, len(endpoints)) + for _, e := range endpoints { + if h.healthyEndpoint(e) { + filtered = append(filtered, e) + } + } + + if len(filtered) == 0 { + return endpoints + } + return filtered +} diff --git a/proxy/healthy_endpoints_test.go b/proxy/healthy_endpoints_test.go new file mode 100644 index 0000000000..eb5ec30bdb --- /dev/null +++ b/proxy/healthy_endpoints_test.go @@ -0,0 +1,104 @@ +package proxy + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/loadbalancer" + "github.com/zalando/skipper/routing" +) + +func initiliaze(hosts []string) (*routing.EndpointRegistry, *routing.Route) { + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + eskipRoute := eskip.Route{BackendType: eskip.LBBackend, LBAlgorithm: loadbalancer.PowerOfRandomNChoices.String()} + for i := range hosts { + eskipRoute.LBEndpoints = append(eskipRoute.LBEndpoints, fmt.Sprintf("http://%s", hosts[i])) + } + + route := &routing.Route{ + Route: eskipRoute, + LBEndpoints: []routing.LBEndpoint{}, + } + rt := loadbalancer.NewAlgorithmProvider().Do([]*routing.Route{route}) + route = rt[0] + endpointRegistry.Do([]*routing.Route{route}) + + return endpointRegistry, route +} + +func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreHealthy(t *testing.T) { + hosts := []string{"host1:80", "host2:80", "host3:80"} + endpointRegistry, route := initiliaze(hosts) + + for i := range hosts { + metrics := endpointRegistry.GetMetrics(hosts[i]) + for j := 0; j < 50; j++ { + metrics.IncTotalRequests() + } + } + + proxy := &Proxy{ + registry: endpointRegistry, + heathlyEndpoints: &healthyEndpoints{enabled: true, timeoutedRequestsRatioThreshold: 0.1, totalRequestsThreshold: 10}, + } + endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route) + + assert.Equal(t, 3, len(endpoints)) + assert.Equal(t, hosts[0], endpoints[0].Host) + assert.Equal(t, hosts[1], endpoints[1].Host) + assert.Equal(t, hosts[2], endpoints[2].Host) +} + +func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreUnhealthy(t *testing.T) { + hosts := []string{"host1:80", "host2:80", "host3:80"} + endpointRegistry, route := initiliaze(hosts) + + for i := range hosts { + metrics := endpointRegistry.GetMetrics(hosts[i]) + for j := 0; j < 50; j++ { + metrics.IncTotalRequests() + } + for j := 0; j < 30; j++ { + metrics.IncTimeoutedRequests() + } + } + + proxy := &Proxy{ + registry: endpointRegistry, + heathlyEndpoints: &healthyEndpoints{enabled: true, timeoutedRequestsRatioThreshold: 0.1, totalRequestsThreshold: 10}, + } + endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route) + + assert.Equal(t, 3, len(endpoints)) + assert.Equal(t, hosts[0], endpoints[0].Host) + assert.Equal(t, hosts[1], endpoints[1].Host) + assert.Equal(t, hosts[2], endpoints[2].Host) +} + +func TestFilterHealthyEndpointsFiltersUnhealthyEndpoints(t *testing.T) { + hosts := []string{"host1:80", "host2:80", "host3:80"} + endpointRegistry, route := initiliaze(hosts) + + for i := range hosts { + metrics := endpointRegistry.GetMetrics(hosts[i]) + for j := 0; j < 50; j++ { + metrics.IncTotalRequests() + } + } + for i := 0; i < 30; i++ { + metrics := endpointRegistry.GetMetrics(hosts[0]) + metrics.IncTimeoutedRequests() + } + + proxy := &Proxy{ + registry: endpointRegistry, + heathlyEndpoints: &healthyEndpoints{enabled: true, timeoutedRequestsRatioThreshold: 0.1, totalRequestsThreshold: 10}, + } + endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route) + + assert.Equal(t, 2, len(endpoints)) + assert.Equal(t, hosts[1], endpoints[0].Host) + assert.Equal(t, hosts[2], endpoints[1].Host) +} diff --git a/proxy/proxy.go b/proxy/proxy.go index 9109364aa0..39524e6e42 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -67,6 +67,16 @@ const ( // DefaultExpectContinueTimeout, the default timeout to expect // a response for a 100 Continue request DefaultExpectContinueTimeout = 30 * time.Second + + // DefaultHealthyEndpointsTotalRequestsThreshold, the default minimum amount of + // requests to the single endpoint for healthy endpoints checker to considering + // opting endpoint out + DefaultHealthyEndpointsTotalRequestsThreshold = 10 + + // DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold, the default minimum ratio of + // timeouted requests to total requests to the single endpoint for healthy endpoints + // checker to considering opting endpoint out + DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold = 0.1 ) // Flags control the behavior of the proxy. @@ -145,6 +155,50 @@ type OpenTracingParams struct { ExcludeTags []string } +type PassiveHealthCheckParams struct { + // The minimum number of total requests that should be sent to an endpoint to healthy endpoints checker + // potentially to opt out the endpoint from the list of healthy endpoints + TotalRequestsThreshold int + + // The minimum ratio of timed out requests to total requests that should be sent to an endpoint to healthy + // endpoints checker to opt out the endpoint from the list of healthy endpoints + TimedOutRequestsRatioThreshold float64 +} + +func InitPassiveHealthChecker(o map[string]string) (*PassiveHealthCheckParams, error) { + result := &PassiveHealthCheckParams{ + TotalRequestsThreshold: DefaultHealthyEndpointsTotalRequestsThreshold, + TimedOutRequestsRatioThreshold: DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold, + } + + for key, value := range o { + switch key { + case "total-requests-threshold": + totalRequestsThreshold, err := strconv.Atoi(value) + if err != nil { + return nil, fmt.Errorf("passive health check: invalid totalRequestsThreshold value: %s", value) + } + if totalRequestsThreshold < 0 { + return nil, fmt.Errorf("passive health check: invalid totalRequestsThreshold value: %s", value) + } + result.TotalRequestsThreshold = totalRequestsThreshold + case "timed-out-requests-ratio-threshold": + timedOutRequestsRatioThreshold, err := strconv.ParseFloat(value, 64) + if err != nil { + return nil, fmt.Errorf("passive health check: invalid timedOutRequestsRatioThreshold value: %s", value) + } + if timedOutRequestsRatioThreshold < 0 || timedOutRequestsRatioThreshold > 1 { + return nil, fmt.Errorf("passive health check: invalid timedOutRequestsRatioThreshold value: %s", value) + } + result.TimedOutRequestsRatioThreshold = timedOutRequestsRatioThreshold + default: + return nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value) + } + } + + return result, nil +} + // Proxy initialization options. type Params struct { // The proxy expects a routing instance that is used to match @@ -250,6 +304,12 @@ type Params struct { // and returns some metadata about endpoint. Information about the metadata // returned from the registry could be found in routing.Metrics interface. EndpointRegistry *routing.EndpointRegistry + + // EnablePassiveHealthCheck enables the healthy endpoints checker + EnablePassiveHealthCheck bool + + // PassiveHealthCheckParams defines the parameters for the healthy endpoints checker. + PassiveHealthCheck *PassiveHealthCheckParams } type ( @@ -327,6 +387,7 @@ type Proxy struct { routing *routing.Routing registry *routing.EndpointRegistry fadein *fadeIn + heathlyEndpoints *healthyEndpoints roundTripper http.RoundTripper priorityRoutes []PriorityRoute flags Flags @@ -470,6 +531,7 @@ func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint { rt := ctx.route endpoints := rt.LBEndpoints endpoints = p.fadein.filterFadeIn(endpoints, rt) + endpoints = p.heathlyEndpoints.filterHealthyEndpoints(endpoints, rt) lbctx := &routing.LBContext{ Request: ctx.request, @@ -722,10 +784,26 @@ func WithParams(p Params) *Proxy { hostname := os.Getenv("HOSTNAME") + timeoutedRequestsRatioThreshold := DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold + totalRequestsThreshold := DefaultHealthyEndpointsTotalRequestsThreshold + if p.PassiveHealthCheck != nil { + timeoutedRequestsRatioThreshold = p.PassiveHealthCheck.TimedOutRequestsRatioThreshold + totalRequestsThreshold = p.PassiveHealthCheck.TotalRequestsThreshold + } + healthyEndpoints := &healthyEndpoints{ + enabled: p.EnablePassiveHealthCheck, + timeoutedRequestsRatioThreshold: timeoutedRequestsRatioThreshold, + totalRequestsThreshold: float64(totalRequestsThreshold), + } + return &Proxy{ - routing: p.Routing, - registry: p.EndpointRegistry, - fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry}, + routing: p.Routing, + registry: p.EndpointRegistry, + fadein: &fadeIn{ + rnd: rand.New(loadbalancer.NewLockedSource()), + endpointRegistry: p.EndpointRegistry, + }, + heathlyEndpoints: healthyEndpoints, roundTripper: p.CustomHttpRoundTripperWrap(tr), priorityRoutes: p.PriorityRoutes, flags: p.Flags, @@ -854,6 +932,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co if endpointMetrics != nil { endpointMetrics.IncInflightRequest() defer endpointMetrics.DecInflightRequest() + + endpointMetrics.IncTotalRequests() } if p.experimentalUpgrade && isUpgradeRequest(req) { @@ -865,6 +945,9 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co roundTripper, err := p.getRoundTripper(ctx, req) if err != nil { + if endpointMetrics != nil { + endpointMetrics.IncTimeoutedRequests() + } return nil, &proxyError{err: fmt.Errorf("failed to get roundtripper: %w", err), code: http.StatusBadGateway} } diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index f7b94315b1..a3885068af 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -8,7 +8,11 @@ import ( "github.com/zalando/skipper/eskip" ) -const defaultLastSeenTimeout = 1 * time.Minute +const ( + defaultLastSeenTimeout = 1 * time.Minute + defaultStatsResetPeriod = 1 * time.Minute + defaultDiscountFactor = 0.5 +) // Metrics describe the data about endpoint that could be // used to perform better load balancing, fadeIn, etc. @@ -22,12 +26,22 @@ type Metrics interface { InflightRequests() int64 IncInflightRequest() DecInflightRequest() + + TotalRequests() int64 + IncTotalRequests() + + TimeoutedRequests() int64 + IncTimeoutedRequests() } type entry struct { detected atomic.Value // time.Time lastSeen atomic.Value // time.Time inflightRequests atomic.Int64 + + lastStatsReset atomic.Value // time.Time + totalRequests atomic.Int64 + totalTimeoutedRequests atomic.Int64 } var _ Metrics = &entry{} @@ -60,23 +74,44 @@ func (e *entry) SetLastSeen(ts time.Time) { e.lastSeen.Store(ts) } +func (e *entry) TotalRequests() int64 { + return e.totalRequests.Load() +} + +func (e *entry) IncTotalRequests() { + e.totalRequests.Add(1) +} + +func (e *entry) TimeoutedRequests() int64 { + return e.totalTimeoutedRequests.Load() +} + +func (e *entry) IncTimeoutedRequests() { + e.totalTimeoutedRequests.Add(1) +} + func newEntry() *entry { result := &entry{} result.SetDetected(time.Time{}) result.SetLastSeen(time.Time{}) + result.lastStatsReset.Store(time.Time{}) return result } type EndpointRegistry struct { - lastSeenTimeout time.Duration - now func() time.Time - data sync.Map // map[string]*entry + lastSeenTimeout time.Duration + statsResetPeriod time.Duration + discountFactor float64 + now func() time.Time + data sync.Map // map[string]*entry } var _ PostProcessor = &EndpointRegistry{} type RegistryOptions struct { - LastSeenTimeout time.Duration + LastSeenTimeout time.Duration + StatsResetPeriod time.Duration + DiscountFactor float64 } func (r *EndpointRegistry) Do(routes []*Route) []*Route { @@ -109,6 +144,12 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route { r.data.Delete(key) } + if e.lastStatsReset.Load().(time.Time).Add(r.statsResetPeriod).Before(now) { + e.lastStatsReset.Store(now) + e.totalTimeoutedRequests.Store(int64(float64(e.totalTimeoutedRequests.Load()) * r.discountFactor)) + e.totalRequests.Store(int64(float64(e.totalRequests.Load()) * r.discountFactor)) + } + return true }) @@ -120,9 +161,18 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { o.LastSeenTimeout = defaultLastSeenTimeout } + if o.StatsResetPeriod == 0 { + o.StatsResetPeriod = defaultStatsResetPeriod + } + + if o.DiscountFactor == 0 { + o.DiscountFactor = defaultDiscountFactor + } + return &EndpointRegistry{ data: sync.Map{}, lastSeenTimeout: o.LastSeenTimeout, + discountFactor: o.DiscountFactor, now: time.Now, } } diff --git a/skipper.go b/skipper.go index 3a6dc0344c..7df4f0b986 100644 --- a/skipper.go +++ b/skipper.go @@ -917,6 +917,9 @@ type Options struct { OpenPolicyAgentEnvoyMetadata string OpenPolicyAgentCleanerInterval time.Duration OpenPolicyAgentStartupTimeout time.Duration + + EnablePassiveHealthCheck bool + PassiveHealthCheckParams map[string]string } func (o *Options) KubernetesDataClientOptions() kubernetes.Options { @@ -1959,10 +1962,17 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { routing := routing.New(ro) defer routing.Close() + passiveHealthCheckParams, err := proxy.InitPassiveHealthChecker(o.PassiveHealthCheckParams) + if err != nil { + return err + } + proxyFlags := proxy.Flags(o.ProxyOptions) | o.ProxyFlags proxyParams := proxy.Params{ Routing: routing, EndpointRegistry: endpointRegistry, + EnablePassiveHealthCheck: o.EnablePassiveHealthCheck, + PassiveHealthCheck: passiveHealthCheckParams, Flags: proxyFlags, PriorityRoutes: o.PriorityRoutes, IdleConnectionsPerHost: o.IdleConnectionsPerHost,