From 54c422e74220a8b088013f777a9eb4b82c70c7ad Mon Sep 17 00:00:00 2001
From: Christian Haudum
Date: Thu, 1 Dec 2022 14:08:19 +0100
Subject: [PATCH] Execute health checks for ring clients in parallel

Fixes #236

Signed-off-by: Christian Haudum
---
 CHANGELOG.md             |  1 +
 ring/client/pool.go      | 24 ++++++++++++-----
 ring/client/pool_test.go | 57 ++++++++++++++++++++++++----------------
 3 files changed, 54 insertions(+), 28 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f7a931af2..62207ab07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -70,6 +70,7 @@
 * [ENHANCEMENT] Concurrency: Add LimitedConcurrencySingleFlight to run jobs concurrently and with in-flight deduplication. #214
 * [ENHANCEMENT] Add the ability to define custom gRPC health checks. #227
 * [ENHANCEMENT] Import Bytes type, DeleteAll function and DNS package from Thanos. #228
+* [ENHANCEMENT] Execute health checks in ring client pool concurrently. #237
 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59
 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
diff --git a/ring/client/pool.go b/ring/client/pool.go
index 57b462cc4..c90faa0e7 100644
--- a/ring/client/pool.go
+++ b/ring/client/pool.go
@@ -13,6 +13,7 @@ import (
 	"github.com/weaveworks/common/user"
 	"google.golang.org/grpc/health/grpc_health_v1"
 
+	"github.com/grafana/dskit/concurrency"
 	"github.com/grafana/dskit/ring/util"
 	"github.com/grafana/dskit/services"
 )
@@ -35,9 +36,10 @@ type PoolServiceDiscovery func() ([]string, error)
 
 // PoolConfig is config for creating a Pool.
 type PoolConfig struct {
-	CheckInterval      time.Duration
-	HealthCheckEnabled bool
-	HealthCheckTimeout time.Duration
+	CheckInterval             time.Duration
+	HealthCheckEnabled        bool
+	HealthCheckTimeout        time.Duration
+	MaxConcurrentHealthChecks int
 }
 
 // Pool holds a cache of grpc_health_v1 clients.
@@ -173,9 +175,18 @@ func (p *Pool) removeStaleClients() {
 	}
 }
 
-// cleanUnhealthy loops through all servers and deletes any that fails a healthcheck.
+// cleanUnhealthy loops through all servers and deletes any that fails a healthcheck.
+// The health checks are executed concurrently, with at most p.cfg.MaxConcurrentHealthChecks checks in flight at a time.
+// If this is not set (0), the health checks for all clients are executed in parallel.
 func (p *Pool) cleanUnhealthy() {
-	for _, addr := range p.RegisteredAddresses() {
+	addresses := p.RegisteredAddresses()
+
+	maxConcurrent := p.cfg.MaxConcurrentHealthChecks
+	if p.cfg.MaxConcurrentHealthChecks == 0 {
+		maxConcurrent = len(addresses)
+	}
+	_ = concurrency.ForEachJob(context.Background(), len(addresses), maxConcurrent, func(_ context.Context, idx int) error {
+		addr := addresses[idx]
 		client, ok := p.fromCache(addr)
 		// not ok means someone removed a client between the start of this loop and now
 		if ok {
@@ -185,7 +196,8 @@ func (p *Pool) cleanUnhealthy() {
 				p.RemoveClientFor(addr)
 			}
 		}
-	}
+		return nil
+	})
 }
 
 // healthCheck will check if the client is still healthy, returning an error if it is not
diff --git a/ring/client/pool_test.go b/ring/client/pool_test.go
index 9e7028b2e..386baeb5a 100644
--- a/ring/client/pool_test.go
+++ b/ring/client/pool_test.go
@@ -120,28 +120,41 @@ func TestPoolCache(t *testing.T) {
 }
 
 func TestCleanUnhealthy(t *testing.T) {
-	goodAddrs := []string{"good1", "good2"}
-	badAddrs := []string{"bad1", "bad2"}
-	clients := map[string]PoolClient{}
-	for _, addr := range goodAddrs {
-		clients[addr] = mockClient{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}
-	}
-	for _, addr := range badAddrs {
-		clients[addr] = mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
-	}
-	pool := &Pool{
-		clients: clients,
-		logger:  log.NewNopLogger(),
-	}
-	pool.cleanUnhealthy()
-	for _, addr := range badAddrs {
-		if _, ok := pool.clients[addr]; ok {
-			t.Errorf("Found bad client after clean: %s\n", addr)
-		}
+	tcs := []struct {
+		maxConcurrent int
+	}{
+		{maxConcurrent: 0}, // unbounded parallelism
+		{maxConcurrent: 2},
 	}
-	for _, addr := range goodAddrs {
-		if _, ok := pool.clients[addr]; !ok {
-			t.Errorf("Could not find good client after clean: %s\n", addr)
-		}
+	for _, tc := range tcs {
+		t.Run(fmt.Sprintf("max concurrent %d", tc.maxConcurrent), func(t *testing.T) {
+			goodAddrs := []string{"good1", "good2"}
+			badAddrs := []string{"bad1", "bad2"}
+			clients := map[string]PoolClient{}
+			for _, addr := range goodAddrs {
+				clients[addr] = mockClient{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}
+			}
+			for _, addr := range badAddrs {
+				clients[addr] = mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
+			}
+			pool := &Pool{
+				cfg: PoolConfig{
+					MaxConcurrentHealthChecks: tc.maxConcurrent,
+				},
+				clients: clients,
+				logger:  log.NewNopLogger(),
+			}
+			pool.cleanUnhealthy()
+			for _, addr := range badAddrs {
+				if _, ok := pool.clients[addr]; ok {
+					t.Errorf("Found bad client after clean: %s\n", addr)
+				}
+			}
+			for _, addr := range goodAddrs {
+				if _, ok := pool.clients[addr]; !ok {
+					t.Errorf("Could not find good client after clean: %s\n", addr)
+				}
+			}
+		})
 	}
 }
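
For illustration (not part of the patch): a minimal, self-contained sketch of the bounded fan-out that the new cleanUnhealthy relies on, using the same concurrency.ForEachJob call as the hunk above. The addresses slice and the checkAddr helper are placeholders invented for this sketch, not dskit APIs.

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

// checkAddr stands in for the per-address health check performed inside
// cleanUnhealthy; its name and body are made up for this sketch.
func checkAddr(_ context.Context, addr string) error {
	fmt.Println("health-checking", addr)
	return nil
}

func main() {
	addresses := []string{"good1", "good2", "bad1", "bad2"}

	// Same bounding rule as the patch: 0 means "no explicit limit", so use one
	// job slot per address and let all checks run in parallel.
	maxConcurrent := 2
	if maxConcurrent == 0 {
		maxConcurrent = len(addresses)
	}

	// ForEachJob runs len(addresses) jobs, at most maxConcurrent at a time, and
	// returns a non-nil error if any job fails; cleanUnhealthy discards that error
	// because its jobs always return nil and handle failures by removing clients.
	_ = concurrency.ForEachJob(context.Background(), len(addresses), maxConcurrent, func(ctx context.Context, idx int) error {
		return checkAddr(ctx, addresses[idx])
	})
}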
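
Likewise, a sketch of how a caller might opt into the new limit. Only the PoolConfig field names come from this patch; the import path, the concrete values, and the surrounding program are assumptions made for illustration.

package main

import (
	"fmt"
	"time"

	"github.com/grafana/dskit/ring/client"
)

func main() {
	cfg := client.PoolConfig{
		CheckInterval:      10 * time.Second, // illustrative value
		HealthCheckEnabled: true,
		HealthCheckTimeout: 1 * time.Second, // illustrative value
		// Cap each cleanup pass at 16 concurrent health checks; leaving this at 0
		// runs one check goroutine per registered address (fully parallel).
		MaxConcurrentHealthChecks: 16,
	}
	fmt.Printf("pool config: %+v\n", cfg)
}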