Execute health checks for ring clients in parallel
Fixes #236

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum committed Dec 1, 2022
1 parent 45ea666 commit 54c422e
Showing 3 changed files with 54 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -70,6 +70,7 @@
* [ENHANCEMENT] Concurrency: Add LimitedConcurrencySingleFlight to run jobs concurrently and with in-flight deduplication. #214
* [ENHANCEMENT] Add the ability to define custom gRPC health checks. #227
* [ENHANCEMENT] Import Bytes type, DeleteAll function and DNS package from Thanos. #228
* [ENHANCEMENT] Execute health checks in ring client pool concurrently. #237
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
24 changes: 18 additions & 6 deletions ring/client/pool.go
@@ -13,6 +13,7 @@ import (
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/ring/util"
"github.com/grafana/dskit/services"
)
@@ -35,9 +36,10 @@ type PoolServiceDiscovery func() ([]string, error)

// PoolConfig is config for creating a Pool.
type PoolConfig struct {
CheckInterval time.Duration
HealthCheckEnabled bool
HealthCheckTimeout time.Duration
CheckInterval time.Duration
HealthCheckEnabled bool
HealthCheckTimeout time.Duration
MaxConcurrentHealthChecks int
}

// Pool holds a cache of grpc_health_v1 clients.
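As a rough usage sketch (not part of this commit), a caller could bound the health-check fan-out by setting the new field. The field names come from the diff above; the concrete values and the import path github.com/grafana/dskit/ring/client are assumptions:

package main

import (
	"time"

	"github.com/grafana/dskit/ring/client"
)

func main() {
	// At most 16 health checks run concurrently; leaving the field at 0
	// keeps the previous behaviour of checking every client at once.
	cfg := client.PoolConfig{
		CheckInterval:             15 * time.Second,
		HealthCheckEnabled:        true,
		HealthCheckTimeout:        time.Second,
		MaxConcurrentHealthChecks: 16,
	}
	_ = cfg // in real code, cfg would be passed to the pool constructor
}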
@@ -173,9 +175,18 @@ func (p *Pool) removeStaleClients() {
}
}

// cleanUnhealthy loops through all servers and deletes any that fails a healthcheck.
// cleanUnhealthy loops through all servers and deletes any that fails a healthcheck.
// The health checks are executed concurrently with p.cfg.MaxConcurrentHealthChecks.
// If this is not set (0), then all healthchecks of all clients are executed in parallel.
func (p *Pool) cleanUnhealthy() {
for _, addr := range p.RegisteredAddresses() {
addresses := p.RegisteredAddresses()

maxConcurrent := p.cfg.MaxConcurrentHealthChecks
if p.cfg.MaxConcurrentHealthChecks == 0 {
maxConcurrent = len(addresses)
}
_ = concurrency.ForEachJob(context.Background(), len(addresses), maxConcurrent, func(_ context.Context, idx int) error {
addr := addresses[idx]
client, ok := p.fromCache(addr)
// not ok means someone removed a client between the start of this loop and now
if ok {
@@ -185,7 +196,8 @@ func (p *Pool) cleanUnhealthy() {
p.RemoveClientFor(addr)
}
}
}
return nil
})
}

// healthCheck will check if the client is still healthy, returning an error if it is not
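For context on the bounded fan-out used in cleanUnhealthy, here is an isolated sketch of the same pattern built on dskit's concurrency.ForEachJob (called with the same signature as in the diff); the address list and the check body are placeholders:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	addresses := []string{"10.0.0.1:9095", "10.0.0.2:9095", "10.0.0.3:9095"}

	// A value of 0 would fall back to len(addresses), as cleanUnhealthy does above.
	maxConcurrent := 2

	// ForEachJob invokes the callback once per index, keeping at most
	// maxConcurrent calls in flight at any time.
	_ = concurrency.ForEachJob(context.Background(), len(addresses), maxConcurrent, func(_ context.Context, idx int) error {
		fmt.Println("health-checking", addresses[idx])
		return nil
	})
}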
57 changes: 35 additions & 22 deletions ring/client/pool_test.go
@@ -120,28 +120,41 @@ func TestPoolCache(t *testing.T) {
}

func TestCleanUnhealthy(t *testing.T) {
goodAddrs := []string{"good1", "good2"}
badAddrs := []string{"bad1", "bad2"}
clients := map[string]PoolClient{}
for _, addr := range goodAddrs {
clients[addr] = mockClient{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}
}
for _, addr := range badAddrs {
clients[addr] = mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
}
pool := &Pool{
clients: clients,
logger: log.NewNopLogger(),
}
pool.cleanUnhealthy()
for _, addr := range badAddrs {
if _, ok := pool.clients[addr]; ok {
t.Errorf("Found bad client after clean: %s\n", addr)
}
tcs := []struct {
maxConcurrent int
}{
{maxConcurrent: 0}, // unbounded parallelism
{maxConcurrent: 2},
}
for _, addr := range goodAddrs {
if _, ok := pool.clients[addr]; !ok {
t.Errorf("Could not find good client after clean: %s\n", addr)
}
for _, tc := range tcs {
t.Run(fmt.Sprintf("max concurrent %d", tc.maxConcurrent), func(t *testing.T) {
goodAddrs := []string{"good1", "good2"}
badAddrs := []string{"bad1", "bad2"}
clients := map[string]PoolClient{}
for _, addr := range goodAddrs {
clients[addr] = mockClient{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}
}
for _, addr := range badAddrs {
clients[addr] = mockClient{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
}
pool := &Pool{
cfg: PoolConfig{
MaxConcurrentHealthChecks: tc.maxConcurrent,
},
clients: clients,
logger: log.NewNopLogger(),
}
pool.cleanUnhealthy()
for _, addr := range badAddrs {
if _, ok := pool.clients[addr]; ok {
t.Errorf("Found bad client after clean: %s\n", addr)
}
}
for _, addr := range goodAddrs {
if _, ok := pool.clients[addr]; !ok {
t.Errorf("Could not find good client after clean: %s\n", addr)
}
}
})
}
}
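To exercise both table cases locally, something like the following should work from a checkout of the repository:

go test ./ring/client/ -run TestCleanUnhealthy -v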
