Skip to content

Commit

Permalink
fixup! Execute health checks for ring clients in parallel
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Dec 2, 2022
1 parent 54c422e commit 1b6abd2
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
20 changes: 9 additions & 11 deletions ring/client/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type Pool struct {

// NewPool creates a new Pool.
func NewPool(clientName string, cfg PoolConfig, discovery PoolServiceDiscovery, factory PoolFactory, clientsMetric prometheus.Gauge, logger log.Logger) *Pool {
if cfg.MaxConcurrentHealthChecks == 0 {
cfg.MaxConcurrentHealthChecks = 16
}

p := &Pool{
cfg: cfg,
discovery: discovery,
Expand Down Expand Up @@ -175,22 +179,16 @@ func (p *Pool) removeStaleClients() {
}
}

// cleanUnhealthy loops through all servers and deletes any that fails a healthcheck
// cleanUnhealthy loops through all servers and deletes any that fail a healthcheck.
// The health checks are executed concurrently with p.cfg.MaxConcurrentHealthChecks.
// If this is not set (0), it defaults to 16 concurrent health checks.
func (p *Pool) cleanUnhealthy() {
addresses := p.RegisteredAddresses()

maxConcurrent := p.cfg.MaxConcurrentHealthChecks
if p.cfg.MaxConcurrentHealthChecks == 0 {
maxConcurrent = len(addresses)
}
_ = concurrency.ForEachJob(context.Background(), len(addresses), maxConcurrent, func(_ context.Context, idx int) error {
_ = concurrency.ForEachJob(context.Background(), len(addresses), p.cfg.MaxConcurrentHealthChecks, func(ctx context.Context, idx int) error {
addr := addresses[idx]
client, ok := p.fromCache(addr)
// not ok means someone removed a client between the start of this loop and now
if ok {
err := healthCheck(client, p.cfg.HealthCheckTimeout)
err := healthCheck(ctx, client, p.cfg.HealthCheckTimeout)
if err != nil {
level.Warn(p.logger).Log("msg", fmt.Sprintf("removing %s failing healthcheck", p.clientName), "addr", addr, "reason", err)
p.RemoveClientFor(addr)
Expand All @@ -201,8 +199,8 @@ func (p *Pool) cleanUnhealthy() {
}

// healthCheck will check if the client is still healthy, returning an error if it is not
func healthCheck(client PoolClient, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
func healthCheck(ctx context.Context, client PoolClient, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
ctx = user.InjectOrgID(ctx, "0")

Expand Down
21 changes: 12 additions & 9 deletions ring/client/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"fmt"
"os"
"testing"
"time"

Expand Down Expand Up @@ -49,7 +50,7 @@ func TestHealthCheck(t *testing.T) {
{mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, true},
}
for _, tc := range tcs {
err := healthCheck(tc.client, 50*time.Millisecond)
err := healthCheck(context.Background(), tc.client, 50*time.Millisecond)
hasError := err != nil
if hasError != tc.hasError {
t.Errorf("Expected error: %t, error: %v", tc.hasError, err)
Expand Down Expand Up @@ -123,8 +124,8 @@ func TestCleanUnhealthy(t *testing.T) {
tcs := []struct {
maxConcurrent int
}{
{maxConcurrent: 0}, // unbounded parallelism
{maxConcurrent: 2},
{maxConcurrent: 0}, // if not set, defaults to 16
{maxConcurrent: 1},
}
for _, tc := range tcs {
t.Run(fmt.Sprintf("max concurrent %d", tc.maxConcurrent), func(t *testing.T) {
Expand All @@ -137,14 +138,16 @@ func TestCleanUnhealthy(t *testing.T) {
for _, addr := range badAddrs {
clients[addr] = mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
}
pool := &Pool{
cfg: PoolConfig{
MaxConcurrentHealthChecks: tc.maxConcurrent,
},
clients: clients,
logger: log.NewNopLogger(),

cfg := PoolConfig{
MaxConcurrentHealthChecks: tc.maxConcurrent,
CheckInterval: 1 * time.Second,
HealthCheckTimeout: 5 * time.Millisecond,
}
pool := NewPool("test", cfg, nil, nil, nil, log.NewLogfmtLogger(os.Stderr))
pool.clients = clients
pool.cleanUnhealthy()

for _, addr := range badAddrs {
if _, ok := pool.clients[addr]; ok {
t.Errorf("Found bad client after clean: %s\n", addr)
Expand Down

0 comments on commit 1b6abd2

Please sign in to comment.