Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer: add SubConn.Shutdown; deprecate Balancer.RemoveSubConn #6493

Merged
merged 2 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ type SubConn interface {
// creates a new one and returns it. Returns a close function which must
// be called when the Producer is no longer needed.
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
// Shutdown shuts down the SubConn gracefully. Any started RPCs will be
// allowed to complete. No future calls should be made on the SubConn.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No future calls should be made on the SubConn

Do we handle calls on a closed subConn gracefully?
From what I see UpdateAddresses and Connect become no-ops if the state is SHUTDOWN. It would have been nicer if these methods returned an error though. At least Connect() could have returned an error, as the underlying addrConn.connect() already returns an error if the addrConn is shutdown, but we drop that error in our acBalancerWrapper.

Would it be better to say that future calls will be no-ops instead of saying future calls shouldn't be made?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t would have been nicer if these methods returned an error though.

UpdateAddresses is going away, but note that adding error return values to things typically makes the calling code more complex and the API harder to reason about.

Would it be better to say that future calls will be no-ops instead of saying future calls shouldn't be made?

I'm of the opinion that telling people not to do things is better than stating what will happen if they do things that are not sensible. This is more straightforward and also gives us flexibility in how we decide to behave.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that adding error return values to things typically makes the calling code more complex and the API harder to reason about

It feels completely logical for an operation like Connect to return something to its caller. But I do agree that in this case though, the caller could (and does) rely on subsequent state updates to figure out how things went. But for UpdateAddresses, there is no clean way to let the caller know that the update did not happen. Good that it is going away :)

// One final state update will be delivered to the StateListener (or
// UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to
// indicate the shutdown operation. This may be delivered before
// in-progress RPCs are complete and the actual connection is closed.
Shutdown()
}

// NewSubConnOptions contains options to create new SubConn.
Expand Down Expand Up @@ -161,6 +168,8 @@ type ClientConn interface {
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
//
// Deprecated: use SubConn.Shutdown instead.
RemoveSubConn(SubConn)
// UpdateAddresses updates the addresses used in the passed in SubConn.
// gRPC checks if the currently connected address is still in the new list.
Expand Down
2 changes: 2 additions & 0 deletions balancer/base/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

func (sc *testSubConn) Connect() {}

func (sc *testSubConn) Shutdown() {}

func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, nil
}
Expand Down
13 changes: 12 additions & 1 deletion balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,13 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
return connectivity.TransientFailure
}

// UpdateSubConnState is unused; NewSubConn's options always specifies
// updateSubConnState as the listener.
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
logger.Errorf("grpclb: UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
}

func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
s := scs.ConnectivityState
if logger.V(2) {
logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
Expand Down Expand Up @@ -373,8 +379,13 @@ func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop
if forceRegeneratePicker || (lb.state != oldAggrState) {
lb.regeneratePicker(resetDrop)
}
var cc balancer.ClientConn = lb.cc
if lb.usePickFirst {
// Bypass the caching layer that would wrap the picker.
cc = lb.cc.ClientConn
}

lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
}

// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
Expand Down
13 changes: 8 additions & 5 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
if oldUsePickFirst {
// If old SubConn were created for pickfirst, bypass cache and
// remove directly.
lb.cc.cc.RemoveSubConn(sc)
lb.cc.ClientConn.RemoveSubConn(sc)
} else {
lb.cc.RemoveSubConn(sc)
}
Expand All @@ -144,16 +144,17 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}
if sc != nil {
if len(backendAddrs) == 0 {
lb.cc.cc.RemoveSubConn(sc)
lb.cc.ClientConn.RemoveSubConn(sc)
delete(lb.subConns, scKey)
return
}
lb.cc.cc.UpdateAddresses(sc, backendAddrs)
lb.cc.ClientConn.UpdateAddresses(sc, backendAddrs)
sc.Connect()
return
}
opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) }
// This bypasses the cc wrapper with SubConn cache.
sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts)
sc, err := lb.cc.ClientConn.NewSubConn(backendAddrs, opts)
if err != nil {
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
return
Expand All @@ -176,6 +177,8 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback

if _, ok := lb.subConns[addrWithoutAttrs]; !ok {
// Use addrWithMD to create the SubConn.
var sc balancer.SubConn
opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) }
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
Expand Down Expand Up @@ -419,7 +422,7 @@ func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() {
}
}
// Trigger a re-resolve when the stream errors.
ccw.lb.cc.cc.ResolveNow(resolver.ResolveNowOptions{})
ccw.lb.cc.ClientConn.ResolveNow(resolver.ResolveNowOptions{})

