Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clientv3/balance: fixed flaky balancer tests #14204

Merged
merged 1 commit into from
Jul 11, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 51 additions & 43 deletions clientv3/balancer/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"strings"
"testing"
"time"

"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
Expand Down Expand Up @@ -92,24 +91,25 @@ func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) {
return picked, err
}

prev, switches := "", 0
_, picked, err := warmupConnections(reqFunc, tc.serverCount)
if err != nil {
t.Fatalf("Unexpected failure %v", err)
}

// verify that we round robin
prev, switches := picked, 0
for i := 0; i < tc.reqN; i++ {
picked, err := reqFunc(context.Background())
picked, err = reqFunc(context.Background())
if err != nil {
t.Fatalf("#%d: unexpected failure %v", i, err)
}
if prev == "" {
prev = picked
continue
}
if prev != picked {
switches++
}
prev = picked
}
if tc.serverCount > 1 && switches < tc.reqN-3 { // -3 for initial resolutions
// TODO: FIX ME
t.Skipf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches)
if tc.serverCount > 1 && switches != tc.reqN {
t.Fatalf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches)
}
})
}
Expand Down Expand Up @@ -162,25 +162,18 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
// stop first server, loads should be redistributed
// stopped server should never be picked
ms.StopAt(0)
available := make(map[string]struct{})
for i := 1; i < serverCount; i++ {
available[eps[i]] = struct{}{}
available, picked, err := warmupConnections(reqFunc, serverCount-1)
if err != nil {
t.Fatalf("Unexpected failure %v", err)
}

reqN := 10
prev, switches := "", 0
prev, switches := picked, 0
for i := 0; i < reqN; i++ {
picked, err := reqFunc(context.Background())
picked, err = reqFunc(context.Background())
if err != nil && strings.Contains(err.Error(), "transport is closing") {
continue
}
if prev == "" { // first failover
if eps[0] == picked {
t.Fatalf("expected failover from %q, picked %q", eps[0], picked)
}
prev = picked
continue
}
if _, ok := available[picked]; !ok {
t.Fatalf("picked unavailable address %q (available %v)", picked, available)
}
Expand All @@ -189,18 +182,19 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
}
prev = picked
}
if switches < reqN-3 { // -3 for initial resolutions + failover
// TODO: FIX ME!
t.Skipf("expected balanced loads for %d requests, got switches %d", reqN, switches)
if switches != reqN {
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
}

// now failed server comes back
ms.StartAt(0)

// enough time for reconnecting to recovered server
time.Sleep(time.Second)
available, picked, err = warmupConnections(reqFunc, serverCount)
if err != nil {
t.Fatalf("Unexpected failure %v", err)
}

prev, switches = "", 0
prev, switches = picked, 0
recoveredAddr, recovered := eps[0], 0
available[recoveredAddr] = struct{}{}

Expand All @@ -209,10 +203,6 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
if err != nil {
t.Fatalf("#%d: unexpected failure %v", i, err)
}
if prev == "" {
prev = picked
continue
}
if _, ok := available[picked]; !ok {
t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
}
Expand All @@ -224,10 +214,10 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
}
prev = picked
}
if switches < reqN-3 { // -3 for initial resolutions
if switches != 2*reqN {
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
}
if recovered < reqN/serverCount {
if recovered != 2*reqN/serverCount {
t.Fatalf("recovered server %q got only %d requests", recoveredAddr, recovered)
}
}
Expand All @@ -242,11 +232,10 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
}
defer ms.Stop()
var eps []string
available := make(map[string]struct{})
for _, svr := range ms.Servers {
eps = append(eps, svr.ResolverAddress().Addr)
available[svr.Address] = struct{}{}
}

rsv, err := endpoint.NewResolverGroup("requestfail")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -277,6 +266,11 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
return picked, err
}

available, picked, err := warmupConnections(reqFunc, serverCount)
if err != nil {
t.Fatalf("Unexpected failure %v", err)
}

reqN := 20
prev, switches := "", 0
for i := 0; i < reqN; i++ {
Expand All @@ -285,17 +279,13 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
if i%2 == 0 {
cancel()
}
picked, err := reqFunc(ctx)
picked, err = reqFunc(ctx)
if i%2 == 0 {
if s, ok := status.FromError(err); ok && s.Code() != codes.Canceled || picked != "" {
if s, ok := status.FromError(err); ok && s.Code() != codes.Canceled {
t.Fatalf("#%d: expected %v, got %v", i, context.Canceled, err)
}
continue
}
if prev == "" && picked != "" {
prev = picked
continue
}
if _, ok := available[picked]; !ok {
t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
}
Expand All @@ -304,7 +294,25 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
}
prev = picked
}
if switches < reqN/2-3 { // -3 for initial resolutions + failover
if switches != reqN/2 {
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
}
}

type reqFuncT = func(ctx context.Context) (picked string, err error)

func warmupConnections(reqFunc reqFuncT, serverCount int) (map[string]struct{}, string, error) {
var picked string
var err error
available := make(map[string]struct{})
// cycle through all peers to indirectly verify that balancer subconn list is fully loaded
// otherwise we can't reliably count switches between 'picked' peers in the test assert phase
for len(available) < serverCount {
picked, err = reqFunc(context.Background())
if err != nil {
return available, picked, err
}
available[picked] = struct{}{}
}
return available, picked, err
}