Skip to content

Commit

Permalink
all: replace RemoveSubConn with Shutdown as much as possible (#6505)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Aug 4, 2023
1 parent 28ac6ef commit 6c0c69e
Show file tree
Hide file tree
Showing 23 changed files with 296 additions and 289 deletions.
5 changes: 3 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,9 @@ type Balancer interface {
// Deprecated: Use NewSubConnOptions.StateListener when creating the
// SubConn instead.
UpdateSubConnState(SubConn, SubConnState)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
// Close closes the balancer. The balancer is not currently required to
// call SubConn.Shutdown for its existing SubConns; however, this will be
// required in a future release, so it is recommended.
Close()
}

Expand Down
8 changes: 4 additions & 4 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
sc := sci.(balancer.SubConn)
// a was removed by resolver.
if _, ok := addrsSet.Get(a); !ok {
b.cc.RemoveSubConn(sc)
sc.Shutdown()
b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
Expand Down Expand Up @@ -204,8 +204,8 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
// When an address was removed by resolver, b called Shutdown but kept
// the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
Expand All @@ -226,7 +226,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
}

// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
// and it doesn't need to call Shutdown for the SubConns.
func (b *baseBalancer) Close() {
}

Expand Down
8 changes: 4 additions & 4 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ type lbBalancer struct {
backendAddrsWithoutMetadata []resolver.Address
// Roundrobin functionalities.
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
subConns map[resolver.Address]balancer.SubConn // Used to new/shutdown SubConn.
scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
picker balancer.Picker
// Support fallback to resolved backend addresses if there's no response
Expand Down Expand Up @@ -290,7 +290,7 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
// aggregateSubConnStats calculate the aggregated state of SubConns in
// lb.SubConns. These SubConns are subconns in use (when switching between
// fallback and grpclb). lb.scState contains states for all SubConns, including
// those in cache (SubConns are cached for 10 seconds after remove).
// those in cache (SubConns are cached for 10 seconds after shutdown).
//
// The aggregated state is:
// - If at least one SubConn in Ready, the aggregated state is Ready;
Expand Down Expand Up @@ -345,8 +345,8 @@ func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubCo
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
// When an address was removed by resolver, b called Shutdown but kept
// the sc's state in scStates. Remove state for this sc here.
delete(lb.scStates, sc)
case connectivity.TransientFailure:
lb.connErr = scs.ConnectionError
Expand Down
13 changes: 3 additions & 10 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}

balancingPolicyChanged := lb.usePickFirst != pickFirst
oldUsePickFirst := lb.usePickFirst
lb.usePickFirst = pickFirst

if fallbackModeChanged || balancingPolicyChanged {
Expand All @@ -123,13 +122,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
// For fallback mode switching with pickfirst, we want to recreate the
// SubConn because the creds could be different.
for a, sc := range lb.subConns {
if oldUsePickFirst {
// If old SubConn were created for pickfirst, bypass cache and
// remove directly.
lb.cc.ClientConn.RemoveSubConn(sc)
} else {
lb.cc.RemoveSubConn(sc)
}
sc.Shutdown()
delete(lb.subConns, a)
}
}
Expand All @@ -144,7 +137,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}
if sc != nil {
if len(backendAddrs) == 0 {
lb.cc.ClientConn.RemoveSubConn(sc)
sc.Shutdown()
delete(lb.subConns, scKey)
return
}
Expand Down Expand Up @@ -197,7 +190,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
for a, sc := range lb.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
lb.cc.RemoveSubConn(sc)
sc.Shutdown()
delete(lb.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
Expand Down
12 changes: 6 additions & 6 deletions balancer/grpclb/grpclb_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *lbManualResolver) UpdateState(s resolver.State) {
const subConnCacheTime = time.Second * 10

// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache.
// SubConns will be kept in cache for subConnCacheTime before being removed.
// SubConns will be kept in cache for subConnCacheTime before being shut down.
//
// Its NewSubconn and SubConn.Shutdown methods are updated to do cache first.
type lbCacheClientConn struct {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
}

func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
}

type lbCacheSubConn struct {
Expand All @@ -168,9 +168,9 @@ func (sc *lbCacheSubConn) Shutdown() {

if entry, ok := ccc.subConnCache[addr]; ok {
if entry.sc != sc {
// This could happen if NewSubConn was called multiple times for the
// same address, and those SubConns are all removed. We remove sc
// immediately here.
// This could happen if NewSubConn was called multiple times for
// the same address, and those SubConns are all shut down. We
// remove sc immediately here.
delete(ccc.subConnToAddr, sc)
sc.SubConn.Shutdown()
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func (ccc *lbCacheClientConn) UpdateState(s balancer.State) {
func (ccc *lbCacheClientConn) close() {
ccc.mu.Lock()
defer ccc.mu.Unlock()
// Only cancel all existing timers. There's no need to remove SubConns.
// Only cancel all existing timers. There's no need to shut down SubConns.
for _, entry := range ccc.subConnCache {
entry.cancel()
}
Expand Down
26 changes: 13 additions & 13 deletions balancer/grpclb/grpclb_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.Ne
}

func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
panic(fmt.Sprintf("RemoveSubConn(%v) called unexpectedly", sc))
}

const testCacheTimeout = 100 * time.Millisecond
Expand All @@ -87,7 +87,7 @@ func checkCacheCC(ccc *lbCacheClientConn, sccLen, sctaLen int) error {
return nil
}

// Test that SubConn won't be immediately removed.
// Test that SubConn won't be immediately shut down.
func (s) TestLBCacheClientConnExpire(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
Expand All @@ -110,7 +110,7 @@ func (s) TestLBCacheClientConnExpire(t *testing.T) {
t.Fatal(err)
}

ccc.RemoveSubConn(sc)
sc.Shutdown()
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s) TestLBCacheClientConnExpire(t *testing.T) {
}
}

// Test that NewSubConn with the same address of a SubConn being removed will
// Test that NewSubConn with the same address of a SubConn being shut down will
// reuse the SubConn and cancel the removing.
func (s) TestLBCacheClientConnReuse(t *testing.T) {
mcc := newMockClientConn()
Expand All @@ -162,7 +162,7 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) {
t.Fatal(err)
}

ccc.RemoveSubConn(sc)
sc.Shutdown()
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -195,8 +195,8 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) {
t.Fatal(err)
}

// Call remove again, will delete after timeout.
ccc.RemoveSubConn(sc)
// Call Shutdown again, will delete after timeout.
sc.Shutdown()
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
Expand All @@ -223,9 +223,9 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) {
}
}

// Test that if the timer to remove a SubConn fires at the same time NewSubConn
// cancels the timer, it doesn't cause deadlock.
func (s) TestLBCache_RemoveTimer_New_Race(t *testing.T) {
// Test that if the timer to shut down a SubConn fires at the same time
// NewSubConn cancels the timer, it doesn't cause deadlock.
func (s) TestLBCache_ShutdownTimer_New_Race(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
t.Fatal(err)
Expand All @@ -251,9 +251,9 @@ func (s) TestLBCache_RemoveTimer_New_Race(t *testing.T) {

go func() {
for i := 0; i < 1000; i++ {
// Remove starts a timer with 1 ns timeout, the NewSubConn will race
// with with the timer.
ccc.RemoveSubConn(sc)
// Shutdown starts a timer with 1 ns timeout, the NewSubConn will
// race with with the timer.
sc.Shutdown()
sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
}
close(done)
Expand Down
2 changes: 1 addition & 1 deletion balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
// addr was removed by resolver. Remove.
wsci, _ := b.subConns.Get(addr)
wsc := wsci.(*weightedSubConn)
b.cc.RemoveSubConn(wsc.SubConn)
wsc.SubConn.Shutdown()
b.subConns.Delete(addr)
}
}
Expand Down
52 changes: 26 additions & 26 deletions balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ func (s) TestWeightedTarget(t *testing.T) {
// attribute set to the config that was passed to it.
verifyAddressInNewSubConn(t, cc, setConfigKey(addr2, "cluster_2"))

// The subconn for cluster_1 should be removed.
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
// The subconn for cluster_1 should be shut down.
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})

sc2 := <-cc.NewSubConnCh
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
Expand Down Expand Up @@ -286,12 +286,12 @@ func (s) TestWeightedTarget(t *testing.T) {
}
verifyAddressInNewSubConn(t, cc, addr3)

// The subconn from the test_config_balancer should be removed.
scRemoved = <-cc.RemoveSubConnCh
if scRemoved != sc2 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
// The subconn from the test_config_balancer should be shut down.
scShutdown = <-cc.ShutdownSubConnCh
if scShutdown != sc2 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})

