Skip to content

Commit

Permalink
grpc: Fix flaky picker_wrapper tests (#7560)
Browse files Browse the repository at this point in the history
* Wait for go routines to finish before cancelling the context

* improve error messages
  • Loading branch information
arjan-bal authored Aug 26, 2024
1 parent 9feed00 commit c535946
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions picker_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package grpc
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -80,19 +81,24 @@ func (s) TestBlockingPick(t *testing.T) {
var finishedCount uint64
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wg := sync.WaitGroup{}
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned non-nil error: %v", err)
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
}()
}
time.Sleep(50 * time.Millisecond)
if c := atomic.LoadUint64(&finishedCount); c != 0 {
t.Errorf("finished goroutines count: %v, want 0", c)
}
bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
// Wait for all pickers to finish before the context is cancelled.
wg.Wait()
}

func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
Expand All @@ -102,19 +108,24 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// All goroutines should block because picker returns no subConn available.
wg := sync.WaitGroup{}
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned non-nil error: %v", err)
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
}()
}
time.Sleep(50 * time.Millisecond)
if c := atomic.LoadUint64(&finishedCount); c != 0 {
t.Errorf("finished goroutines count: %v, want 0", c)
}
bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
// Wait for all pickers to finish before the context is cancelled.
wg.Wait()
}

func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
Expand All @@ -125,19 +136,24 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
defer cancel()
// All goroutines should block because picker returns transientFailure and
// picks are not failfast.
wg := sync.WaitGroup{}
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned non-nil error: %v", err)
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
}()
}
time.Sleep(time.Millisecond)
if c := atomic.LoadUint64(&finishedCount); c != 0 {
t.Errorf("finished goroutines count: %v, want 0", c)
}
bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
// Wait for all pickers to finish before the context is cancelled.
wg.Wait()
}

func (s) TestBlockingPickSCNotReady(t *testing.T) {
Expand All @@ -147,17 +163,22 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// All goroutines should block because subConn is not ready.
wg := sync.WaitGroup{}
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned non-nil error: %v", err)
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
}()
}
time.Sleep(time.Millisecond)
if c := atomic.LoadUint64(&finishedCount); c != 0 {
t.Errorf("finished goroutines count: %v, want 0", c)
}
bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
// Wait for all pickers to finish before the context is cancelled.
wg.Wait()
}

0 comments on commit c535946

Please sign in to comment.