ccw.lb.mu.Lock()
ccw.lb.remoteBalancerConnected = false
Expand Down
42 changes: 34 additions & 8 deletions balancer/grpclb/grpclb_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ const subConnCacheTime = time.Second * 10
// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache.
// SubConns will be kept in cache for subConnCacheTime before being removed.
//
// Its new and remove methods are updated to do cache first.
// Its NewSubconn and SubConn.Shutdown methods are updated to do cache first.
type lbCacheClientConn struct {
cc balancer.ClientConn
balancer.ClientConn

timeout time.Duration

mu sync.Mutex
Expand All @@ -113,7 +114,7 @@ type subConnCacheEntry struct {

func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn {
return &lbCacheClientConn{
cc: cc,
ClientConn: cc,
timeout: subConnCacheTime,
subConnCache: make(map[resolver.Address]*subConnCacheEntry),
subConnToAddr: make(map[balancer.SubConn]resolver.Address),
Expand All @@ -137,16 +138,27 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
return entry.sc, nil
}

scNew, err := ccc.cc.NewSubConn(addrs, opts)
scNew, err := ccc.ClientConn.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
scNew = &lbCacheSubConn{SubConn: scNew, ccc: ccc}

ccc.subConnToAddr[scNew] = addrWithoutAttrs
return scNew, nil
}

func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
}

type lbCacheSubConn struct {
balancer.SubConn
ccc *lbCacheClientConn
}

func (sc *lbCacheSubConn) Shutdown() {
ccc := sc.ccc
ccc.mu.Lock()
defer ccc.mu.Unlock()
addr, ok := ccc.subConnToAddr[sc]
Expand All @@ -160,7 +172,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
// same address, and those SubConns are all removed. We remove sc
// immediately here.
delete(ccc.subConnToAddr, sc)
ccc.cc.RemoveSubConn(sc)
sc.SubConn.Shutdown()
}
return
}
Expand All @@ -176,7 +188,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
if entry.abortDeleting {
return
}
ccc.cc.RemoveSubConn(sc)
sc.SubConn.Shutdown()
delete(ccc.subConnToAddr, sc)
delete(ccc.subConnCache, addr)
})
Expand All @@ -195,14 +207,28 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
}

func (ccc *lbCacheClientConn) UpdateState(s balancer.State) {
ccc.cc.UpdateState(s)
s.Picker = &lbCachePicker{Picker: s.Picker}
ccc.ClientConn.UpdateState(s)
}

func (ccc *lbCacheClientConn) close() {
ccc.mu.Lock()
defer ccc.mu.Unlock()
// Only cancel all existing timers. There's no need to remove SubConns.
for _, entry := range ccc.subConnCache {
entry.cancel()
}
ccc.mu.Unlock()
}

type lbCachePicker struct {
balancer.Picker
}

func (cp *lbCachePicker) Pick(i balancer.PickInfo) (balancer.PickResult, error) {
res, err := cp.Picker.Pick(i)
if err != nil {
return res, err
}
res.SubConn = res.SubConn.(*lbCacheSubConn).SubConn
return res, nil
}
13 changes: 9 additions & 4 deletions balancer/grpclb/grpclb_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ import (

type mockSubConn struct {
balancer.SubConn
mcc *mockClientConn
}

func (msc *mockSubConn) Shutdown() {
msc.mcc.mu.Lock()
defer msc.mcc.mu.Unlock()
delete(msc.mcc.subConns, msc)
}

type mockClientConn struct {
Expand All @@ -46,17 +53,15 @@ func newMockClientConn() *mockClientConn {
}

func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &mockSubConn{}
sc := &mockSubConn{mcc: mcc}
mcc.mu.Lock()
defer mcc.mu.Unlock()
mcc.subConns[sc] = addrs[0]
return sc, nil
}

func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
mcc.mu.Lock()
defer mcc.mu.Unlock()
delete(mcc.subConns, sc)
sc.Shutdown()
}

