From c9abb70593bbf2c40b667fb375869d5b46924c76 Mon Sep 17 00:00:00 2001 From: Roman Zavodskikh Date: Thu, 25 Jan 2024 16:57:19 +0100 Subject: [PATCH] wip: passive health checks Signed-off-by: Roman Zavodskikh --- config/config.go | 9 +++ docs/operation/operation.md | 32 ++++++++ metricsinit_test.go | 6 ++ proxy/healthy_endpoints.go | 34 ++++++++ proxy/healthy_endpoints_test.go | 136 ++++++++++++++++++++++++++++++++ proxy/proxy.go | 107 ++++++++++++++++++++++++- routing/endpointregistry.go | 110 ++++++++++++++++++++++---- skipper.go | 17 +++- 8 files changed, 430 insertions(+), 21 deletions(-) create mode 100644 proxy/healthy_endpoints.go create mode 100644 proxy/healthy_endpoints_test.go diff --git a/config/config.go b/config/config.go index b05152b5b6..f87c218a59 100644 --- a/config/config.go +++ b/config/config.go @@ -283,6 +283,9 @@ type Config struct { OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"` OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"` OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"` + + // Passive Health Checks + PassiveHealthCheckParams mapFlags `yaml:"passive-health-check-params"` } const ( @@ -567,6 +570,9 @@ func NewConfig() *Config { flag.Var(cfg.LuaModules, "lua-modules", "comma separated list of lua filter modules. Use . to selectively enable module symbols, for example: package,base._G,base.print,json") flag.Var(cfg.LuaSources, "lua-sources", `comma separated list of lua input types for the lua() filter. Valid sources "", "file", "inline", "file,inline" and "none". Use "file" to only allow lua file references in lua filter. Default "" is the same as "file","inline". Use "none" to disable lua filters.`) + // Passive Health Checks + flag.Var(&cfg.PassiveHealthCheckParams, "passive-health-check-params", "sets the parameters for passive health check feature") + cfg.flags = flag return cfg } @@ -906,6 +912,9 @@ func (c *Config) ToOptions() skipper.Options { OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata, OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval, OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout, + + // Passive Health Checks + PassiveHealthCheckParams: c.PassiveHealthCheckParams.values, } for _, rcci := range c.CloneRoute { eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl) diff --git a/docs/operation/operation.md b/docs/operation/operation.md index cf327d2fb5..d746636817 100644 --- a/docs/operation/operation.md +++ b/docs/operation/operation.md @@ -893,6 +893,38 @@ to get the results paginated or getting all of them at the same time. curl localhost:9911/routes?offset=200&limit=100 ``` +## Passive health check + +Skipper has an option to automatically detect and mitigate faulty backend endpoints, this feature is called +Passive Health Check(PHC). + +PHC works the following way: the entire uptime is divided in chunks of `stats-reset-period`, per every period Skipper calculates +the total amount of requests and amount of requests for which HTTP round trip had failed per every endpoint. While next period is going on, +the Skipper takes a look at previous period and if for the given endpoint: ++ the amount of requests in the previous period is more than `total-requests-threshold` +AND ++ the share of requests with failed HTTP round trip is more than `failed-round-trips-ratio-threshold` +then Skipper will send reduced (multiplied by `unhealthy-endpoints-requests-discount`, but not less than `total-requests-threshold`) +amount of requests compared to amount sent without PHC. + +Having this, we expect less requests to fail because a lot of them would be sent to endpoints that seem to be healthy instead. + +To enable this feature, you need to provide `-passive-health-check-params` having all forementioned parameters +(`stats-reset-period`, `total-requests-threshold`, `failed-round-trips-ratio-threshold`, `unhealthy-endpoints-requests-discount`) defined, +for instance: `-passive-health-check-params=stats-reset-period=1m,total-requests-threshold=10,failed-round-trips-ratio-threshold=0.2,unhealthy-endpoints-requests-discount=0.9`. + +You need to define all parameters on your side, there are no defaults, and skipper will not run if PHC params are passed only partially. + +However, Skipper will run without this feature, if no `-passive-health-check-params` is provided at all. + +The parameters of `-passive-health-check-params` are: ++ `stats-reset-period=` - the duration of stats reset period ++ `total-requests-threshold=` - the minimum number of requests per minute per backend endpoint required to activate PHC for this endpoint ++ `failed-round-trips-ratio-threshold=` - the minimum ratio of failed requests with failed round trip +for every endpoint to make PHC avoid sending requests to this endpoint ++ `unhealthy-endpoints-requests-discount=` - the discount multiplier which defines how huge is reduction of requests +sent to endpoints considered unhealthy + ## Memory consumption While Skipper is generally not memory bound, some features may require diff --git a/metricsinit_test.go b/metricsinit_test.go index d2a94e8223..a8b8783b90 100644 --- a/metricsinit_test.go +++ b/metricsinit_test.go @@ -53,6 +53,12 @@ func TestInitOrderAndDefault(t *testing.T) { SwarmRedisURLs: []string{fmt.Sprintf("localhost:%d", redisPort)}, EnableRatelimiters: true, SwarmRedisConnMetricsInterval: ringMetricsUpdatePeriod, + PassiveHealthCheckParams: map[string]string{ + "stats-reset-period": "1m", + "total-requests-threshold": "10", + "failed-round-trips-ratio-threshold": "0.2", + "unhealthy-endpoints-requests-discount": "0.9", + }, } tornDown := make(chan struct{}) diff --git a/proxy/healthy_endpoints.go b/proxy/healthy_endpoints.go new file mode 100644 index 0000000000..b79a132769 --- /dev/null +++ b/proxy/healthy_endpoints.go @@ -0,0 +1,34 @@ +package proxy + +import ( + "math/rand" + + "github.com/zalando/skipper/routing" +) + +type healthyEndpoints struct { + rnd *rand.Rand + endpointRegistry *routing.EndpointRegistry +} + +func (h *healthyEndpoints) filterHealthyEndpoints(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint { + if h == nil { + return endpoints + } + + p := h.rnd.Float64() + + filtered := make([]routing.LBEndpoint, 0, len(endpoints)) + for _, e := range endpoints { + if p < e.Metrics.HealthCheckDropProbability() { + /* drop */ + } else { + filtered = append(filtered, e) + } + } + + if len(filtered) == 0 { + return endpoints + } + return filtered +} diff --git a/proxy/healthy_endpoints_test.go b/proxy/healthy_endpoints_test.go new file mode 100644 index 0000000000..bb6fc45f03 --- /dev/null +++ b/proxy/healthy_endpoints_test.go @@ -0,0 +1,136 @@ +package proxy + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/loadbalancer" + "github.com/zalando/skipper/routing" +) + +func initialize(hosts []string) (*routing.EndpointRegistry, *routing.Route) { + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{ + PassiveHealthCheckEnabled: true, + TotalRequestsThreshold: 10, + FailedRoundTripsRatioThreshold: 0.1, + UnhealthyEndpointsRequestsDiscount: 1.0, + }) + eskipRoute := eskip.Route{BackendType: eskip.LBBackend, LBAlgorithm: loadbalancer.PowerOfRandomNChoices.String()} + for i := range hosts { + eskipRoute.LBEndpoints = append(eskipRoute.LBEndpoints, fmt.Sprintf("http://%s", hosts[i])) + } + + route := &routing.Route{ + Route: eskipRoute, + LBEndpoints: []routing.LBEndpoint{}, + } + rt := loadbalancer.NewAlgorithmProvider().Do([]*routing.Route{route}) + route = rt[0] + endpointRegistry.Do([]*routing.Route{route}) + + return endpointRegistry, route +} + +func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreHealthy(t *testing.T) { + hosts := []string{"host1.test:80", "host2.test:80", "host3.test:80"} + endpointRegistry, route := initialize(hosts) + + for i := range hosts { + metrics := endpointRegistry.GetMetrics(hosts[i]) + for j := 0; j < 50; j++ { + metrics.IncTotalRequests() + } + } + endpointRegistry.Do([]*routing.Route{route}) + + proxy := &Proxy{ + registry: endpointRegistry, + heathlyEndpoints: &healthyEndpoints{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: endpointRegistry}, + } + + const nRequests = 10_000 + nHosts := map[int]int{} + for i := 0; i < nRequests; i++ { + endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route) + v := nHosts[len(endpoints)] + nHosts[len(endpoints)] = v + 1 + } + + assert.Equal(t, nRequests, nHosts[3]) + assert.Equal(t, 0, nHosts[2]) + assert.Equal(t, 0, nHosts[1]) +} + +func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreUnhealthy(t *testing.T) { + hosts := []string{"host1.test:80", "host2.test:80", "host3.test:80"} + endpointRegistry, route := initialize(hosts) + + for i := range hosts { + metrics := endpointRegistry.GetMetrics(hosts[i]) + for j := 0; j < 50; j++ { + metrics.IncTotalRequests() + } + for j := 0; j < 30; j++ { + metrics.IncTotalFailedRoundTrips() + } + } + + proxy := &Proxy{ + registry: endpointRegistry, + heathlyEndpoints: &healthyEndpoints{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: endpointRegistry}, + } + + const nRequests = 10_000 + nHosts := map[int]int{} + for i := 0; i < nRequests; i++ { + endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route) + v := nHosts[len(endpoints)] + nHosts[len(endpoints)] = v + 1 + } + + assert.Equal(t, nRequests, nHosts[3]) + assert.Equal(t, 0, nHosts[2]) + assert.Equal(t, 0, nHosts[1]) +} + +func TestFilterHealthyEndpointsFiltersUnhealthyEndpoints(t *testing.T) { + hosts := []string{"host1.test:80", "host2.test:80", "host3.test:80"} + endpointRegistry, route := initialize(hosts) + + for i := range hosts { + metrics := endpointRegistry.GetMetrics(hosts[i]) + for j := 0; j < 50; j++ { + metrics.IncTotalRequests() + } + } + for i := 0; i < 30; i++ { + metrics := endpointRegistry.GetMetrics(hosts[0]) + metrics.IncTotalFailedRoundTrips() + } + endpointRegistry.Do([]*routing.Route{route}) + + proxy := &Proxy{ + registry: endpointRegistry, + heathlyEndpoints: &healthyEndpoints{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: endpointRegistry}, + } + + const nRequests = 10_000 + nHosts := map[int]int{} + for i := 0; i < nRequests; i++ { + endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route) + v := nHosts[len(endpoints)] + nHosts[len(endpoints)] = v + 1 + + if len(endpoints) == 2 { + assert.Equal(t, hosts[1], endpoints[0].Host) + assert.Equal(t, hosts[2], endpoints[1].Host) + } + } + + assert.InDelta(t, 6*nRequests/10, nHosts[3], nRequests/10) + assert.InDelta(t, 4*nRequests/10, nHosts[2], nRequests/10) + assert.Equal(t, 0, nHosts[1]) +} diff --git a/proxy/proxy.go b/proxy/proxy.go index 5503ca511e..ddd18b9be4 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -145,6 +145,82 @@ type OpenTracingParams struct { ExcludeTags []string } +type PassiveHealthCheckParams struct { + // The period of time after which the endpointregistry begin to calculate endpoints statistics + // for scratch + StatsResetPeriod time.Duration + + // The minimum number of total requests that should be sent to an endpoint to healthy endpoints checker + // potentially to opt out the endpoint from the list of healthy endpoints + TotalRequestsThreshold int64 + + // The minimum ratio of timed out requests to total requests that should be sent to an endpoint to healthy + // endpoints checker to opt out the endpoint from the list of healthy endpoints + FailedRoundTripsRatioThreshold float64 + + // The discount coefficient for the total requests to the unhealthy endpoints + UnhealthyEndpointsRequestsDiscount float64 +} + +func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheckParams, error) { + if len(o) == 0 { + return false, &PassiveHealthCheckParams{}, nil + } + + result := &PassiveHealthCheckParams{} + keysInitialized := make(map[string]struct{}) + + for key, value := range o { + switch key { + case "stats-reset-period": + statsResetPeriod, err := time.ParseDuration(value) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid statsResetPeriod value: %s", value) + } + if statsResetPeriod < 0 { + return false, nil, fmt.Errorf("passive health check: invalid statsResetPeriod value: %s", value) + } + result.StatsResetPeriod = statsResetPeriod + case "total-requests-threshold": + totalRequestsThreshold, err := strconv.Atoi(value) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid totalRequestsThreshold value: %s", value) + } + if totalRequestsThreshold < 0 { + return false, nil, fmt.Errorf("passive health check: invalid totalRequestsThreshold value: %s", value) + } + result.TotalRequestsThreshold = int64(totalRequestsThreshold) + case "failed-round-trips-ratio-threshold": + failedRoundTripsRatioThreshold, err := strconv.ParseFloat(value, 64) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid failedRoundTripsRatioThreshold value: %s", value) + } + if failedRoundTripsRatioThreshold < 0 || failedRoundTripsRatioThreshold > 1 { + return false, nil, fmt.Errorf("passive health check: invalid failedRoundTripsRatioThreshold value: %s", value) + } + result.FailedRoundTripsRatioThreshold = failedRoundTripsRatioThreshold + case "unhealthy-endpoints-requests-discount": + unhealthyEndpointsRequestsDiscount, err := strconv.ParseFloat(value, 64) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid unhealthyEndpointsRequestsDiscount value: %s", value) + } + if unhealthyEndpointsRequestsDiscount <= 0 || unhealthyEndpointsRequestsDiscount >= 1 { + return false, nil, fmt.Errorf("passive health check: invalid unhealthyEndpointsRequestsDiscount value: %s", value) + } + result.UnhealthyEndpointsRequestsDiscount = unhealthyEndpointsRequestsDiscount + default: + return false, nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value) + } + + keysInitialized[key] = struct{}{} + } + + if len(keysInitialized) != 4 { + return false, nil, fmt.Errorf("passive health check: missing required parameters") + } + return true, result, nil +} + // Proxy initialization options. type Params struct { // The proxy expects a routing instance that is used to match @@ -247,6 +323,12 @@ type Params struct { // and returns some metadata about endpoint. Information about the metadata // returned from the registry could be found in routing.Metrics interface. EndpointRegistry *routing.EndpointRegistry + + // EnablePassiveHealthCheck enables the healthy endpoints checker + EnablePassiveHealthCheck bool + + // PassiveHealthCheckParams defines the parameters for the healthy endpoints checker. + PassiveHealthCheck *PassiveHealthCheckParams } type ( @@ -324,6 +406,7 @@ type Proxy struct { routing *routing.Routing registry *routing.EndpointRegistry fadein *fadeIn + heathlyEndpoints *healthyEndpoints roundTripper http.RoundTripper priorityRoutes []PriorityRoute flags Flags @@ -466,6 +549,7 @@ func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint { rt := ctx.route endpoints := rt.LBEndpoints endpoints = p.fadein.filterFadeIn(endpoints, rt) + endpoints = p.heathlyEndpoints.filterHealthyEndpoints(endpoints, rt) lbctx := &routing.LBContext{ Request: ctx.request, @@ -717,11 +801,22 @@ func WithParams(p Params) *Proxy { } hostname := os.Getenv("HOSTNAME") + healthyEndpoints := &healthyEndpoints{ + rnd: rand.New(loadbalancer.NewLockedSource()), + endpointRegistry: p.EndpointRegistry, + } + if !p.EnablePassiveHealthCheck { + healthyEndpoints = nil + } return &Proxy{ - routing: p.Routing, - registry: p.EndpointRegistry, - fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry}, + routing: p.Routing, + registry: p.EndpointRegistry, + fadein: &fadeIn{ + rnd: rand.New(loadbalancer.NewLockedSource()), + endpointRegistry: p.EndpointRegistry, + }, + heathlyEndpoints: healthyEndpoints, roundTripper: p.CustomHttpRoundTripperWrap(tr), priorityRoutes: p.PriorityRoutes, flags: p.Flags, @@ -849,6 +944,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co if endpointMetrics != nil { endpointMetrics.IncInflightRequest() defer endpointMetrics.DecInflightRequest() + + endpointMetrics.IncTotalRequests() } if p.experimentalUpgrade && isUpgradeRequest(req) { @@ -891,6 +988,10 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co ctx.proxySpan.LogKV("http_roundtrip", EndEvent) if err != nil { + if endpointMetrics != nil { + endpointMetrics.IncTotalFailedRoundTrips() + } + if errors.Is(err, skpio.ErrBlocked) { p.tracing.setTag(ctx.proxySpan, BlockTag, true) p.tracing.setTag(ctx.proxySpan, HTTPStatusCodeTag, uint16(http.StatusBadRequest)) diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index f7b94315b1..927b559407 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -22,61 +22,110 @@ type Metrics interface { InflightRequests() int64 IncInflightRequest() DecInflightRequest() + + IncTotalRequests() + IncTotalFailedRoundTrips() + HealthCheckDropProbability() float64 } -type entry struct { +type content struct { detected atomic.Value // time.Time lastSeen atomic.Value // time.Time inflightRequests atomic.Int64 + + lastStatsReset atomic.Value // time.Time + totalRequests [2]atomic.Int64 + totalFailedRoundTrips [2]atomic.Int64 + healthCheckDropProbability atomic.Value // float64 +} + +type entry struct { + payload *content + curSlot atomic.Int64 } var _ Metrics = &entry{} func (e *entry) DetectedTime() time.Time { - return e.detected.Load().(time.Time) + return e.payload.detected.Load().(time.Time) } func (e *entry) LastSeen() time.Time { - return e.lastSeen.Load().(time.Time) + return e.payload.lastSeen.Load().(time.Time) +} + +func (e *entry) lastStatsResetTime() time.Time { + return e.payload.lastStatsReset.Load().(time.Time) } func (e *entry) InflightRequests() int64 { - return e.inflightRequests.Load() + return e.payload.inflightRequests.Load() } func (e *entry) IncInflightRequest() { - e.inflightRequests.Add(1) + e.payload.inflightRequests.Add(1) } func (e *entry) DecInflightRequest() { - e.inflightRequests.Add(-1) + e.payload.inflightRequests.Add(-1) } func (e *entry) SetDetected(detected time.Time) { - e.detected.Store(detected) + e.payload.detected.Store(detected) } func (e *entry) SetLastSeen(ts time.Time) { - e.lastSeen.Store(ts) + e.payload.lastSeen.Store(ts) +} + +func (e *entry) setLastStatsReset(ts time.Time) { + e.payload.lastStatsReset.Store(ts) +} + +func (e *entry) IncTotalRequests() { + curSlot := e.curSlot.Load() + e.payload.totalRequests[curSlot].Add(1) +} + +func (e *entry) IncTotalFailedRoundTrips() { + curSlot := e.curSlot.Load() + e.payload.totalFailedRoundTrips[curSlot].Add(1) +} + +func (e *entry) HealthCheckDropProbability() float64 { + return e.payload.healthCheckDropProbability.Load().(float64) } func newEntry() *entry { - result := &entry{} + result := &entry{payload: &content{}} + result.payload.healthCheckDropProbability.Store(0.0) result.SetDetected(time.Time{}) result.SetLastSeen(time.Time{}) + result.setLastStatsReset(time.Time{}) return result } type EndpointRegistry struct { - lastSeenTimeout time.Duration - now func() time.Time - data sync.Map // map[string]*entry + lastSeenTimeout time.Duration + passiveHealthCheckEnabled bool + statsResetPeriod time.Duration + totalRequestsThreshold int64 + failedRoundTripsRatioThreshold float64 + unhealthyEndpointsRequestsDiscount float64 + + now func() time.Time + data sync.Map // map[string]*entry } var _ PostProcessor = &EndpointRegistry{} type RegistryOptions struct { - LastSeenTimeout time.Duration + LastSeenTimeout time.Duration + PassiveHealthCheckEnabled bool + StatsResetPeriod time.Duration + TotalRequestsThreshold int64 + FailedRoundTripsRatioThreshold float64 + UnhealthyEndpointsRequestsDiscount float64 } func (r *EndpointRegistry) Do(routes []*Route) []*Route { @@ -103,12 +152,31 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route { } removeOlder := now.Add(-r.lastSeenTimeout) + resetOlder := now.Add(-r.statsResetPeriod) r.data.Range(func(key, value any) bool { e := value.(*entry) if e.LastSeen().Before(removeOlder) { r.data.Delete(key) } + if r.passiveHealthCheckEnabled && e.lastStatsResetTime().Before(resetOlder) { + e.setLastStatsReset(now) + curSlot := e.curSlot.Load() + nextSlot := (curSlot + 1) % 2 + e.payload.totalFailedRoundTrips[nextSlot].Store(0) + e.payload.totalRequests[nextSlot].Store(0) + + failed := e.payload.totalFailedRoundTrips[curSlot].Load() + requests := e.payload.totalRequests[curSlot].Load() + failedRoundTripsRatio := float64(failed) / float64(requests) + if failedRoundTripsRatio > r.failedRoundTripsRatioThreshold && requests > r.totalRequestsThreshold { + e.payload.healthCheckDropProbability.Store(1 - r.unhealthyEndpointsRequestsDiscount*failedRoundTripsRatio) + } else { + e.payload.healthCheckDropProbability.Store(0.0) + } + e.curSlot.Store(nextSlot) + } + return true }) @@ -121,9 +189,14 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { } return &EndpointRegistry{ - data: sync.Map{}, - lastSeenTimeout: o.LastSeenTimeout, - now: time.Now, + data: sync.Map{}, + lastSeenTimeout: o.LastSeenTimeout, + passiveHealthCheckEnabled: o.PassiveHealthCheckEnabled, + statsResetPeriod: o.StatsResetPeriod, + totalRequestsThreshold: o.TotalRequestsThreshold, + failedRoundTripsRatioThreshold: o.FailedRoundTripsRatioThreshold, + unhealthyEndpointsRequestsDiscount: o.UnhealthyEndpointsRequestsDiscount, + now: time.Now, } } @@ -133,7 +206,10 @@ func (r *EndpointRegistry) GetMetrics(hostPort string) Metrics { if !ok { e, _ = r.data.LoadOrStore(hostPort, newEntry()) } - return e.(*entry) + copyFrom := e.(*entry) + copy := &entry{payload: copyFrom.payload} + copy.curSlot.Store(copyFrom.curSlot.Load()) + return copy } func (r *EndpointRegistry) allMetrics() map[string]Metrics { diff --git a/skipper.go b/skipper.go index f409012a9b..167e915f83 100644 --- a/skipper.go +++ b/skipper.go @@ -915,6 +915,8 @@ type Options struct { OpenPolicyAgentEnvoyMetadata string OpenPolicyAgentCleanerInterval time.Duration OpenPolicyAgentStartupTimeout time.Duration + + PassiveHealthCheckParams map[string]string } func (o *Options) KubernetesDataClientOptions() kubernetes.Options { @@ -1886,8 +1888,19 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { }) defer schedulerRegistry.Close() + passiveHealthCheckEnabled, passiveHealthCheckParams, err := proxy.InitPassiveHealthChecker(o.PassiveHealthCheckParams) + if err != nil { + return err + } + // create a routing engine - endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{ + PassiveHealthCheckEnabled: passiveHealthCheckEnabled, + StatsResetPeriod: passiveHealthCheckParams.StatsResetPeriod, + TotalRequestsThreshold: passiveHealthCheckParams.TotalRequestsThreshold, + FailedRoundTripsRatioThreshold: passiveHealthCheckParams.FailedRoundTripsRatioThreshold, + UnhealthyEndpointsRequestsDiscount: passiveHealthCheckParams.UnhealthyEndpointsRequestsDiscount, + }) ro := routing.Options{ FilterRegistry: o.filterRegistry(), MatchingOptions: mo, @@ -1953,6 +1966,8 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { proxyParams := proxy.Params{ Routing: routing, EndpointRegistry: endpointRegistry, + EnablePassiveHealthCheck: passiveHealthCheckEnabled, + PassiveHealthCheck: passiveHealthCheckParams, Flags: proxyFlags, PriorityRoutes: o.PriorityRoutes, IdleConnectionsPerHost: o.IdleConnectionsPerHost,