// Send subconn state change.
sc3 := <-cc.NewSubConnCh
Expand Down Expand Up @@ -409,12 +409,12 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}

// Expect one SubConn to be removed.
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
// Expect one SubConn to be shut down.
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p = <-cc.NewPickerCh

// Test pick with only the second SubConn.
Expand Down Expand Up @@ -579,7 +579,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
t.Fatalf("want %v, got %v", want, err)
}

// Remove subConn corresponding to addr3.
// Shut down subConn corresponding to addr3.
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
Expand All @@ -590,11 +590,11 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc3 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scRemoved)
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc3 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc3, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p = <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
Expand Down Expand Up @@ -823,9 +823,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
// picker which ensures that the removed subBalancer is not picked for RPCs.
p = <-cc.NewPickerCh

scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc2 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scRemoved)
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc2 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scShutdown)
}
want = []balancer.SubConn{sc1, sc3}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
Expand Down Expand Up @@ -865,9 +865,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
// Removing a subBalancer causes the weighted target LB policy to push a new
// picker which ensures that the removed subBalancer is not picked for RPCs.

scRemoved = <-cc.RemoveSubConnCh
if scRemoved != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
scShutdown = <-cc.ShutdownSubConnCh
if scShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down
34 changes: 16 additions & 18 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,23 +311,8 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}

func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
if ccb.isIdleOrClosed() {
// It it safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveAddrConn --> cc.removeAddrConn
return
}

acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
// The graceful switch balancer will never call this.
logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")
}

func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
Expand Down Expand Up @@ -392,7 +377,20 @@ func (acbw *acBalancerWrapper) Connect() {
}

func (acbw *acBalancerWrapper) Shutdown() {
acbw.ccb.RemoveSubConn(acbw)
ccb := acbw.ccb
if ccb.isIdleOrClosed() {
// It it safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveAddrConn --> cc.removeAddrConn
return
}

ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
Expand Down
Loading

0 comments on commit 6c0c69e

Please sign in to comment.