diff --git a/balancer/balancer.go b/balancer/balancer.go index 668e1f1e38a5..3385b1ed6eec 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -367,8 +367,9 @@ type Balancer interface { // Deprecated: Use NewSubConnOptions.StateListener when creating the // SubConn instead. UpdateSubConnState(SubConn, SubConnState) - // Close closes the balancer. The balancer is not required to call - // ClientConn.RemoveSubConn for its existing SubConns. + // Close closes the balancer. The balancer is not currently required to + // call SubConn.Shutdown for its existing SubConns; however, this will be + // required in a future release, so it is recommended. Close() } diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index 3929c26d31e1..b0fa5192161e 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -121,7 +121,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { sc := sci.(balancer.SubConn) // a was removed by resolver. if _, ok := addrsSet.Get(a); !ok { - b.cc.RemoveSubConn(sc) + sc.Shutdown() b.subConns.Delete(a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. // The entry will be deleted in UpdateSubConnState. @@ -204,8 +204,8 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su case connectivity.Idle: sc.Connect() case connectivity.Shutdown: - // When an address was removed by resolver, b called RemoveSubConn but - // kept the sc's state in scStates. Remove state for this sc here. + // When an address was removed by resolver, b called Shutdown but kept + // the sc's state in scStates. Remove state for this sc here. delete(b.scStates, sc) case connectivity.TransientFailure: // Save error to be reported via picker. @@ -226,7 +226,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su } // Close is a nop because base balancer doesn't have internal state to clean up, -// and it doesn't need to call RemoveSubConn for the SubConns. +// and it doesn't need to call Shutdown for the SubConns. func (b *baseBalancer) Close() { } diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 4d616a98b0c3..f2ddfc3788ed 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -213,7 +213,7 @@ type lbBalancer struct { backendAddrsWithoutMetadata []resolver.Address // Roundrobin functionalities. state connectivity.State - subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn. + subConns map[resolver.Address]balancer.SubConn // Used to new/shutdown SubConn. scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns. picker balancer.Picker // Support fallback to resolved backend addresses if there's no response @@ -290,7 +290,7 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) { // aggregateSubConnStats calculate the aggregated state of SubConns in // lb.SubConns. These SubConns are subconns in use (when switching between // fallback and grpclb). lb.scState contains states for all SubConns, including -// those in cache (SubConns are cached for 10 seconds after remove). +// those in cache (SubConns are cached for 10 seconds after shutdown). // // The aggregated state is: // - If at least one SubConn in Ready, the aggregated state is Ready; @@ -345,8 +345,8 @@ func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubCo case connectivity.Idle: sc.Connect() case connectivity.Shutdown: - // When an address was removed by resolver, b called RemoveSubConn but - // kept the sc's state in scStates. Remove state for this sc here. + // When an address was removed by resolver, b called Shutdown but kept + // the sc's state in scStates. Remove state for this sc here. delete(lb.scStates, sc) case connectivity.TransientFailure: lb.connErr = scs.ConnectionError diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 6c99a6d788f6..edb66a90a3b1 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -113,7 +113,6 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback } balancingPolicyChanged := lb.usePickFirst != pickFirst - oldUsePickFirst := lb.usePickFirst lb.usePickFirst = pickFirst if fallbackModeChanged || balancingPolicyChanged { @@ -123,13 +122,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback // For fallback mode switching with pickfirst, we want to recreate the // SubConn because the creds could be different. for a, sc := range lb.subConns { - if oldUsePickFirst { - // If old SubConn were created for pickfirst, bypass cache and - // remove directly. - lb.cc.ClientConn.RemoveSubConn(sc) - } else { - lb.cc.RemoveSubConn(sc) - } + sc.Shutdown() delete(lb.subConns, a) } } @@ -144,7 +137,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback } if sc != nil { if len(backendAddrs) == 0 { - lb.cc.ClientConn.RemoveSubConn(sc) + sc.Shutdown() delete(lb.subConns, scKey) return } @@ -197,7 +190,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback for a, sc := range lb.subConns { // a was removed by resolver. if _, ok := addrsSet[a]; !ok { - lb.cc.RemoveSubConn(sc) + sc.Shutdown() delete(lb.subConns, a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. // The entry will be deleted in UpdateSubConnState. diff --git a/balancer/grpclb/grpclb_util.go b/balancer/grpclb/grpclb_util.go index 6e01bf803d6a..680779f1c82e 100644 --- a/balancer/grpclb/grpclb_util.go +++ b/balancer/grpclb/grpclb_util.go @@ -91,7 +91,7 @@ func (r *lbManualResolver) UpdateState(s resolver.State) { const subConnCacheTime = time.Second * 10 // lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache. -// SubConns will be kept in cache for subConnCacheTime before being removed. +// SubConns will be kept in cache for subConnCacheTime before being shut down. // // Its NewSubconn and SubConn.Shutdown methods are updated to do cache first. type lbCacheClientConn struct { @@ -149,7 +149,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer } func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { - sc.Shutdown() + logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc) } type lbCacheSubConn struct { @@ -168,9 +168,9 @@ func (sc *lbCacheSubConn) Shutdown() { if entry, ok := ccc.subConnCache[addr]; ok { if entry.sc != sc { - // This could happen if NewSubConn was called multiple times for the - // same address, and those SubConns are all removed. We remove sc - // immediately here. + // This could happen if NewSubConn was called multiple times for + // the same address, and those SubConns are all shut down. We + // remove sc immediately here. delete(ccc.subConnToAddr, sc) sc.SubConn.Shutdown() } @@ -214,7 +214,7 @@ func (ccc *lbCacheClientConn) UpdateState(s balancer.State) { func (ccc *lbCacheClientConn) close() { ccc.mu.Lock() defer ccc.mu.Unlock() - // Only cancel all existing timers. There's no need to remove SubConns. + // Only cancel all existing timers. There's no need to shut down SubConns. for _, entry := range ccc.subConnCache { entry.cancel() } diff --git a/balancer/grpclb/grpclb_util_test.go b/balancer/grpclb/grpclb_util_test.go index 31d6eaa55db5..c09edc324e15 100644 --- a/balancer/grpclb/grpclb_util_test.go +++ b/balancer/grpclb/grpclb_util_test.go @@ -61,7 +61,7 @@ func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.Ne } func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) { - sc.Shutdown() + panic(fmt.Sprintf("RemoveSubConn(%v) called unexpectedly", sc)) } const testCacheTimeout = 100 * time.Millisecond @@ -87,7 +87,7 @@ func checkCacheCC(ccc *lbCacheClientConn, sccLen, sctaLen int) error { return nil } -// Test that SubConn won't be immediately removed. +// Test that SubConn won't be immediately shut down. func (s) TestLBCacheClientConnExpire(t *testing.T) { mcc := newMockClientConn() if err := checkMockCC(mcc, 0); err != nil { @@ -110,7 +110,7 @@ func (s) TestLBCacheClientConnExpire(t *testing.T) { t.Fatal(err) } - ccc.RemoveSubConn(sc) + sc.Shutdown() // One subconn in MockCC before timeout. if err := checkMockCC(mcc, 1); err != nil { t.Fatal(err) @@ -138,7 +138,7 @@ func (s) TestLBCacheClientConnExpire(t *testing.T) { } } -// Test that NewSubConn with the same address of a SubConn being removed will +// Test that NewSubConn with the same address of a SubConn being shut down will // reuse the SubConn and cancel the removing. func (s) TestLBCacheClientConnReuse(t *testing.T) { mcc := newMockClientConn() @@ -162,7 +162,7 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) { t.Fatal(err) } - ccc.RemoveSubConn(sc) + sc.Shutdown() // One subconn in MockCC before timeout. if err := checkMockCC(mcc, 1); err != nil { t.Fatal(err) @@ -195,8 +195,8 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) { t.Fatal(err) } - // Call remove again, will delete after timeout. - ccc.RemoveSubConn(sc) + // Call Shutdown again, will delete after timeout. + sc.Shutdown() // One subconn in MockCC before timeout. if err := checkMockCC(mcc, 1); err != nil { t.Fatal(err) @@ -223,9 +223,9 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) { } } -// Test that if the timer to remove a SubConn fires at the same time NewSubConn -// cancels the timer, it doesn't cause deadlock. -func (s) TestLBCache_RemoveTimer_New_Race(t *testing.T) { +// Test that if the timer to shut down a SubConn fires at the same time +// NewSubConn cancels the timer, it doesn't cause deadlock. +func (s) TestLBCache_ShutdownTimer_New_Race(t *testing.T) { mcc := newMockClientConn() if err := checkMockCC(mcc, 0); err != nil { t.Fatal(err) @@ -251,9 +251,9 @@ func (s) TestLBCache_RemoveTimer_New_Race(t *testing.T) { go func() { for i := 0; i < 1000; i++ { - // Remove starts a timer with 1 ns timeout, the NewSubConn will race - // with with the timer. - ccc.RemoveSubConn(sc) + // Shutdown starts a timer with 1 ns timeout, the NewSubConn will + // race with with the timer. + sc.Shutdown() sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{}) } close(done) diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index 797b9aa0a960..e835b1c21914 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -187,7 +187,7 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) { // addr was removed by resolver. Remove. wsci, _ := b.subConns.Get(addr) wsc := wsci.(*weightedSubConn) - b.cc.RemoveSubConn(wsc.SubConn) + wsc.SubConn.Shutdown() b.subConns.Delete(addr) } } diff --git a/balancer/weightedtarget/weightedtarget_test.go b/balancer/weightedtarget/weightedtarget_test.go index ff4b57a6fb94..845c143bfa88 100644 --- a/balancer/weightedtarget/weightedtarget_test.go +++ b/balancer/weightedtarget/weightedtarget_test.go @@ -241,12 +241,12 @@ func (s) TestWeightedTarget(t *testing.T) { // attribute set to the config that was passed to it. verifyAddressInNewSubConn(t, cc, setConfigKey(addr2, "cluster_2")) - // The subconn for cluster_1 should be removed. - scRemoved := <-cc.RemoveSubConnCh - if scRemoved != sc1 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) + // The subconn for cluster_1 should be shut down. + scShutdown := <-cc.ShutdownSubConnCh + if scShutdown != sc1 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown) } - scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) sc2 := <-cc.NewSubConnCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) @@ -286,12 +286,12 @@ func (s) TestWeightedTarget(t *testing.T) { } verifyAddressInNewSubConn(t, cc, addr3) - // The subconn from the test_config_balancer should be removed. - scRemoved = <-cc.RemoveSubConnCh - if scRemoved != sc2 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) + // The subconn from the test_config_balancer should be shut down. + scShutdown = <-cc.ShutdownSubConnCh + if scShutdown != sc2 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown) } - scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) // Send subconn state change. sc3 := <-cc.NewSubConnCh @@ -409,12 +409,12 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - // Expect one SubConn to be removed. - scRemoved := <-cc.RemoveSubConnCh - if scRemoved != sc1 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) + // Expect one SubConn to be shut down. + scShutdown := <-cc.ShutdownSubConnCh + if scShutdown != sc1 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown) } - scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) p = <-cc.NewPickerCh // Test pick with only the second SubConn. @@ -579,7 +579,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { t.Fatalf("want %v, got %v", want, err) } - // Remove subConn corresponding to addr3. + // Shut down subConn corresponding to addr3. if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), @@ -590,11 +590,11 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } - scRemoved := <-cc.RemoveSubConnCh - if scRemoved != sc3 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scRemoved) + scShutdown := <-cc.ShutdownSubConnCh + if scShutdown != sc3 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc3, scShutdown) } - scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) p = <-cc.NewPickerCh want = []balancer.SubConn{sc1, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { @@ -823,9 +823,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { // picker which ensures that the removed subBalancer is not picked for RPCs. p = <-cc.NewPickerCh - scRemoved := <-cc.RemoveSubConnCh - if scRemoved != sc2 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scRemoved) + scShutdown := <-cc.ShutdownSubConnCh + if scShutdown != sc2 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scShutdown) } want = []balancer.SubConn{sc1, sc3} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { @@ -865,9 +865,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { // Removing a subBalancer causes the weighted target LB policy to push a new // picker which ensures that the removed subBalancer is not picked for RPCs. - scRemoved = <-cc.RemoveSubConnCh - if scRemoved != sc1 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) + scShutdown = <-cc.ShutdownSubConnCh + if scShutdown != sc1 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown) } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 8b3e819d4351..3025a4d6b190 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -311,23 +311,8 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer } func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { - if ccb.isIdleOrClosed() { - // It it safe to ignore this call when the balancer is closed or in idle - // because the ClientConn takes care of closing the connections. - // - // Not returning early from here when the balancer is closed or in idle - // leads to a deadlock though, because of the following sequence of - // calls when holding cc.mu: - // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> - // ccb.RemoveAddrConn --> cc.removeAddrConn - return - } - - acbw, ok := sc.(*acBalancerWrapper) - if !ok { - return - } - ccb.cc.removeAddrConn(acbw.ac, errConnDrain) + // The graceful switch balancer will never call this. + logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc") } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { @@ -392,7 +377,20 @@ func (acbw *acBalancerWrapper) Connect() { } func (acbw *acBalancerWrapper) Shutdown() { - acbw.ccb.RemoveSubConn(acbw) + ccb := acbw.ccb + if ccb.isIdleOrClosed() { + // It it safe to ignore this call when the balancer is closed or in idle + // because the ClientConn takes care of closing the connections. + // + // Not returning early from here when the balancer is closed or in idle + // leads to a deadlock though, because of the following sequence of + // calls when holding cc.mu: + // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> + // ccb.RemoveAddrConn --> cc.removeAddrConn + return + } + + ccb.cc.removeAddrConn(acbw.ac, errConnDrain) } // NewStream begins a streaming RPC on the addrConn. If the addrConn is not diff --git a/internal/balancer/gracefulswitch/gracefulswitch.go b/internal/balancer/gracefulswitch/gracefulswitch.go index dad905cffdbf..3c594e6e4e55 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/internal/balancer/gracefulswitch/gracefulswitch.go @@ -255,7 +255,7 @@ func (gsb *Balancer) Close() { // // It implements the balancer.ClientConn interface and is passed down in that // capacity to the wrapped balancer. It maintains a set of subConns created by -// the wrapped balancer and calls from the latter to create/update/remove +// the wrapped balancer and calls from the latter to create/update/shutdown // SubConns update this set before being forwarded to the parent ClientConn. // State updates from the wrapped balancer can result in invocation of the // graceful switch logic. @@ -267,9 +267,10 @@ type balancerWrapper struct { subconns map[balancer.SubConn]bool // subconns created by this balancer } -// Close closes the underlying LB policy and removes the subconns it created. bw -// must not be referenced via balancerCurrent or balancerPending in gsb when -// called. gsb.mu must not be held. Does not panic with a nil receiver. +// Close closes the underlying LB policy and shuts down the subconns it +// created. bw must not be referenced via balancerCurrent or balancerPending in +// gsb when called. gsb.mu must not be held. Does not panic with a nil +// receiver. func (bw *balancerWrapper) Close() { // before Close is called. if bw == nil { @@ -282,7 +283,7 @@ func (bw *balancerWrapper) Close() { bw.Balancer.Close() bw.gsb.mu.Lock() for sc := range bw.subconns { - bw.gsb.cc.RemoveSubConn(sc) + sc.Shutdown() } bw.gsb.mu.Unlock() } @@ -345,7 +346,7 @@ func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.Ne } bw.gsb.mu.Lock() if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call - bw.gsb.cc.RemoveSubConn(sc) + sc.Shutdown() bw.gsb.mu.Unlock() return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw) } @@ -364,6 +365,8 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) { } func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) { + // Note: existing third party balancers may call this, so it must remain + // until RemoveSubConn is fully removed. sc.Shutdown() } diff --git a/internal/balancer/gracefulswitch/gracefulswitch_test.go b/internal/balancer/gracefulswitch/gracefulswitch_test.go index 773645f04dee..b49c8d86312c 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch_test.go +++ b/internal/balancer/gracefulswitch/gracefulswitch_test.go @@ -414,32 +414,32 @@ func (s) TestBalancerSubconns(t *testing.T) { } // balancerCurrent removing sc1 should get forwarded to the ClientConn. - gsb.balancerCurrent.Balancer.(*mockBalancer).removeSubConn(sc1) + sc1.Shutdown() select { case <-ctx.Done(): t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn") - case sc := <-tcc.RemoveSubConnCh: + case sc := <-tcc.ShutdownSubConnCh: if sc != sc1 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, sc) + t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, sc) } } // balancerPending removing sc2 should get forwarded to the ClientConn. - gsb.balancerPending.Balancer.(*mockBalancer).removeSubConn(sc2) + sc2.Shutdown() select { case <-ctx.Done(): t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn") - case sc := <-tcc.RemoveSubConnCh: + case sc := <-tcc.ShutdownSubConnCh: if sc != sc2 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc2, sc) + t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, sc) } } } -// TestBalancerClose tests the graceful switch balancer's Close() functionality. -// From the Close() call, the graceful switch balancer should remove any created -// Subconns and Close() the current and pending load balancers. This Close() -// call should also cause any other events (calls to entrance functions) to be -// no-ops. +// TestBalancerClose tests the graceful switch balancer's Close() +// functionality. From the Close() call, the graceful switch balancer should +// shut down any created Subconns and Close() the current and pending load +// balancers. This Close() call should also cause any other events (calls to +// entrance functions) to be no-ops. func (s) TestBalancerClose(t *testing.T) { // Setup gsb balancer with current, pending, and one created SubConn on both // current and pending. @@ -479,25 +479,25 @@ func (s) TestBalancerClose(t *testing.T) { gsb.Close() // The order of SubConns the graceful switch load balancer tells the Client - // Conn to remove is non deterministic, as it is stored in a map. However, - // the first SubConn removed should be either sc1 or sc2. + // Conn to shut down is non deterministic, as it is stored in a + // map. However, the first SubConn shut down should be either sc1 or sc2. select { case <-ctx.Done(): t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn") - case sc := <-tcc.RemoveSubConnCh: + case sc := <-tcc.ShutdownSubConnCh: if sc != sc1 && sc != sc2 { - t.Fatalf("RemoveSubConn, want either %v or %v, got %v", sc1, sc2, sc) + t.Fatalf("ShutdownSubConn, want either %v or %v, got %v", sc1, sc2, sc) } } // The graceful switch load balancer should then tell the ClientConn to - // remove the other SubConn. + // shut down the other SubConn. select { case <-ctx.Done(): t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn") - case sc := <-tcc.RemoveSubConnCh: + case sc := <-tcc.ShutdownSubConnCh: if sc != sc1 && sc != sc2 { - t.Fatalf("RemoveSubConn, want either %v or %v, got %v", sc1, sc2, sc) + t.Fatalf("ShutdownSubConn, want either %v or %v, got %v", sc1, sc2, sc) } } @@ -615,16 +615,16 @@ func (s) TestPendingReplacedByAnotherPending(t *testing.T) { // Replace pending with a SwitchTo() call. gsb.SwitchTo(mockBalancerBuilder2{}) // The pending balancer being replaced should cause the graceful switch - // balancer to Remove() any created SubConns for the old pending balancer + // balancer to Shutdown() any created SubConns for the old pending balancer // and also Close() the old pending balancer. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() select { case <-ctx.Done(): - t.Fatalf("timeout while waiting for a RemoveSubConn call on the ClientConn") - case sc := <-tcc.RemoveSubConnCh: + t.Fatalf("timeout while waiting for a SubConn.Shutdown") + case sc := <-tcc.ShutdownSubConnCh: if sc != sc1 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, sc) + t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, sc) } } @@ -750,8 +750,8 @@ func (s) TestInlineCallbackInBuild(t *testing.T) { } select { case <-ctx.Done(): - t.Fatalf("timeout while waiting for an RemoveSubConn() call on the ClientConn") - case <-tcc.RemoveSubConnCh: + t.Fatalf("timeout while waiting for a Shutdown() call on the SubConn") + case <-tcc.ShutdownSubConnCh: } oldCurrent := gsb.balancerCurrent.Balancer.(*buildCallbackBal) @@ -775,8 +775,8 @@ func (s) TestInlineCallbackInBuild(t *testing.T) { } select { case <-ctx.Done(): - t.Fatalf("timeout while waiting for an RemoveSubConn() call on the ClientConn") - case <-tcc.RemoveSubConnCh: + t.Fatalf("timeout while waiting for a Shutdown() call on the SubConn") + case <-tcc.ShutdownSubConnCh: } // The current balancer should be closed as a result of the swap. @@ -965,10 +965,6 @@ func (mb1 *mockBalancer) updateAddresses(sc balancer.SubConn, addrs []resolver.A mb1.cc.UpdateAddresses(sc, addrs) } -func (mb1 *mockBalancer) removeSubConn(sc balancer.SubConn) { - mb1.cc.RemoveSubConn(sc) -} - type mockBalancerBuilder2 struct{} func (mockBalancerBuilder2) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { @@ -1047,7 +1043,7 @@ func (buildCallbackBalancerBuilder) Build(cc balancer.ClientConn, opts balancer. return nil } b.updateAddresses(sc, []resolver.Address{}) - b.removeSubConn(sc) + sc.Shutdown() return b } @@ -1094,10 +1090,6 @@ func (bcb *buildCallbackBal) updateAddresses(sc balancer.SubConn, addrs []resolv bcb.cc.UpdateAddresses(sc, addrs) } -func (bcb *buildCallbackBal) removeSubConn(sc balancer.SubConn) { - bcb.cc.RemoveSubConn(sc) -} - // waitForClose verifies that the mockBalancer is closed before the context // expires. func (bcb *buildCallbackBal) waitForClose(ctx context.Context) error { diff --git a/internal/balancergroup/balancergroup.go b/internal/balancergroup/balancergroup.go index 99deb9260ccd..46e51545349d 100644 --- a/internal/balancergroup/balancergroup.go +++ b/internal/balancergroup/balancergroup.go @@ -553,7 +553,7 @@ func (bg *BalancerGroup) Close() { bg.incomingStarted = false // Also remove all SubConns. for sc := range bg.scToSubBalancer { - bg.cc.RemoveSubConn(sc) + sc.Shutdown() delete(bg.scToSubBalancer, sc) } } diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go index fbe1da9f5a28..571b815ef8a0 100644 --- a/internal/balancergroup/balancergroup_test.go +++ b/internal/balancergroup/balancergroup_test.go @@ -116,7 +116,7 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { gator.Stop() bg.Close() for i := 0; i < 4; i++ { - (<-cc.RemoveSubConnCh).UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + (<-cc.ShutdownSubConnCh).UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) } // Add b3, weight 1, backends [1,2]. @@ -244,8 +244,8 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat // removed after close timeout. for i := 0; i < 10; i++ { select { - case <-cc.RemoveSubConnCh: - t.Fatalf("Got request to remove subconn, want no remove subconn (because subconns were still in cache)") + case <-cc.ShutdownSubConnCh: + t.Fatalf("Got request to shut down subconn, want no shut down subconn (because subconns were still in cache)") default: } time.Sleep(time.Millisecond) @@ -310,7 +310,7 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) { } } -// Sub-balancers are put in cache when they are removed. If balancer group is +// Sub-balancers are put in cache when they are shut down. If balancer group is // closed within close timeout, all subconns should still be rmeoved // immediately. func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) { @@ -318,51 +318,51 @@ func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) { _, bg, cc, addrToSC := initBalancerGroupForCachingTest(t) bg.Close() - // The balancer group is closed. The subconns should be removed immediately. - removeTimeout := time.After(time.Millisecond * 500) - scToRemove := map[balancer.SubConn]int{ + // The balancer group is closed. The subconns should be shutdown immediately. + shutdownTimeout := time.After(time.Millisecond * 500) + scToShutdown := map[balancer.SubConn]int{ addrToSC[testBackendAddrs[0]]: 1, addrToSC[testBackendAddrs[1]]: 1, addrToSC[testBackendAddrs[2]]: 1, addrToSC[testBackendAddrs[3]]: 1, } - for i := 0; i < len(scToRemove); i++ { + for i := 0; i < len(scToShutdown); i++ { select { - case sc := <-cc.RemoveSubConnCh: - c := scToRemove[sc] + case sc := <-cc.ShutdownSubConnCh: + c := scToShutdown[sc] if c == 0 { - t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c) + t.Fatalf("Got Shutdown for %v when there's %d shutdown expected", sc, c) } - scToRemove[sc] = c - 1 - case <-removeTimeout: - t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed") + scToShutdown[sc] = c - 1 + case <-shutdownTimeout: + t.Fatalf("timeout waiting for subConns (from balancer in cache) to be shut down") } } } // Sub-balancers in cache will be closed if not re-added within timeout, and -// subConns will be removed. +// subConns will be shut down. func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.T) { defer replaceDefaultSubBalancerCloseTimeout(time.Second)() _, _, cc, addrToSC := initBalancerGroupForCachingTest(t) // The sub-balancer is not re-added within timeout. The subconns should be - // removed. - removeTimeout := time.After(DefaultSubBalancerCloseTimeout) - scToRemove := map[balancer.SubConn]int{ + // shut down. + shutdownTimeout := time.After(DefaultSubBalancerCloseTimeout) + scToShutdown := map[balancer.SubConn]int{ addrToSC[testBackendAddrs[2]]: 1, addrToSC[testBackendAddrs[3]]: 1, } - for i := 0; i < len(scToRemove); i++ { + for i := 0; i < len(scToShutdown); i++ { select { - case sc := <-cc.RemoveSubConnCh: - c := scToRemove[sc] + case sc := <-cc.ShutdownSubConnCh: + c := scToShutdown[sc] if c == 0 { - t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c) + t.Fatalf("Got Shutdown for %v when there's %d shutdown expected", sc, c) } - scToRemove[sc] = c - 1 - case <-removeTimeout: - t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed") + scToShutdown[sc] = c - 1 + case <-shutdownTimeout: + t.Fatalf("timeout waiting for subConns (from balancer in cache) to be shut down") } } } @@ -381,35 +381,35 @@ func (*noopBalancerBuilderWrapper) Name() string { } // After removing a sub-balancer, re-add with same ID, but different balancer -// builder. Old subconns should be removed, and new subconns should be created. +// builder. Old subconns should be shut down, and new subconns should be created. func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) { defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)() gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t) // Re-add sub-balancer-1, but with a different balancer builder. The // sub-balancer was still in cache, but cann't be reused. This should cause - // old sub-balancer's subconns to be removed immediately, and new subconns - // to be created. + // old sub-balancer's subconns to be shut down immediately, and new + // subconns to be created. gator.Add(testBalancerIDs[1], 1) bg.Add(testBalancerIDs[1], &noopBalancerBuilderWrapper{rrBuilder}) // The cached sub-balancer should be closed, and the subconns should be - // removed immediately. - removeTimeout := time.After(time.Millisecond * 500) - scToRemove := map[balancer.SubConn]int{ + // shut down immediately. + shutdownTimeout := time.After(time.Millisecond * 500) + scToShutdown := map[balancer.SubConn]int{ addrToSC[testBackendAddrs[2]]: 1, addrToSC[testBackendAddrs[3]]: 1, } - for i := 0; i < len(scToRemove); i++ { + for i := 0; i < len(scToShutdown); i++ { select { - case sc := <-cc.RemoveSubConnCh: - c := scToRemove[sc] + case sc := <-cc.ShutdownSubConnCh: + c := scToShutdown[sc] if c == 0 { - t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c) + t.Fatalf("Got Shutdown for %v when there's %d shutdown expected", sc, c) } - scToRemove[sc] = c - 1 - case <-removeTimeout: - t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed") + scToShutdown[sc] = c - 1 + case <-shutdownTimeout: + t.Fatalf("timeout waiting for subConns (from balancer in cache) to be shut down") } } @@ -630,15 +630,15 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { for i := 0; i < 2; i++ { select { case <-ctx.Done(): - t.Fatalf("error waiting for RemoveSubConn()") - case sc := <-cc.RemoveSubConnCh: - // The SubConn removed should have been one of the two created + t.Fatalf("error waiting for Shutdown()") + case sc := <-cc.ShutdownSubConnCh: + // The SubConn shut down should have been one of the two created // SubConns, and both should be deleted. if ok := scs[sc]; ok { delete(scs, sc) continue } else { - t.Fatalf("RemoveSubConn called for wrong SubConn %v, want in %v", sc, scs) + t.Fatalf("Shutdown called for wrong SubConn %v, want in %v", sc, scs) } } } diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index 73aef813f3eb..61cb55e7c11c 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -34,6 +34,7 @@ import ( type testingLogger interface { Log(args ...interface{}) Logf(format string, args ...interface{}) + Errorf(format string, args ...interface{}) } // TestSubConn implements the SubConn interface, to be used in tests. @@ -82,12 +83,12 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) { } } -// Shutdown pushes the SubConn to the RemoveSubConn channel in the parent +// Shutdown pushes the SubConn to the ShutdownSubConn channel in the parent // TestClientConn. func (tsc *TestSubConn) Shutdown() { tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc) select { - case tsc.tcc.RemoveSubConnCh <- tsc: + case tsc.tcc.ShutdownSubConnCh <- tsc: default: } } @@ -103,7 +104,7 @@ type TestClientConn struct { NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn. NewSubConnCh chan *TestSubConn // the last 10 subconn created. - RemoveSubConnCh chan *TestSubConn // the last 10 subconn removed. + ShutdownSubConnCh chan *TestSubConn // the last 10 subconn removed. UpdateAddressesAddrsCh chan []resolver.Address // last updated address via UpdateAddresses(). NewPickerCh chan balancer.Picker // the last picker updated. @@ -120,7 +121,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn { NewSubConnAddrsCh: make(chan []resolver.Address, 10), NewSubConnCh: make(chan *TestSubConn, 10), - RemoveSubConnCh: make(chan *TestSubConn, 10), + ShutdownSubConnCh: make(chan *TestSubConn, 10), UpdateAddressesAddrsCh: make(chan []resolver.Address, 1), NewPickerCh: make(chan balancer.Picker, 1), @@ -153,9 +154,10 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon return sc, nil } -// RemoveSubConn removes the SubConn. +// RemoveSubConn is a nop; tests should all be updated to use sc.Shutdown() +// instead. func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) { - sc.(*TestSubConn).Shutdown() + tcc.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc) } // UpdateAddresses updates the addresses on the SubConn. diff --git a/pickfirst.go b/pickfirst.go index 00dd7633ebf1..2e9cf66b4afc 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -117,9 +117,9 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState // The resolver reported an empty address list. Treat it like an error by // calling b.ResolverError. if b.subConn != nil { - // Remove the old subConn. All addresses were removed, so it is no longer - // valid. - b.cc.RemoveSubConn(b.subConn) + // Shut down the old subConn. All addresses were removed, so it is + // no longer valid. + b.subConn.Shutdown() b.subConn = nil } b.ResolverError(errors.New("produced zero addresses")) diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go index 14dfa8ea7eff..d18f89c98f5d 100644 --- a/test/balancer_switching_test.go +++ b/test/balancer_switching_test.go @@ -360,11 +360,11 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) { } } -// TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose tests the scenario -// where the balancer being switched out calls RemoveSubConn() in its Close() +// TestBalancerSwitch_OldBalancerCallsShutdownInClose tests the scenario where +// the balancer being switched out calls Shutdown() in its Close() // method. Verifies that this sequence of calls doesn't lead to a deadlock. -func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) { - // Register a stub balancer which calls RemoveSubConn() from its Close(). +func (s) TestBalancerSwitch_OldBalancerCallsShutdownInClose(t *testing.T) { + // Register a stub balancer which calls Shutdown() from its Close(). scChan := make(chan balancer.SubConn, 1) uccsCalled := make(chan struct{}, 1) stub.Register(t.Name(), stub.BalancerFuncs{ @@ -378,7 +378,7 @@ func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) { return nil }, Close: func(data *stub.BalancerData) { - data.ClientConn.RemoveSubConn(<-scChan) + (<-scChan).Shutdown() }, }) @@ -406,10 +406,9 @@ func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) { // The following service config update will switch balancer from our stub // balancer to pick_first. The former will be closed, which will call - // cc.RemoveSubConn() inline (this RemoveSubConn is not required by the API, - // but some balancers might do it). + // sc.Shutdown() inline. // - // This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a + // This is to make sure the sc.Shutdown() from Close() doesn't cause a // deadlock (e.g. trying to grab a mutex while it's already locked). // // Do it in a goroutine so this test will fail with a helpful message diff --git a/test/balancer_test.go b/test/balancer_test.go index e769d304df2a..600b83f39f44 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -1062,58 +1062,77 @@ func (s) TestBalancerProducerHonorsContext(t *testing.T) { } } -// TestSubConnShutdown confirms that the Shutdown method on subconns properly -// initiates their shutdown. +// TestSubConnShutdown confirms that the Shutdown method on subconns and +// RemoveSubConn method on ClientConn properly initiates subconn shutdown. func (s) TestSubConnShutdown(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - gotShutdown := grpcsync.NewEvent() + testCases := []struct { + name string + shutdown func(cc balancer.ClientConn, sc balancer.SubConn) + }{{ + name: "ClientConn.RemoveSubConn", + shutdown: func(cc balancer.ClientConn, sc balancer.SubConn) { + cc.RemoveSubConn(sc) + }, + }, { + name: "SubConn.Shutdown", + shutdown: func(_ balancer.ClientConn, sc balancer.SubConn) { + sc.Shutdown() + }, + }} - bf := stub.BalancerFuncs{ - UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { - var sc balancer.SubConn - opts := balancer.NewSubConnOptions{ - StateListener: func(scs balancer.SubConnState) { - switch scs.ConnectivityState { - case connectivity.Connecting: - // Ignored. - case connectivity.Ready: - sc.Shutdown() - case connectivity.Shutdown: - gotShutdown.Fire() - default: - t.Errorf("got unexpected state %q in listener", scs.ConnectivityState) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + gotShutdown := grpcsync.NewEvent() + + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + var sc balancer.SubConn + opts := balancer.NewSubConnOptions{ + StateListener: func(scs balancer.SubConnState) { + switch scs.ConnectivityState { + case connectivity.Connecting: + // Ignored. + case connectivity.Ready: + tc.shutdown(bd.ClientConn, sc) + case connectivity.Shutdown: + gotShutdown.Fire() + default: + t.Errorf("got unexpected state %q in listener", scs.ConnectivityState) + } + }, } + sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, opts) + if err != nil { + return err + } + sc.Connect() + // Report the state as READY to unblock ss.Start(), which waits for ready. + bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready}) + return nil }, } - sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, opts) - if err != nil { - return err - } - sc.Connect() - // Report the state as READY to unblock ss.Start(), which waits for ready. - bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready}) - return nil - }, - } - const testBalName = "shutdown-test-balancer" - stub.Register(testBalName, bf) - t.Logf("Registered balancer %s...", testBalName) + testBalName := "shutdown-test-balancer-" + tc.name + stub.Register(testBalName, bf) + t.Logf("Registered balancer %s...", testBalName) - ss := &stubserver.StubServer{} - if err := ss.Start(nil, grpc.WithDefaultServiceConfig( - fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, testBalName), - )); err != nil { - t.Fatalf("Error starting endpoint server: %v", err) - } - defer ss.Stop() + ss := &stubserver.StubServer{} + if err := ss.Start(nil, grpc.WithDefaultServiceConfig( + fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, testBalName), + )); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() - select { - case <-gotShutdown.Done(): - // Success - case <-ctx.Done(): - t.Fatalf("Timed out waiting for gotShutdown to be fired.") + select { + case <-gotShutdown.Done(): + // Success + case <-ctx.Done(): + t.Fatalf("Timed out waiting for gotShutdown to be fired.") + } + }) } } diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 67521be91ebb..a9b75dcb5a61 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -403,7 +403,7 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer } func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) { - sc.Shutdown() + b.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc) } func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index 3733b5492d3c..3f594dcbc072 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -453,7 +453,7 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) { select { case <-time.After(time.Millisecond * 500): t.Fatalf("timeout waiting for remove subconn") - case <-cc.RemoveSubConnCh: + case <-cc.ShutdownSubConnCh: } } @@ -729,12 +729,12 @@ func TestClusterGracefulSwitch(t *testing.T) { defer cancel() select { case <-ctx.Done(): - t.Fatalf("error waiting for RemoveSubConn()") - case rsc := <-cc.RemoveSubConnCh: + t.Fatalf("error waiting for sc.Shutdown()") + case rsc := <-cc.ShutdownSubConnCh: // The SubConn removed should have been the created SubConn // from the child before switching. if rsc != sc1 { - t.Fatalf("RemoveSubConn() got: %v, want %v", rsc, sc1) + t.Fatalf("Shutdown() got: %v, want %v", rsc, sc1) } } } diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 12509fe599a3..871055f09171 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -502,7 +502,7 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal } func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) { - sc.Shutdown() + b.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc) } // appendIfPresent appends the scw to the address, if the address is present in diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index 16dd395404bb..2d7b40b4d58e 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -1538,7 +1538,7 @@ func (s) TestConcurrentOperations(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - od.RemoveSubConn(scw1) + scw1.Shutdown() }() wg.Add(1) diff --git a/xds/internal/balancer/priority/balancer_test.go b/xds/internal/balancer/priority/balancer_test.go index 5933665b98cf..007f48709f77 100644 --- a/xds/internal/balancer/priority/balancer_test.go +++ b/xds/internal/balancer/priority/balancer_test.go @@ -143,8 +143,8 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) { select { case sc := <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn: %s", sc) - case sc := <-cc.RemoveSubConnCh: - t.Fatalf("got unexpected remove SubConn: %v", sc) + case sc := <-cc.ShutdownSubConnCh: + t.Fatalf("got unexpected shutdown SubConn: %v", sc) case <-time.After(time.Millisecond * 100): } @@ -175,8 +175,8 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) { select { case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") - case <-cc.RemoveSubConnCh: - t.Fatalf("got unexpected remove SubConn") + case <-cc.ShutdownSubConnCh: + t.Fatalf("got unexpected shutdown SubConn") case <-time.After(time.Millisecond * 100): } @@ -279,8 +279,8 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { select { case sc := <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn, %s", sc) - case <-cc.RemoveSubConnCh: - t.Fatalf("got unexpected remove SubConn") + case <-cc.ShutdownSubConnCh: + t.Fatalf("got unexpected shutdown SubConn") case <-time.After(time.Millisecond * 100): } @@ -325,10 +325,10 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - // p2 SubConns are removed. - scToRemove := <-cc.RemoveSubConnCh - if scToRemove != sc2 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove) + // p2 SubConns are shut down. + scToShutdown := <-cc.ShutdownSubConnCh + if scToShutdown != sc2 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scToShutdown) } // Should get an update with 1's old transient failure picker, to override @@ -423,10 +423,10 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) - // p1 subconn should be removed. - scToRemove := <-cc.RemoveSubConnCh - if scToRemove != sc1 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove) + // p1 subconn should be shut down. + scToShutdown := <-cc.ShutdownSubConnCh + if scToShutdown != sc1 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc0, scToShutdown) } if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { @@ -612,13 +612,13 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) { // When 0 becomes ready, 0 should be used, 1 and 2 should all be closed. sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) - // sc1 and sc2 should be removed. + // sc1 and sc2 should be shut down. // // With localities caching, the lower priorities are closed after a timeout, // in goroutines. The order is no longer guaranteed. - scToRemove := []balancer.SubConn{<-cc.RemoveSubConnCh, <-cc.RemoveSubConnCh} - if !(scToRemove[0] == sc1 && scToRemove[1] == sc2) && !(scToRemove[0] == sc2 && scToRemove[1] == sc1) { - t.Errorf("RemoveSubConn, want [%v, %v], got %v", sc1, sc2, scToRemove) + scToShutdown := []balancer.SubConn{<-cc.ShutdownSubConnCh, <-cc.ShutdownSubConnCh} + if !(scToShutdown[0] == sc1 && scToShutdown[1] == sc2) && !(scToShutdown[0] == sc2 && scToShutdown[1] == sc1) { + t.Errorf("ShutdownSubConn, want [%v, %v], got %v", sc1, sc2, scToShutdown) } // Test pick with 0. @@ -765,10 +765,10 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - // p0 subconn should be removed. - scToRemove := <-cc.RemoveSubConnCh - if scToRemove != sc0 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove) + // p0 subconn should be shut down. + scToShutdown := <-cc.ShutdownSubConnCh + if scToShutdown != sc0 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc0, scToShutdown) } // Test pick return TransientFailure. @@ -836,10 +836,10 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - // p1 subconn should be removed. - scToRemove1 := <-cc.RemoveSubConnCh - if scToRemove1 != sc11 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc11, scToRemove1) + // p1 subconn should be shut down. + scToShutdown1 := <-cc.ShutdownSubConnCh + if scToShutdown1 != sc11 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc11, scToShutdown1) } // Test pick return NoSubConn. @@ -862,8 +862,8 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { t.Fatalf("got unexpected new picker") case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") - case <-cc.RemoveSubConnCh: - t.Fatalf("got unexpected remove SubConn") + case <-cc.ShutdownSubConnCh: + t.Fatalf("got unexpected shutdown SubConn") case <-time.After(time.Millisecond * 100): } } @@ -931,10 +931,10 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - // p0 will remove the subconn, and ClientConn will send a sc update to + // p0 will shutdown the subconn, and ClientConn will send a sc update to // shutdown. - scToRemove := <-cc.RemoveSubConnCh - scToRemove.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) + scToShutdown := <-cc.ShutdownSubConnCh + scToShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testBackendAddrStrs[1]; got != want { @@ -1079,10 +1079,10 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) { t.Fatal(err.Error()) } - // Old subconn should be removed. - scToRemove := <-cc.RemoveSubConnCh - if scToRemove != sc1 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) + // Old subconn should be shut down. + scToShutdown := <-cc.ShutdownSubConnCh + if scToShutdown != sc1 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scToShutdown) } addrs2 := <-cc.NewSubConnAddrsCh @@ -1181,9 +1181,9 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) { } // Old subconn from child-0 should be removed. - scToRemove := <-cc.RemoveSubConnCh - if scToRemove != sc0 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove) + scToShutdown := <-cc.ShutdownSubConnCh + if scToShutdown != sc0 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc0, scToShutdown) } // Because this was a ready child moved to a higher priority, no new subconn @@ -1191,8 +1191,8 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) { select { case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") - case <-cc.RemoveSubConnCh: - t.Fatalf("got unexpected remove SubConn") + case <-cc.ShutdownSubConnCh: + t.Fatalf("got unexpected shutdown SubConn") case <-time.After(time.Millisecond * 100): } } @@ -1273,10 +1273,10 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - // Old subconn from child-1 should be removed. - scToRemove := <-cc.RemoveSubConnCh - if scToRemove != sc1 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) + // Old subconn from child-1 should be shut down. + scToShutdown := <-cc.ShutdownSubConnCh + if scToShutdown != sc1 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scToShutdown) } if err := cc.WaitForErrPicker(ctx); err != nil { @@ -1363,8 +1363,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) { select { case sc := <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn: %s", sc) - case sc := <-cc.RemoveSubConnCh: - t.Fatalf("got unexpected remove SubConn: %v", sc) + case sc := <-cc.ShutdownSubConnCh: + t.Fatalf("got unexpected shutdown SubConn: %v", sc) case <-time.After(time.Millisecond * 100): } @@ -1395,8 +1395,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) { select { case sc := <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn: %s", sc) - case sc := <-cc.RemoveSubConnCh: - t.Fatalf("got unexpected remove SubConn: %v", sc) + case sc := <-cc.ShutdownSubConnCh: + t.Fatalf("got unexpected shutdown SubConn: %v", sc) case <-time.After(time.Millisecond * 100): } } @@ -1463,10 +1463,10 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - // Old subconn should be removed. - scToRemove := <-cc.RemoveSubConnCh - if scToRemove != sc1 { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) + // Old subconn should be shut down. + scToShutdown := <-cc.ShutdownSubConnCh + if scToShutdown != sc1 { + t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scToShutdown) } // A new subconn should be created. diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 005efd1c581c..ce25141643b1 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -252,7 +252,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { if _, ok := addrsSet.Get(addr); !ok { v, _ := b.subConns.Get(addr) scInfo := v.(*subConn) - b.cc.RemoveSubConn(scInfo.sc) + scInfo.sc.Shutdown() b.subConns.Delete(addr) addrsUpdated = true // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. @@ -354,8 +354,8 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance // Save error to be reported via picker. b.connErr = state.ConnectionError case connectivity.Shutdown: - // When an address was removed by resolver, b called RemoveSubConn but - // kept the sc's state in scStates. Remove state for this sc here. + // When an address was removed by resolver, b called Shutdown but kept + // the sc's state in scStates. Remove state for this sc here. delete(b.scStates, sc) }