diff --git a/config/config.go b/config/config.go index b05152b5b6..23c657870c 100644 --- a/config/config.go +++ b/config/config.go @@ -283,6 +283,9 @@ type Config struct { OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"` OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"` OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"` + + // Passive Health Checks + PassiveHealthCheck mapFlags `yaml:"passive-health-check"` } const ( @@ -567,6 +570,9 @@ func NewConfig() *Config { flag.Var(cfg.LuaModules, "lua-modules", "comma separated list of lua filter modules. Use . to selectively enable module symbols, for example: package,base._G,base.print,json") flag.Var(cfg.LuaSources, "lua-sources", `comma separated list of lua input types for the lua() filter. Valid sources "", "file", "inline", "file,inline" and "none". Use "file" to only allow lua file references in lua filter. Default "" is the same as "file","inline". Use "none" to disable lua filters.`) + // Passive Health Checks + flag.Var(&cfg.PassiveHealthCheck, "passive-health-check", "sets the parameters for passive health check feature") + cfg.flags = flag return cfg } @@ -906,6 +912,8 @@ func (c *Config) ToOptions() skipper.Options { OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata, OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval, OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout, + + PassiveHealthCheck: c.PassiveHealthCheck.values, } for _, rcci := range c.CloneRoute { eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl) diff --git a/docs/operation/operation.md b/docs/operation/operation.md index cf327d2fb5..b4eaced469 100644 --- a/docs/operation/operation.md +++ b/docs/operation/operation.md @@ -893,6 +893,38 @@ to get the results paginated or getting all of them at the same time. curl localhost:9911/routes?offset=200&limit=100 ``` +## Passive health check (*experimental*) + +Skipper has an option to automatically detect and mitigate faulty backend endpoints, this feature is called +Passive Health Check(PHC). + +PHC works the following way: the entire uptime is divided in chunks of `period`, per every period Skipper calculates +the total amount of requests and amount of requests failed per every endpoint. While next period is going on, +the Skipper takes a look at previous period and if for the given endpoint: ++ the amount of requests in the previous period is more than `min-requests` +AND ++ the ratio of failed requests is more than `min-failed` +then Skipper will send reduced (the more `requests-discount` and failed requests ratio in previous period are, the stronger reduction is, +but it never reduces lower than `min-requests`) amount of requests compared to amount sent without PHC. + +Having this, we expect less requests to fail because a lot of them would be sent to endpoints that seem to be healthy instead. + +To enable this feature, you need to provide `-passive-health-check` option having all forementioned parameters +(`period`, `min-requests`, `min-failed`, `requests-discount`) defined, +for instance: `-passive-health-check=period=1m,min-requests=10,min-failed=0.2,requests-discount=0.9`. + +You need to define all parameters on your side, there are no defaults, and skipper will not run if PHC params are passed only partially. + +However, Skipper will run without this feature, if no `-passive-health-check` is provided at all. + +The parameters of `-passive-health-check` option are: ++ `period=` - the duration of stats reset period ++ `min-requests=` - the minimum number of requests per minute per backend endpoint required to activate PHC for this endpoint ++ `min-failed=` - the minimum ratio of failed requests for every endpoint to make PHC +reduce amount of requests to this endpoint ++ `requests-discount=` - the discount multiplier which defines how huge is reduction of requests +sent to endpoints considered unhealthy + ## Memory consumption While Skipper is generally not memory bound, some features may require diff --git a/metricsinit_test.go b/metricsinit_test.go index d2a94e8223..2a78841142 100644 --- a/metricsinit_test.go +++ b/metricsinit_test.go @@ -53,6 +53,12 @@ func TestInitOrderAndDefault(t *testing.T) { SwarmRedisURLs: []string{fmt.Sprintf("localhost:%d", redisPort)}, EnableRatelimiters: true, SwarmRedisConnMetricsInterval: ringMetricsUpdatePeriod, + PassiveHealthCheck: map[string]string{ + "period": "1m", + "min-requests": "10", + "min-failed": "0.2", + "requests-discount": "0.9", + }, } tornDown := make(chan struct{}) diff --git a/proxy/healthy_endpoints.go b/proxy/healthy_endpoints.go new file mode 100644 index 0000000000..b79a132769 --- /dev/null +++ b/proxy/healthy_endpoints.go @@ -0,0 +1,34 @@ +package proxy + +import ( + "math/rand" + + "github.com/zalando/skipper/routing" +) + +type healthyEndpoints struct { + rnd *rand.Rand + endpointRegistry *routing.EndpointRegistry +} + +func (h *healthyEndpoints) filterHealthyEndpoints(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint { + if h == nil { + return endpoints + } + + p := h.rnd.Float64() + + filtered := make([]routing.LBEndpoint, 0, len(endpoints)) + for _, e := range endpoints { + if p < e.Metrics.HealthCheckDropProbability() { + /* drop */ + } else { + filtered = append(filtered, e) + } + } + + if len(filtered) == 0 { + return endpoints + } + return filtered +} diff --git a/proxy/healthy_endpoints_test.go b/proxy/healthy_endpoints_test.go index 3e5d0b7be9..3a5364df62 100644 --- a/proxy/healthy_endpoints_test.go +++ b/proxy/healthy_endpoints_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/loadbalancer" "github.com/zalando/skipper/routing" ) @@ -20,7 +21,123 @@ const ( ) func defaultEndpointRegistry() *routing.EndpointRegistry { - return routing.NewEndpointRegistry(routing.RegistryOptions{}) + return routing.NewEndpointRegistry(routing.RegistryOptions{ + PassiveHealthCheckEnabled: true, + StatsResetPeriod: period, + MinRequests: 10, + MinFailed: 0.1, + RequestsDiscount: 1.0, + }) +} + +func initialize(hosts []string) (*routing.EndpointRegistry, *routing.Route, *Proxy) { + endpointRegistry := defaultEndpointRegistry() + eskipRoute := eskip.Route{BackendType: eskip.LBBackend, LBAlgorithm: loadbalancer.PowerOfRandomNChoices.String()} + for i := range hosts { + eskipRoute.LBEndpoints = append(eskipRoute.LBEndpoints, fmt.Sprintf("http://%s", hosts[i])) + } + + route := &routing.Route{ + Route: eskipRoute, + LBEndpoints: []routing.LBEndpoint{}, + } + rt := loadbalancer.NewAlgorithmProvider().Do([]*routing.Route{route}) + route = rt[0] + endpointRegistry.Do([]*routing.Route{route}) + + proxy := &Proxy{ + registry: endpointRegistry, + heathlyEndpoints: &healthyEndpoints{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: endpointRegistry}, + } + + return endpointRegistry, route, proxy +} + +func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreHealthy(t *testing.T) { + hosts := []string{"host1.test:80", "host2.test:80", "host3.test:80"} + endpointRegistry, route, proxy := initialize(hosts) + + for i := range hosts { + metrics := endpointRegistry.GetMetrics(hosts[i]) + for j := 0; j < 50; j++ { + metrics.IncTotalRequests() + } + } + time.Sleep(period + time.Millisecond) + endpointRegistry.Do([]*routing.Route{route}) + + nHosts := map[int]int{} + for i := 0; i < nRequests; i++ { + endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route) + v := nHosts[len(endpoints)] + nHosts[len(endpoints)] = v + 1 + } + + assert.Equal(t, nRequests, nHosts[3]) + assert.Equal(t, 0, nHosts[2]) + assert.Equal(t, 0, nHosts[1]) +} + +func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreUnhealthy(t *testing.T) { + hosts := []string{"host1.test:80", "host2.test:80", "host3.test:80"} + endpointRegistry, route, proxy := initialize(hosts) + + for i := range hosts { + metrics := endpointRegistry.GetMetrics(hosts[i]) + for j := 0; j < 50; j++ { + metrics.IncTotalRequests() + } + for j := 0; j < 30; j++ { + metrics.IncTotalFailedRoundTrips() + } + } + time.Sleep(period + time.Millisecond) + endpointRegistry.Do([]*routing.Route{route}) + + nHosts := map[int]int{} + for i := 0; i < nRequests; i++ { + endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route) + v := nHosts[len(endpoints)] + nHosts[len(endpoints)] = v + 1 + } + + assert.Equal(t, nRequests, nHosts[3]) + assert.Equal(t, 0, nHosts[2]) + assert.Equal(t, 0, nHosts[1]) +} + +func TestFilterHealthyEndpointsFiltersUnhealthyEndpoints(t *testing.T) { + hosts := []string{"host1.test:80", "host2.test:80", "host3.test:80"} + endpointRegistry, route, proxy := initialize(hosts) + + for i := range hosts { + metrics := endpointRegistry.GetMetrics(hosts[i]) + for j := 0; j < 50; j++ { + metrics.IncTotalRequests() + } + } + for i := 0; i < 30; i++ { + metrics := endpointRegistry.GetMetrics(hosts[0]) + metrics.IncTotalFailedRoundTrips() + } + time.Sleep(period + time.Millisecond) + endpointRegistry.Do([]*routing.Route{route}) + + nHosts := map[int]int{} + for i := 0; i < nRequests; i++ { + endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route) + v := nHosts[len(endpoints)] + nHosts[len(endpoints)] = v + 1 + + if len(endpoints) == 2 { + assert.Equal(t, hosts[1], endpoints[0].Host) + assert.Equal(t, hosts[2], endpoints[1].Host) + } + } + + assert.InDelta(t, 4*nRequests/10, nHosts[3], nRequests/10) + assert.InDelta(t, 6*nRequests/10, nHosts[2], nRequests/10) + assert.Equal(t, 0, nHosts[1]) } func TestPHCForSingleHealthyEndpoint(t *testing.T) { @@ -32,7 +149,8 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) { doc := fmt.Sprintf(`* -> "%s"`, service.URL) tp, err := newTestProxyWithParams(doc, Params{ - EndpointRegistry: endpointRegistry, + EnablePassiveHealthCheck: true, + EndpointRegistry: endpointRegistry, }) if err != nil { t.Fatal(err) @@ -91,7 +209,8 @@ func TestPHCForMultipleHealthyEndpoints(t *testing.T) { doc := fmt.Sprintf(`* -> `, services[0].URL, services[1].URL, services[2].URL) tp, err := newTestProxyWithParams(doc, Params{ - EndpointRegistry: endpointRegistry, + EnablePassiveHealthCheck: true, + EndpointRegistry: endpointRegistry, }) if err != nil { t.Fatal(err) @@ -177,6 +296,7 @@ func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) { doc := fmt.Sprintf(`* -> `, services[0].URL, services[1].URL, services[2].URL) tp, err := newTestProxyWithParams(doc, Params{ + EnablePassiveHealthCheck: true, EndpointRegistry: endpointRegistry, CustomHttpRoundTripperWrap: newRoundTripperUnhealthyHost(&RoundTripperUnhealthyHostParams{Host: services[0].URL[7:], Probability: rtFailureProbability}), }) @@ -221,7 +341,5 @@ func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) { } rsp.Body.Close() } - assert.InDelta(t, 0.33*rtFailureProbability*nRequests, failedReqs, 0.05*nRequests) - // After PHC is implemented, I expect failed requests to decrease like this: - // assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*nRequests, failedReqs, 0.05*nRequests) + assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*nRequests, failedReqs, 0.05*nRequests) } diff --git a/proxy/proxy.go b/proxy/proxy.go index 5503ca511e..613a8f854e 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -145,6 +145,82 @@ type OpenTracingParams struct { ExcludeTags []string } +type PassiveHealthCheck struct { + // The period of time after which the endpointregistry begin to calculate endpoints statistics + // from scratch + Period time.Duration + + // The minimum number of total requests that should be sent to an endpoint to + // potentially opt out the endpoint from the list of healthy endpoints + MinRequests int64 + + // The minimum ratio of failed requests to total requests that were sent to an endpoint + // to opt out the endpoint from the list of healthy endpoints + MinFailed float64 + + // The discount coefficient for the total requests to the unhealthy endpoints + RequestsDiscount float64 +} + +func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheck, error) { + if len(o) == 0 { + return false, &PassiveHealthCheck{}, nil + } + + result := &PassiveHealthCheck{} + keysInitialized := make(map[string]struct{}) + + for key, value := range o { + switch key { + case "period": + period, err := time.ParseDuration(value) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value) + } + if period < 0 { + return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value) + } + result.Period = period + case "min-requests": + minRequests, err := strconv.Atoi(value) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) + } + if minRequests < 0 { + return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) + } + result.MinRequests = int64(minRequests) + case "min-failed": + minFailed, err := strconv.ParseFloat(value, 64) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid minFailed value: %s", value) + } + if minFailed < 0 || minFailed > 1 { + return false, nil, fmt.Errorf("passive health check: invalid minFailed value: %s", value) + } + result.MinFailed = minFailed + case "requests-discount": + requestsDiscount, err := strconv.ParseFloat(value, 64) + if err != nil { + return false, nil, fmt.Errorf("passive health check: invalid requestsDiscount value: %s", value) + } + if requestsDiscount <= 0 || requestsDiscount >= 1 { + return false, nil, fmt.Errorf("passive health check: invalid requestsDiscount value: %s", value) + } + result.RequestsDiscount = requestsDiscount + default: + return false, nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value) + } + + keysInitialized[key] = struct{}{} + } + + if len(keysInitialized) != 4 { + return false, nil, fmt.Errorf("passive health check: missing required parameters") + } + return true, result, nil +} + // Proxy initialization options. type Params struct { // The proxy expects a routing instance that is used to match @@ -247,6 +323,12 @@ type Params struct { // and returns some metadata about endpoint. Information about the metadata // returned from the registry could be found in routing.Metrics interface. EndpointRegistry *routing.EndpointRegistry + + // EnablePassiveHealthCheck enables the healthy endpoints checker + EnablePassiveHealthCheck bool + + // PassiveHealthCheck defines the parameters for the healthy endpoints checker. + PassiveHealthCheck *PassiveHealthCheck } type ( @@ -324,6 +406,7 @@ type Proxy struct { routing *routing.Routing registry *routing.EndpointRegistry fadein *fadeIn + heathlyEndpoints *healthyEndpoints roundTripper http.RoundTripper priorityRoutes []PriorityRoute flags Flags @@ -466,6 +549,7 @@ func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint { rt := ctx.route endpoints := rt.LBEndpoints endpoints = p.fadein.filterFadeIn(endpoints, rt) + endpoints = p.heathlyEndpoints.filterHealthyEndpoints(endpoints, rt) lbctx := &routing.LBContext{ Request: ctx.request, @@ -717,11 +801,22 @@ func WithParams(p Params) *Proxy { } hostname := os.Getenv("HOSTNAME") + healthyEndpoints := &healthyEndpoints{ + rnd: rand.New(loadbalancer.NewLockedSource()), + endpointRegistry: p.EndpointRegistry, + } + if !p.EnablePassiveHealthCheck { + healthyEndpoints = nil + } return &Proxy{ - routing: p.Routing, - registry: p.EndpointRegistry, - fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry}, + routing: p.Routing, + registry: p.EndpointRegistry, + fadein: &fadeIn{ + rnd: rand.New(loadbalancer.NewLockedSource()), + endpointRegistry: p.EndpointRegistry, + }, + heathlyEndpoints: healthyEndpoints, roundTripper: p.CustomHttpRoundTripperWrap(tr), priorityRoutes: p.PriorityRoutes, flags: p.Flags, @@ -888,9 +983,15 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co req = injectClientTrace(req, ctx.proxySpan) response, err := roundTripper.RoundTrip(req) - + if endpointMetrics != nil { + endpointMetrics.IncTotalRequests() + } ctx.proxySpan.LogKV("http_roundtrip", EndEvent) if err != nil { + if endpointMetrics != nil { + endpointMetrics.IncTotalFailedRoundTrips() + } + if errors.Is(err, skpio.ErrBlocked) { p.tracing.setTag(ctx.proxySpan, BlockTag, true) p.tracing.setTag(ctx.proxySpan, HTTPStatusCodeTag, uint16(http.StatusBadRequest)) diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 6b8018fa6c..f569d2e920 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -208,12 +208,14 @@ func newTestProxyWithFiltersAndParams(fr filters.Registry, doc string, params Pa } tl := loggingtest.New() - endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + if params.EndpointRegistry == nil { + params.EndpointRegistry = routing.NewEndpointRegistry(routing.RegistryOptions{}) + } opts := routing.Options{ FilterRegistry: fr, PollTimeout: sourcePollTimeout, DataClients: []routing.DataClient{dc}, - PostProcessors: []routing.PostProcessor{loadbalancer.NewAlgorithmProvider(), endpointRegistry}, + PostProcessors: []routing.PostProcessor{loadbalancer.NewAlgorithmProvider(), params.EndpointRegistry}, Log: tl, Predicates: []routing.PredicateSpec{teePredicate.New()}, } diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index f7b94315b1..323a33129d 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -22,12 +22,22 @@ type Metrics interface { InflightRequests() int64 IncInflightRequest() DecInflightRequest() + + IncTotalRequests() + IncTotalFailedRoundTrips() + HealthCheckDropProbability() float64 } type entry struct { detected atomic.Value // time.Time lastSeen atomic.Value // time.Time inflightRequests atomic.Int64 + + lastStatsReset atomic.Value // time.Time + totalRequests [2]atomic.Int64 + totalFailedRoundTrips [2]atomic.Int64 + curSlot atomic.Int64 + healthCheckDropProbability atomic.Value // float64 } var _ Metrics = &entry{} @@ -40,6 +50,10 @@ func (e *entry) LastSeen() time.Time { return e.lastSeen.Load().(time.Time) } +func (e *entry) lastStatsResetTime() time.Time { + return e.lastStatsReset.Load().(time.Time) +} + func (e *entry) InflightRequests() int64 { return e.inflightRequests.Load() } @@ -60,23 +74,54 @@ func (e *entry) SetLastSeen(ts time.Time) { e.lastSeen.Store(ts) } +func (e *entry) setLastStatsReset(ts time.Time) { + e.lastStatsReset.Store(ts) +} + +func (e *entry) IncTotalRequests() { + curSlot := e.curSlot.Load() + e.totalRequests[curSlot].Add(1) +} + +func (e *entry) IncTotalFailedRoundTrips() { + curSlot := e.curSlot.Load() + e.totalFailedRoundTrips[curSlot].Add(1) +} + +func (e *entry) HealthCheckDropProbability() float64 { + return e.healthCheckDropProbability.Load().(float64) +} + func newEntry() *entry { result := &entry{} + result.healthCheckDropProbability.Store(0.0) result.SetDetected(time.Time{}) result.SetLastSeen(time.Time{}) + result.setLastStatsReset(time.Time{}) return result } type EndpointRegistry struct { - lastSeenTimeout time.Duration - now func() time.Time - data sync.Map // map[string]*entry + lastSeenTimeout time.Duration + passiveHealthCheckEnabled bool + statsResetPeriod time.Duration + minRequests int64 + minFailed float64 + requestsDiscount float64 + + now func() time.Time + data sync.Map // map[string]*entry } var _ PostProcessor = &EndpointRegistry{} type RegistryOptions struct { - LastSeenTimeout time.Duration + LastSeenTimeout time.Duration + PassiveHealthCheckEnabled bool + StatsResetPeriod time.Duration + MinRequests int64 + MinFailed float64 + RequestsDiscount float64 } func (r *EndpointRegistry) Do(routes []*Route) []*Route { @@ -103,12 +148,31 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route { } removeOlder := now.Add(-r.lastSeenTimeout) + resetOlder := now.Add(-r.statsResetPeriod) r.data.Range(func(key, value any) bool { e := value.(*entry) if e.LastSeen().Before(removeOlder) { r.data.Delete(key) } + if r.passiveHealthCheckEnabled && e.lastStatsResetTime().Before(resetOlder) { + e.setLastStatsReset(now) + curSlot := e.curSlot.Load() + nextSlot := (curSlot + 1) % 2 + e.totalFailedRoundTrips[nextSlot].Store(0) + e.totalRequests[nextSlot].Store(0) + + failed := e.totalFailedRoundTrips[curSlot].Load() + requests := e.totalRequests[curSlot].Load() + failedRoundTripsRatio := float64(failed) / float64(requests) + if failedRoundTripsRatio > r.minFailed && requests > r.minRequests { + e.healthCheckDropProbability.Store(r.requestsDiscount * failedRoundTripsRatio) + } else { + e.healthCheckDropProbability.Store(0.0) + } + e.curSlot.Store(nextSlot) + } + return true }) @@ -121,9 +185,14 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { } return &EndpointRegistry{ - data: sync.Map{}, - lastSeenTimeout: o.LastSeenTimeout, - now: time.Now, + data: sync.Map{}, + lastSeenTimeout: o.LastSeenTimeout, + passiveHealthCheckEnabled: o.PassiveHealthCheckEnabled, + statsResetPeriod: o.StatsResetPeriod, + minRequests: o.MinRequests, + minFailed: o.MinFailed, + requestsDiscount: o.RequestsDiscount, + now: time.Now, } } diff --git a/skipper.go b/skipper.go index 95f1d5a2f0..131b5fddfa 100644 --- a/skipper.go +++ b/skipper.go @@ -915,6 +915,8 @@ type Options struct { OpenPolicyAgentEnvoyMetadata string OpenPolicyAgentCleanerInterval time.Duration OpenPolicyAgentStartupTimeout time.Duration + + PassiveHealthCheck map[string]string } func (o *Options) KubernetesDataClientOptions() kubernetes.Options { @@ -1888,8 +1890,19 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { }) defer schedulerRegistry.Close() + passiveHealthCheckEnabled, passiveHealthCheck, err := proxy.InitPassiveHealthChecker(o.PassiveHealthCheck) + if err != nil { + return err + } + // create a routing engine - endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{ + PassiveHealthCheckEnabled: passiveHealthCheckEnabled, + StatsResetPeriod: passiveHealthCheck.Period, + MinRequests: passiveHealthCheck.MinRequests, + MinFailed: passiveHealthCheck.MinFailed, + RequestsDiscount: passiveHealthCheck.RequestsDiscount, + }) ro := routing.Options{ FilterRegistry: o.filterRegistry(), MatchingOptions: mo, @@ -1955,6 +1968,8 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { proxyParams := proxy.Params{ Routing: routing, EndpointRegistry: endpointRegistry, + EnablePassiveHealthCheck: passiveHealthCheckEnabled, + PassiveHealthCheck: passiveHealthCheck, Flags: proxyFlags, PriorityRoutes: o.PriorityRoutes, IdleConnectionsPerHost: o.IdleConnectionsPerHost,