Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer: fix tests not properly updating subconn states #6501

Merged
merged 2 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,11 @@ var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to set the StateListener here?

if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
Expand Down
11 changes: 11 additions & 0 deletions internal/balancer/gracefulswitch/gracefulswitch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,11 @@ func (mb1 *mockBalancer) newSubConn(addrs []resolver.Address, opts balancer.NewS
if opts.StateListener == nil {
opts.StateListener = func(state balancer.SubConnState) { mb1.UpdateSubConnState(sc, state) }
}
defer func() {
if sc != nil {
sc.Connect()
}
}()
return mb1.cc.NewSubConn(addrs, opts)
}

Expand Down Expand Up @@ -1023,6 +1028,7 @@ func (vb *verifyBalancer) newSubConn(addrs []resolver.Address, opts balancer.New
if opts.StateListener == nil {
opts.StateListener = func(state balancer.SubConnState) { vb.UpdateSubConnState(sc, state) }
}
defer func() { sc.Connect() }()
return vb.cc.NewSubConn(addrs, opts)
}

Expand Down Expand Up @@ -1076,6 +1082,11 @@ func (bcb *buildCallbackBal) newSubConn(addrs []resolver.Address, opts balancer.
if opts.StateListener == nil {
opts.StateListener = func(state balancer.SubConnState) { bcb.UpdateSubConnState(sc, state) }
}
defer func() {
if sc != nil {
sc.Connect()
}
}()
return bcb.cc.NewSubConn(addrs, opts)
}

Expand Down
16 changes: 16 additions & 0 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)

Expand All @@ -40,13 +41,26 @@ type TestSubConn struct {
id string
ConnectCh chan struct{}
stateListener func(balancer.SubConnState)
connectCalled *grpcsync.Event
}

// NewTestSubConn returns a newly initialized SubConn. Typically, subconns
// should be created via TestClientConn.NewSubConn instead, but can be useful
// for some tests.
func NewTestSubConn(id string) *TestSubConn {
return &TestSubConn{
ConnectCh: make(chan struct{}, 1),
connectCalled: grpcsync.NewEvent(),
id: id,
}
}

// UpdateAddresses is a no-op.
func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) {}

// Connect is a no-op.
func (tsc *TestSubConn) Connect() {
tsc.connectCalled.Fire()
select {
case tsc.ConnectCh <- struct{}{}:
default:
Expand All @@ -60,6 +74,7 @@ func (tsc *TestSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.P

// UpdateState pushes the state to the listener, if one is registered.
func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
<-tsc.connectCalled.Done()
if tsc.stateListener != nil {
tsc.stateListener(state)
return
Expand Down Expand Up @@ -109,6 +124,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
ConnectCh: make(chan struct{}, 1),
stateListener: o.StateListener,
connectCalled: grpcsync.NewEvent(),
}
tcc.subConnIdx++
tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)
Expand Down
32 changes: 18 additions & 14 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ func (s) TestDropByCategory(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.

const rpcCount = 20
Expand Down Expand Up @@ -283,13 +283,13 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
const rpcCount = 100
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
Expand Down Expand Up @@ -375,7 +375,11 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
// Create a subConn which will be used later on to test the race
// between UpdateSubConnState() and Close().
bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here, with the StateListener?

if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, _ balancer.SubConn, _ balancer.SubConnState) {
Expand Down Expand Up @@ -410,7 +414,7 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
// that we use as the child policy will not send a picker update until the
// parent policy is closed.
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.Close()
close(closeCh)

Expand Down Expand Up @@ -449,7 +453,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
Expand All @@ -464,7 +468,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn, ok, testClusterName)
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
t.Fatal(err.Error())
Expand Down Expand Up @@ -524,13 +528,13 @@ func (s) TestReResolution(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// This should get the transient failure picker.
if err := cc.WaitForErrPicker(ctx); err != nil {
t.Fatal(err.Error())
Expand All @@ -543,13 +547,13 @@ func (s) TestReResolution(t *testing.T) {
t.Fatalf("timeout waiting for ResolveNow()")
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// This should get the transient failure picker.
if err := cc.WaitForErrPicker(ctx); err != nil {
t.Fatal(err.Error())
Expand Down Expand Up @@ -608,13 +612,13 @@ func (s) TestLoadReporting(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
const successCount = 5
const errorCount = 5
Expand Down
36 changes: 20 additions & 16 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ func TestClusterPicks(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

p1 := <-cc.NewPickerCh
Expand Down Expand Up @@ -247,8 +247,8 @@ func TestConfigUpdateAddCluster(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

p1 := <-cc.NewPickerCh
Expand Down Expand Up @@ -313,8 +313,8 @@ func TestConfigUpdateAddCluster(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

// Should have no more newSubConn.
select {
Expand Down Expand Up @@ -404,8 +404,8 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

p1 := <-cc.NewPickerCh
Expand Down Expand Up @@ -488,8 +488,8 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m2[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

p3 := <-cc.NewPickerCh
Expand Down Expand Up @@ -582,7 +582,11 @@ var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here?

if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
Expand Down Expand Up @@ -632,7 +636,7 @@ func TestInitialIdle(t *testing.T) {
// in the address is cleared.
for range wantAddrs {
sc := <-cc.NewSubConnCh
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
}

if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
Expand Down Expand Up @@ -673,8 +677,8 @@ func TestClusterGracefulSwitch(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p1 := <-cc.NewPickerCh
pi := balancer.PickInfo{
Ctx: SetPickedCluster(context.Background(), "csp:cluster"),
Expand Down Expand Up @@ -703,7 +707,7 @@ func TestClusterGracefulSwitch(t *testing.T) {
// Update the pick first balancers SubConn as CONNECTING. This will cause
// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
// a Picker update back, as the Graceful Switch process is not complete.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
Expand All @@ -716,7 +720,7 @@ func TestClusterGracefulSwitch(t *testing.T) {
// the pick first balancer to UpdateState() with READY, which should send a
// Picker update back, as the Graceful Switch process is complete. This
// Picker should always pick the pick first's created SubConn.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p2 := <-cc.NewPickerCh
testPick(t, p2, pi, sc2, nil)
// The Graceful Switch process completing for the child should cause the
Expand Down
Loading