Execute health checks in ring pool in parallel
The ring client pool executes the health checks for its servers
sequentially, so a single check round can take a long time when there are
many servers to check, especially when the targets do not respond quickly.

This PR changes the execution from sequential to parallel. If the new
`MaxConcurrentHealthChecks` config setting is not set (`0` value), health
checks are executed with a parallelism of `16`; otherwise, the configured
parallelism is used.
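
As a rough illustration (not part of this commit), this is how a caller might set the new option when building a pool. The `PoolConfig` fields and the `NewPool` signature are taken from this PR; the package names, the `discovery`/`factory` arguments, and the go-kit logger wiring are assumptions:

package example

import (
	"time"

	"github.com/go-kit/log"

	"github.com/grafana/dskit/ring/client"
)

// newExamplePool is a sketch of wiring up the new option; discovery and
// factory are whatever the caller already uses to build pool clients.
func newExamplePool(discovery client.PoolServiceDiscovery, factory client.PoolFactory, logger log.Logger) *client.Pool {
	cfg := client.PoolConfig{
		HealthCheckEnabled:        true,
		HealthCheckTimeout:        1 * time.Second,
		CheckInterval:             10 * time.Second,
		MaxConcurrentHealthChecks: 32, // leaving this at 0 falls back to the default of 16
	}
	// The nil argument is the optional Prometheus gauge tracking the client count.
	return client.NewPool("example-client", cfg, discovery, factory, nil, logger)
}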

Fixes #236

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum committed Dec 12, 2022
1 parent 45ea666 commit c620fe8
Showing 3 changed files with 60 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -70,6 +70,7 @@
 * [ENHANCEMENT] Concurrency: Add LimitedConcurrencySingleFlight to run jobs concurrently and with in-flight deduplication. #214
 * [ENHANCEMENT] Add the ability to define custom gRPC health checks. #227
 * [ENHANCEMENT] Import Bytes type, DeleteAll function and DNS package from Thanos. #228
+* [ENHANCEMENT] Execute health checks in ring client pool concurrently. #237
 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59
 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
30 changes: 21 additions & 9 deletions ring/client/pool.go
@@ -13,6 +13,7 @@ import (
 	"github.com/weaveworks/common/user"
 	"google.golang.org/grpc/health/grpc_health_v1"
 
+	"github.com/grafana/dskit/concurrency"
 	"github.com/grafana/dskit/ring/util"
 	"github.com/grafana/dskit/services"
 )
@@ -35,9 +36,10 @@ type PoolServiceDiscovery func() ([]string, error)
 
 // PoolConfig is config for creating a Pool.
 type PoolConfig struct {
-	CheckInterval      time.Duration
-	HealthCheckEnabled bool
-	HealthCheckTimeout time.Duration
+	CheckInterval             time.Duration
+	HealthCheckEnabled        bool
+	HealthCheckTimeout        time.Duration
+	MaxConcurrentHealthChecks int // defaults to 16
 }
 
 // Pool holds a cache of grpc_health_v1 clients.
@@ -58,6 +60,10 @@ type Pool struct {
 
 // NewPool creates a new Pool.
 func NewPool(clientName string, cfg PoolConfig, discovery PoolServiceDiscovery, factory PoolFactory, clientsMetric prometheus.Gauge, logger log.Logger) *Pool {
+	if cfg.MaxConcurrentHealthChecks == 0 {
+		cfg.MaxConcurrentHealthChecks = 16
+	}
+
 	p := &Pool{
 		cfg:       cfg,
 		discovery: discovery,
@@ -173,24 +179,30 @@ func (p *Pool) removeStaleClients() {
 	}
 }
 
-// cleanUnhealthy loops through all servers and deletes any that fails a healthcheck.
+// cleanUnhealthy loops through all servers and deletes any that fail a healthcheck.
+// The health checks are executed concurrently with p.cfg.MaxConcurrentHealthChecks.
 func (p *Pool) cleanUnhealthy() {
-	for _, addr := range p.RegisteredAddresses() {
+	addresses := p.RegisteredAddresses()
+	_ = concurrency.ForEachJob(context.Background(), len(addresses), p.cfg.MaxConcurrentHealthChecks, func(ctx context.Context, idx int) error {
+		addr := addresses[idx]
 		client, ok := p.fromCache(addr)
 		// not ok means someone removed a client between the start of this loop and now
 		if ok {
-			err := healthCheck(client, p.cfg.HealthCheckTimeout)
+			err := healthCheck(ctx, client, p.cfg.HealthCheckTimeout)
 			if err != nil {
 				level.Warn(p.logger).Log("msg", fmt.Sprintf("removing %s failing healthcheck", p.clientName), "addr", addr, "reason", err)
 				p.RemoveClientFor(addr)
 			}
 		}
-	}
+		// Never return an error, because otherwise the processing would stop and
+		// the remaining health checks would not be executed.
+		return nil
+	})
 }
 
 // healthCheck will check if the client is still healthy, returning an error if it is not
-func healthCheck(client PoolClient, timeout time.Duration) error {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+func healthCheck(ctx context.Context, client PoolClient, timeout time.Duration) error {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()
 	ctx = user.InjectOrgID(ctx, "0")
 
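For context, here is a minimal standalone sketch of the bounded-concurrency primitive the new `cleanUnhealthy` relies on. The `concurrency.ForEachJob` call mirrors its use in the diff above; the addresses and the printed stand-in for a health check are made up for illustration:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	addrs := []string{"10.0.0.1:9095", "10.0.0.2:9095", "10.0.0.3:9095"} // hypothetical targets

	// Run at most 2 jobs at a time; each job receives its own index into addrs.
	_ = concurrency.ForEachJob(context.Background(), len(addrs), 2, func(ctx context.Context, idx int) error {
		fmt.Println("checking", addrs[idx]) // stand-in for a real health check
		// Returning a non-nil error would stop the remaining jobs, which is why
		// cleanUnhealthy above always returns nil.
		return nil
	})
}
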
61 changes: 38 additions & 23 deletions ring/client/pool_test.go
@@ -49,7 +49,7 @@ func TestHealthCheck(t *testing.T) {
 		{mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, true},
 	}
 	for _, tc := range tcs {
-		err := healthCheck(tc.client, 50*time.Millisecond)
+		err := healthCheck(context.Background(), tc.client, 50*time.Millisecond)
 		hasError := err != nil
 		if hasError != tc.hasError {
 			t.Errorf("Expected error: %t, error: %v", tc.hasError, err)
@@ -120,28 +120,43 @@ func TestPoolCache(t *testing.T) {
 }
 
 func TestCleanUnhealthy(t *testing.T) {
-	goodAddrs := []string{"good1", "good2"}
-	badAddrs := []string{"bad1", "bad2"}
-	clients := map[string]PoolClient{}
-	for _, addr := range goodAddrs {
-		clients[addr] = mockClient{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}
-	}
-	for _, addr := range badAddrs {
-		clients[addr] = mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
-	}
-	pool := &Pool{
-		clients: clients,
-		logger:  log.NewNopLogger(),
-	}
-	pool.cleanUnhealthy()
-	for _, addr := range badAddrs {
-		if _, ok := pool.clients[addr]; ok {
-			t.Errorf("Found bad client after clean: %s\n", addr)
-		}
+	tcs := []struct {
+		maxConcurrent int
+	}{
+		{maxConcurrent: 0}, // if not set, defaults to 16
+		{maxConcurrent: 1},
 	}
-	for _, addr := range goodAddrs {
-		if _, ok := pool.clients[addr]; !ok {
-			t.Errorf("Could not find good client after clean: %s\n", addr)
-		}
+	for _, tc := range tcs {
+		t.Run(fmt.Sprintf("max concurrent %d", tc.maxConcurrent), func(t *testing.T) {
+			goodAddrs := []string{"good1", "good2"}
+			badAddrs := []string{"bad1", "bad2"}
+			clients := map[string]PoolClient{}
+			for _, addr := range goodAddrs {
+				clients[addr] = mockClient{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}
+			}
+			for _, addr := range badAddrs {
+				clients[addr] = mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
+			}
+
+			cfg := PoolConfig{
+				MaxConcurrentHealthChecks: tc.maxConcurrent,
+				CheckInterval:             1 * time.Second,
+				HealthCheckTimeout:        5 * time.Millisecond,
+			}
+			pool := NewPool("test", cfg, nil, nil, nil, log.NewNopLogger())
+			pool.clients = clients
+			pool.cleanUnhealthy()
+
+			for _, addr := range badAddrs {
+				if _, ok := pool.clients[addr]; ok {
+					t.Errorf("Found bad client after clean: %s\n", addr)
+				}
+			}
+			for _, addr := range goodAddrs {
+				if _, ok := pool.clients[addr]; !ok {
+					t.Errorf("Could not find good client after clean: %s\n", addr)
+				}
+			}
+		})
 	}
 }
