diff --git a/CHANGELOG.md b/CHANGELOG.md
index f7a931af2..62207ab07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -70,6 +70,7 @@
 * [ENHANCEMENT] Concurrency: Add LimitedConcurrencySingleFlight to run jobs concurrently and with in-flight deduplication. #214
 * [ENHANCEMENT] Add the ability to define custom gRPC health checks. #227
 * [ENHANCEMENT] Import Bytes type, DeleteAll function and DNS package from Thanos. #228
+* [ENHANCEMENT] Execute health checks in ring client pool concurrently. #237
 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59
 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
diff --git a/ring/client/pool.go b/ring/client/pool.go
index 57b462cc4..eca27ef68 100644
--- a/ring/client/pool.go
+++ b/ring/client/pool.go
@@ -13,6 +13,7 @@ import (
 	"github.com/weaveworks/common/user"
 	"google.golang.org/grpc/health/grpc_health_v1"
 
+	"github.com/grafana/dskit/concurrency"
 	"github.com/grafana/dskit/ring/util"
 	"github.com/grafana/dskit/services"
 )
@@ -35,9 +36,10 @@ type PoolServiceDiscovery func() ([]string, error)
 
 // PoolConfig is config for creating a Pool.
 type PoolConfig struct {
-	CheckInterval      time.Duration
-	HealthCheckEnabled bool
-	HealthCheckTimeout time.Duration
+	CheckInterval             time.Duration
+	HealthCheckEnabled        bool
+	HealthCheckTimeout        time.Duration
+	MaxConcurrentHealthChecks int // defaults to 16
 }
 
 // Pool holds a cache of grpc_health_v1 clients.
@@ -58,6 +60,10 @@ type Pool struct {
 
 // NewPool creates a new Pool.
 func NewPool(clientName string, cfg PoolConfig, discovery PoolServiceDiscovery, factory PoolFactory, clientsMetric prometheus.Gauge, logger log.Logger) *Pool {
+	if cfg.MaxConcurrentHealthChecks == 0 {
+		cfg.MaxConcurrentHealthChecks = 16
+	}
+
 	p := &Pool{
 		cfg:       cfg,
 		discovery: discovery,
@@ -173,24 +179,30 @@ func (p *Pool) removeStaleClients() {
 	}
 }
 
-// cleanUnhealthy loops through all servers and deletes any that fails a healthcheck.
+// cleanUnhealthy loops through all servers and deletes any that fail a healthcheck.
+// The health checks are executed concurrently with p.cfg.MaxConcurrentHealthChecks.
 func (p *Pool) cleanUnhealthy() {
-	for _, addr := range p.RegisteredAddresses() {
+	addresses := p.RegisteredAddresses()
+	_ = concurrency.ForEachJob(context.Background(), len(addresses), p.cfg.MaxConcurrentHealthChecks, func(ctx context.Context, idx int) error {
+		addr := addresses[idx]
 		client, ok := p.fromCache(addr)
 		// not ok means someone removed a client between the start of this loop and now
 		if ok {
-			err := healthCheck(client, p.cfg.HealthCheckTimeout)
+			err := healthCheck(ctx, client, p.cfg.HealthCheckTimeout)
 			if err != nil {
 				level.Warn(p.logger).Log("msg", fmt.Sprintf("removing %s failing healthcheck", p.clientName), "addr", addr, "reason", err)
 				p.RemoveClientFor(addr)
 			}
 		}
-	}
+		// Never return an error, because otherwise the processing would stop and
+		// remaining health checks would not be executed.
+		return nil
+	})
 }
 
 // healthCheck will check if the client is still healthy, returning an error if it is not
-func healthCheck(client PoolClient, timeout time.Duration) error {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+func healthCheck(ctx context.Context, client PoolClient, timeout time.Duration) error {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()
 
 	ctx = user.InjectOrgID(ctx, "0")
diff --git a/ring/client/pool_test.go b/ring/client/pool_test.go
index 9e7028b2e..eb657e92f 100644
--- a/ring/client/pool_test.go
+++ b/ring/client/pool_test.go
@@ -49,7 +49,7 @@ func TestHealthCheck(t *testing.T) {
 		{mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, true},
 	}
 	for _, tc := range tcs {
-		err := healthCheck(tc.client, 50*time.Millisecond)
+		err := healthCheck(context.Background(), tc.client, 50*time.Millisecond)
 		hasError := err != nil
 		if hasError != tc.hasError {
 			t.Errorf("Expected error: %t, error: %v", tc.hasError, err)
@@ -120,28 +120,43 @@ func TestPoolCache(t *testing.T) {
 }
 
 func TestCleanUnhealthy(t *testing.T) {
-	goodAddrs := []string{"good1", "good2"}
-	badAddrs := []string{"bad1", "bad2"}
-	clients := map[string]PoolClient{}
-	for _, addr := range goodAddrs {
-		clients[addr] = mockClient{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}
-	}
-	for _, addr := range badAddrs {
-		clients[addr] = mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
-	}
-	pool := &Pool{
-		clients: clients,
-		logger:  log.NewNopLogger(),
-	}
-	pool.cleanUnhealthy()
-	for _, addr := range badAddrs {
-		if _, ok := pool.clients[addr]; ok {
-			t.Errorf("Found bad client after clean: %s\n", addr)
-		}
+	tcs := []struct {
+		maxConcurrent int
+	}{
+		{maxConcurrent: 0}, // if not set, defaults to 16
+		{maxConcurrent: 1},
 	}
-	for _, addr := range goodAddrs {
-		if _, ok := pool.clients[addr]; !ok {
-			t.Errorf("Could not find good client after clean: %s\n", addr)
-		}
+	for _, tc := range tcs {
+		t.Run(fmt.Sprintf("max concurrent %d", tc.maxConcurrent), func(t *testing.T) {
+			goodAddrs := []string{"good1", "good2"}
+			badAddrs := []string{"bad1", "bad2"}
+			clients := map[string]PoolClient{}
+			for _, addr := range goodAddrs {
+				clients[addr] = mockClient{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}
+			}
+			for _, addr := range badAddrs {
+				clients[addr] = mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
+			}
+
+			cfg := PoolConfig{
+				MaxConcurrentHealthChecks: tc.maxConcurrent,
+				CheckInterval:             1 * time.Second,
+				HealthCheckTimeout:        5 * time.Millisecond,
+			}
+			pool := NewPool("test", cfg, nil, nil, nil, log.NewNopLogger())
+			pool.clients = clients
+			pool.cleanUnhealthy()
+
+			for _, addr := range badAddrs {
+				if _, ok := pool.clients[addr]; ok {
+					t.Errorf("Found bad client after clean: %s\n", addr)
+				}
+			}
+			for _, addr := range goodAddrs {
+				if _, ok := pool.clients[addr]; !ok {
+					t.Errorf("Could not find good client after clean: %s\n", addr)
+				}
+			}
+		})
+	}
 }
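
For reference, a minimal usage sketch (not part of the patch) of how a caller could tune the new bound. The PoolConfig fields and the NewPool signature are taken from the diff above; the nil discovery, factory, and clientsMetric wiring is a hypothetical placeholder, following the same shortcut the updated TestCleanUnhealthy uses.

package main

import (
	"time"

	"github.com/go-kit/log"

	"github.com/grafana/dskit/ring/client"
)

func main() {
	// Hypothetical placeholders: a real caller supplies a service-discovery
	// function and a client factory for its gRPC service.
	var discovery client.PoolServiceDiscovery
	var factory client.PoolFactory

	cfg := client.PoolConfig{
		CheckInterval:             10 * time.Second,
		HealthCheckEnabled:        true,
		HealthCheckTimeout:        1 * time.Second,
		MaxConcurrentHealthChecks: 32, // leaving this at 0 falls back to the default of 16 applied in NewPool
	}

	pool := client.NewPool("example", cfg, discovery, factory, nil, log.NewNopLogger())
	_ = pool // start and use the pool as before; only the health-check loop changed
}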