const testCacheTimeout = 100 * time.Millisecond
Expand Down
8 changes: 7 additions & 1 deletion balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err
}
acbw := &acBalancerWrapper{
ccb: ccb,
ac: ac,
producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
stateListener: opts.StateListener,
Expand Down Expand Up @@ -372,7 +373,8 @@ func (ccb *ccBalancerWrapper) Target() string {
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
ac *addrConn // read-only
ac *addrConn // read-only
ccb *ccBalancerWrapper // read-only
stateListener func(balancer.SubConnState)

mu sync.Mutex
Expand All @@ -391,6 +393,10 @@ func (acbw *acBalancerWrapper) Connect() {
go acbw.ac.connect()
}

func (acbw *acBalancerWrapper) Shutdown() {
acbw.ccb.RemoveSubConn(acbw)
}

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, blocks until it is or ctx expires. Returns an error when the context
// expires or the addrConn is shut down.
Expand Down
8 changes: 1 addition & 7 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,7 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
}

func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
bw.gsb.mu.Lock()
if !bw.gsb.balancerCurrentOrPending(bw) {
bw.gsb.mu.Unlock()
return
}
bw.gsb.mu.Unlock()
bw.gsb.cc.RemoveSubConn(sc)
sc.Shutdown()
}

func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
Expand Down
18 changes: 13 additions & 5 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type testingLogger interface {

// TestSubConn implements the SubConn interface, to be used in tests.
type TestSubConn struct {
tcc *TestClientConn // the CC that owns this SubConn
id string
ConnectCh chan struct{}
stateListener func(balancer.SubConnState)
Expand Down Expand Up @@ -66,6 +67,16 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
}
}

// Shutdown pushes the SubConn to the RemoveSubConn channel in the parent
// TestClientConn.
func (tsc *TestSubConn) Shutdown() {
tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc)
easwars marked this conversation as resolved.
Show resolved Hide resolved
select {
case tsc.tcc.RemoveSubConnCh <- tsc:
default:
}
}

// String implements stringer to print human friendly error message.
func (tsc *TestSubConn) String() string {
return tsc.id
Expand Down Expand Up @@ -106,6 +117,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn {
// NewSubConn creates a new SubConn.
func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &TestSubConn{
tcc: tcc,
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
ConnectCh: make(chan struct{}, 1),
stateListener: o.StateListener,
Expand All @@ -127,11 +139,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon

// RemoveSubConn removes the SubConn.
func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
tcc.logger.Logf("testClientConn: RemoveSubConn(%s)", sc)
select {
case tcc.RemoveSubConnCh <- sc.(*TestSubConn):
default:
}
sc.(*TestSubConn).Shutdown()
}

// UpdateAddresses updates the addresses on the SubConn.
Expand Down
56 changes: 56 additions & 0 deletions test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,3 +1061,59 @@ func (s) TestBalancerProducerHonorsContext(t *testing.T) {
t.Fatalf("RPC error: %v; want status.Code(err)=%v", err, codes.Canceled)
}
}

// TestSubConnShutdown confirms that the Shutdown method on subconns properly
// initiates their shutdown.
func (s) TestSubConnShutdown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

gotShutdown := grpcsync.NewEvent()

bf := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
StateListener: func(scs balancer.SubConnState) {
switch scs.ConnectivityState {
case connectivity.Connecting:
// Ignored.
case connectivity.Ready:
sc.Shutdown()
case connectivity.Shutdown:
gotShutdown.Fire()
default:
t.Errorf("got unexpected state %q in listener", scs.ConnectivityState)
}
},
}
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, opts)
if err != nil {
return err
}
sc.Connect()
// Report the state as READY to unblock ss.Start(), which waits for ready.
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready})
return nil
},
}

const testBalName = "shutdown-test-balancer"
stub.Register(testBalName, bf)
t.Logf("Registered balancer %s...", testBalName)

ss := &stubserver.StubServer{}
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(
fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, testBalName),
)); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

select {
case <-gotShutdown.Done():
// Success
case <-ctx.Done():
t.Fatalf("Timed out waiting for gotShutdown to be fired.")
}
}
Loading