From aaee0c9eec2f64f278744935d1563d4e0ef72196 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 15 Nov 2023 21:51:21 -0800 Subject: [PATCH 01/21] client: rework resolver and balancer wrappers to avoid deadlock --- balancer_wrapper.go | 248 ++++++++++++------------------------------ clientconn.go | 70 ++++-------- internal/idle/idle.go | 6 +- resolver_wrapper.go | 180 +++++++++++++----------------- 4 files changed, 168 insertions(+), 336 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index da78051f578a..88d0fdafe140 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -32,15 +32,6 @@ import ( "google.golang.org/grpc/resolver" ) -type ccbMode int - -const ( - ccbModeActive = iota - ccbModeIdle - ccbModeClosed - ccbModeExitingIdle -) - // ccBalancerWrapper sits between the ClientConn and the Balancer. // // ccBalancerWrapper implements methods corresponding to the ones on the @@ -57,84 +48,85 @@ const ( type ccBalancerWrapper struct { // The following fields are initialized when the wrapper is created and are // read-only afterwards, and therefore can be accessed without a mutex. - cc *ClientConn - opts balancer.BuildOptions - - // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a - // mutually exclusive manner as they are scheduled in the serializer. Fields - // accessed *only* in these serializer callbacks, can therefore be accessed - // without a mutex. - balancer *gracefulswitch.Balancer - curBalancerName string + cc *ClientConn + opts balancer.BuildOptions + balancer *gracefulswitch.Balancer - // mu guards access to the below fields. Access to the serializer and its - // cancel function needs to be mutex protected because they are overwritten - // when the wrapper exits idle mode. - mu sync.Mutex - serializer *grpcsync.CallbackSerializer // To serialize all outoing calls. - serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time. - mode ccbMode // Tracks the current mode of the wrapper. + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc + + // The following fields are only accessed within the serializer. + curBalancerName string } // newCCBalancerWrapper creates a new balancer wrapper in idle state. The // underlying balancer is not created until the switchTo() method is invoked. -func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { +func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper { + ctx, cancel := context.WithCancel(cc.ctx) ccb := &ccBalancerWrapper{ - cc: cc, - opts: bopts, - mode: ccbModeIdle, + cc: cc, + opts: balancer.BuildOptions{ + DialCreds: cc.dopts.copts.TransportCredentials, + CredsBundle: cc.dopts.copts.CredsBundle, + Dialer: cc.dopts.copts.Dialer, + Authority: cc.authority, + CustomUserAgent: cc.dopts.copts.UserAgent, + ChannelzParentID: cc.channelzID, + Target: cc.parsedTarget, + }, + serializer: grpcsync.NewCallbackSerializer(ctx), + serializerCancel: cancel, } + ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) return ccb } // updateClientConnState is invoked by grpc to push a ClientConnState update to -// the underlying balancer. +// the underlying balancer. This is always executed from the serializer, so +// it is safe to call into the balancer here. func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { - ccb.mu.Lock() - errCh := make(chan error, 1) - // Here and everywhere else where Schedule() is called, it is done with the - // lock held. But the lock guards only the scheduling part. The actual - // callback is called asynchronously without the lock being held. - ok := ccb.serializer.Schedule(func(_ context.Context) { - errCh <- ccb.balancer.UpdateClientConnState(*ccs) + errCh := make(chan error) + ok := ccb.serializer.Schedule(func(ctx context.Context) { + defer close(errCh) + if ctx.Err() != nil { + return + } + err := ccb.balancer.UpdateClientConnState(*ccs) + if logger.V(2) && err != nil { + logger.Infof("error from balancer.UpdateClientConnState: %v", err) + } + errCh <- err }) if !ok { - // If we are unable to schedule a function with the serializer, it - // indicates that it has been closed. A serializer is only closed when - // the wrapper is closed or is in idle. - ccb.mu.Unlock() - return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer") - } - ccb.mu.Unlock() - - // We get here only if the above call to Schedule succeeds, in which case it - // is guaranteed that the scheduled function will run. Therefore it is safe - // to block on this channel. - err := <-errCh - if logger.V(2) && err != nil { - logger.Infof("error from balancer.UpdateClientConnState: %v", err) + return nil } - return err + return <-errCh } // updateSubConnState is invoked by grpc to push a subConn state update to the // underlying balancer. func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { - ccb.mu.Lock() - ccb.serializer.Schedule(func(_ context.Context) { + ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil { + return + } // Even though it is optional for balancers, gracefulswitch ensures // opts.StateListener is set, so this cannot ever be nil. + // TODO: delete this comment when UpdateSubConnState is removed. sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) }) - ccb.mu.Unlock() } +// resolverError is invoked by grpc to push a resolver error to the underlying +// balancer. This is always executed from the serializer, so it is safe to call +// into the balancer here. func (ccb *ccBalancerWrapper) resolverError(err error) { - ccb.mu.Lock() - ccb.serializer.Schedule(func(_ context.Context) { + ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil { + return + } ccb.balancer.ResolverError(err) }) - ccb.mu.Unlock() } // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the @@ -148,8 +140,10 @@ func (ccb *ccBalancerWrapper) resolverError(err error) { // the ccBalancerWrapper keeps track of the current LB policy name, and skips // the graceful balancer switching process if the name does not change. func (ccb *ccBalancerWrapper) switchTo(name string) { - ccb.mu.Lock() - ccb.serializer.Schedule(func(_ context.Context) { + ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil { + return + } // TODO: Other languages use case-sensitive balancer registries. We should // switch as well. See: https://github.com/grpc/grpc-go/issues/5288. if strings.EqualFold(ccb.curBalancerName, name) { @@ -157,7 +151,6 @@ func (ccb *ccBalancerWrapper) switchTo(name string) { } ccb.buildLoadBalancingPolicy(name) }) - ccb.mu.Unlock() } // buildLoadBalancingPolicy performs the following: @@ -186,109 +179,27 @@ func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { func (ccb *ccBalancerWrapper) close() { channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") - ccb.closeBalancer(ccbModeClosed) -} - -// enterIdleMode is invoked by grpc when the channel enters idle mode upon -// expiry of idle_timeout. This call blocks until the balancer is closed. -func (ccb *ccBalancerWrapper) enterIdleMode() { - channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode") - ccb.closeBalancer(ccbModeIdle) -} - -// closeBalancer is invoked when the channel is being closed or when it enters -// idle mode upon expiry of idle_timeout. -func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) { - ccb.mu.Lock() - if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle { - ccb.mu.Unlock() - return - } - - ccb.mode = m - done := ccb.serializer.Done() - b := ccb.balancer - ok := ccb.serializer.Schedule(func(_ context.Context) { - // Close the serializer to ensure that no more calls from gRPC are sent - // to the balancer. - ccb.serializerCancel() - // Empty the current balancer name because we don't have a balancer - // anymore and also so that we act on the next call to switchTo by - // creating a new balancer specified by the new resolver. - ccb.curBalancerName = "" - }) - if !ok { - ccb.mu.Unlock() - return - } - ccb.mu.Unlock() - - // Give enqueued callbacks a chance to finish before closing the balancer. - <-done - b.Close() -} - -// exitIdleMode is invoked by grpc when the channel exits idle mode either -// because of an RPC or because of an invocation of the Connect() API. This -// recreates the balancer that was closed previously when entering idle mode. -// -// If the channel is not in idle mode, we know for a fact that we are here as a -// result of the user calling the Connect() method on the ClientConn. In this -// case, we can simply forward the call to the underlying balancer, instructing -// it to reconnect to the backends. -func (ccb *ccBalancerWrapper) exitIdleMode() { - ccb.mu.Lock() - if ccb.mode == ccbModeClosed { - // Request to exit idle is a no-op when wrapper is already closed. - ccb.mu.Unlock() - return - } - - if ccb.mode == ccbModeIdle { - // Recreate the serializer which was closed when we entered idle. - ctx, cancel := context.WithCancel(context.Background()) - ccb.serializer = grpcsync.NewCallbackSerializer(ctx) - ccb.serializerCancel = cancel - } - - // The ClientConn guarantees that mutual exclusion between close() and - // exitIdleMode(), and since we just created a new serializer, we can be - // sure that the below function will be scheduled. - done := make(chan struct{}) ccb.serializer.Schedule(func(context.Context) { - defer close(done) - - ccb.mu.Lock() - defer ccb.mu.Unlock() - - if ccb.mode != ccbModeIdle { - ccb.balancer.ExitIdle() + if ccb.balancer == nil { return } - - // Gracefulswitch balancer does not support a switchTo operation after - // being closed. Hence we need to create a new one here. - ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) - ccb.mode = ccbModeActive - channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") - + ccb.balancer.Close() + ccb.balancer = nil }) - ccb.mu.Unlock() - - <-done + ccb.serializerCancel() } -func (ccb *ccBalancerWrapper) isIdleOrClosed() bool { - ccb.mu.Lock() - defer ccb.mu.Unlock() - return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed +// exitIdle invokes the balancer's exitIdle method in the scheduler. +func (ccb *ccBalancerWrapper) exitIdle() { + ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil { + return + } + ccb.balancer.ExitIdle() + }) } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { - if ccb.isIdleOrClosed() { - return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle") - } - if len(addrs) == 0 { return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") } @@ -313,10 +224,6 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { - if ccb.isIdleOrClosed() { - return - } - acbw, ok := sc.(*acBalancerWrapper) if !ok { return @@ -325,10 +232,6 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol } func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { - if ccb.isIdleOrClosed() { - return - } - // Update picker before updating state. Even though the ordering here does // not matter, it can lead to multiple calls of Pick in the common start-up // case where we wait for ready and then perform an RPC. If the picker is @@ -339,10 +242,6 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { } func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { - if ccb.isIdleOrClosed() { - return - } - ccb.cc.resolveNow(o) } @@ -374,20 +273,7 @@ func (acbw *acBalancerWrapper) Connect() { } func (acbw *acBalancerWrapper) Shutdown() { - ccb := acbw.ccb - if ccb.isIdleOrClosed() { - // It it safe to ignore this call when the balancer is closed or in idle - // because the ClientConn takes care of closing the connections. - // - // Not returning early from here when the balancer is closed or in idle - // leads to a deadlock though, because of the following sequence of - // calls when holding cc.mu: - // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> - // ccb.RemoveAddrConn --> cc.removeAddrConn - return - } - - ccb.cc.removeAddrConn(acbw.ac, errConnDrain) + acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain) } // NewStream begins a streaming RPC on the addrConn. If the addrConn is not diff --git a/clientconn.go b/clientconn.go index 75c432928eb2..5f44ab0c48ca 100644 --- a/clientconn.go +++ b/clientconn.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" @@ -131,7 +130,6 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) cc.retryThrottler.Store((*retryThrottler)(nil)) cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) - cc.exitIdleCond = sync.NewCond(&cc.mu) // Apply dial options. disableGlobalOpts := false @@ -188,15 +186,6 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) - cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ - DialCreds: cc.dopts.copts.TransportCredentials, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - Authority: cc.authority, - CustomUserAgent: cc.dopts.copts.UserAgent, - ChannelzParentID: cc.channelzID, - Target: cc.parsedTarget, - }) // Configure idleness support with configured idle timeout or default idle // timeout duration. Idleness can be explicitly disabled by the user, by @@ -355,21 +344,20 @@ func (cc *ClientConn) exitIdleMode() error { // We achieve this synchronization using the below condition variable. cc.mu.Lock() cc.idlenessState = ccIdlenessStateActive - cc.exitIdleCond.Signal() cc.mu.Unlock() }() cc.idlenessState = ccIdlenessStateExitingIdle cc.pickerWrapper.exitIdleMode() - cc.balancerWrapper.exitIdleMode() + cc.balancerWrapper = newCCBalancerWrapper(cc) cc.firstResolveEvent = grpcsync.NewEvent() cc.mu.Unlock() // This needs to be called without cc.mu because this builds a new resolver // which might update state or report error inline which needs to be handled // by cc.updateResolverState() which also grabs cc.mu. - if err := cc.initResolverWrapper(cc.dopts.copts.TransportCredentials); err != nil { + if err := cc.initResolverWrapper(); err != nil { return err } @@ -381,12 +369,13 @@ func (cc *ClientConn) exitIdleMode() error { // name resolver, load balancer, and any subchannels. func (cc *ClientConn) enterIdleMode() error { cc.mu.Lock() - defer cc.mu.Unlock() if cc.conns == nil { + cc.mu.Unlock() return ErrClientConnClosing } if cc.idlenessState != ccIdlenessStateActive { + cc.mu.Unlock() channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState) return nil } @@ -397,15 +386,11 @@ func (cc *ClientConn) enterIdleMode() error { conns := cc.conns cc.conns = make(map[*addrConn]struct{}) - // TODO: Currently, we close the resolver wrapper upon entering idle mode - // and create a new one upon exiting idle mode. This means that the - // `cc.resolverWrapper` field would be overwritten everytime we exit idle - // mode. While this means that we need to hold `cc.mu` when accessing - // `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should - // try to do the same for the balancer and picker wrappers too. + rWrapper := cc.resolverWrapper + bWrapper := cc.balancerWrapper cc.resolverWrapper.close() cc.pickerWrapper.enterIdleMode() - cc.balancerWrapper.enterIdleMode() + cc.balancerWrapper.close() cc.csMgr.updateState(connectivity.Idle) cc.idlenessState = ccIdlenessStateIdle cc.addTraceEvent("entering idle mode") @@ -416,6 +401,9 @@ func (cc *ClientConn) enterIdleMode() error { } }() + cc.mu.Unlock() + <-rWrapper.serializer.Done() + <-bWrapper.serializer.Done() return nil } @@ -649,7 +637,6 @@ type ClientConn struct { conns map[*addrConn]struct{} // Set to nil on close. mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. idlenessState ccIdlenessState // Tracks idleness state of the channel. - exitIdleCond *sync.Cond // Signalled when channel exits idle. lceMu sync.Mutex // protects lastConnectionError lastConnectionError error @@ -728,7 +715,7 @@ func (cc *ClientConn) Connect() { cc.exitIdleMode() // If the ClientConn was not in idle mode, we need to call ExitIdle on the // LB policy so that connections can be created. - cc.balancerWrapper.exitIdleMode() + cc.balancerWrapper.exitIdle() } // waitForResolvedAddrs blocks until the resolver has provided addresses or the @@ -1211,16 +1198,16 @@ func (cc *ClientConn) Close() error { <-cc.csMgr.pubSub.Done() }() + // Prevent calls to enter/exit idle immediately, and ensure we are not + // currently entering/exiting idle mode. + cc.idlenessMgr.Close() + cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() return ErrClientConnClosing } - for cc.idlenessState == ccIdlenessStateExitingIdle { - cc.exitIdleCond.Wait() - } - conns := cc.conns cc.conns = nil cc.csMgr.updateState(connectivity.Shutdown) @@ -1229,15 +1216,16 @@ func (cc *ClientConn) Close() error { // cc.conns==nil, preventing any further operations on cc. cc.mu.Unlock() + if cc.resolverWrapper != nil { + cc.resolverWrapper.close() + <-cc.resolverWrapper.serializer.Done() + } // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. cc.pickerWrapper.close() - cc.balancerWrapper.close() - if rWrapper := cc.resolverWrapper; rWrapper != nil { - rWrapper.close() - } - if idlenessMgr := cc.idlenessMgr; idlenessMgr != nil { - idlenessMgr.Close() + if cc.balancerWrapper != nil { + cc.balancerWrapper.close() + <-cc.balancerWrapper.serializer.Done() } for ac := range conns { @@ -1963,18 +1951,8 @@ func (cc *ClientConn) determineAuthority() error { // initResolverWrapper creates a ccResolverWrapper, which builds the name // resolver. This method grabs the lock to assign the newly built resolver // wrapper to the cc.resolverWrapper field. -func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error { - rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{ - target: cc.parsedTarget, - builder: cc.resolverBuilder, - bOpts: resolver.BuildOptions{ - DisableServiceConfig: cc.dopts.disableServiceConfig, - DialCreds: creds, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - }, - channelzID: cc.channelzID, - }) +func (cc *ClientConn) initResolverWrapper() error { + rw, err := newCCResolverWrapper(cc) if err != nil { return fmt.Errorf("failed to build resolver: %v", err) } diff --git a/internal/idle/idle.go b/internal/idle/idle.go index 6c272476e5ef..d9ce84f8d93b 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -295,7 +295,9 @@ func (m *manager) Close() { atomic.StoreInt32(&m.closed, 1) m.idleMu.Lock() - m.timer.Stop() - m.timer = nil + if m.timer != nil { + m.timer.Stop() + m.timer = nil + } m.idleMu.Unlock() } diff --git a/resolver_wrapper.go b/resolver_wrapper.go index 2a564190e040..7a2de41a4199 100644 --- a/resolver_wrapper.go +++ b/resolver_wrapper.go @@ -23,7 +23,6 @@ import ( "strings" "sync" - "google.golang.org/grpc/balancer" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" @@ -31,119 +30,89 @@ import ( "google.golang.org/grpc/serviceconfig" ) -// resolverStateUpdater wraps the single method used by ccResolverWrapper to -// report a state update from the actual resolver implementation. -type resolverStateUpdater interface { - updateResolverState(s resolver.State, err error) error -} - // ccResolverWrapper is a wrapper on top of cc for resolvers. // It implements resolver.ClientConn interface. type ccResolverWrapper struct { - // The following fields are initialized when the wrapper is created and are - // read-only afterwards, and therefore can be accessed without a mutex. - cc resolverStateUpdater - channelzID *channelz.Identifier + // The following fields are read-only. + cc *ClientConn ignoreServiceConfig bool - opts ccResolverWrapperOpts - serializer *grpcsync.CallbackSerializer // To serialize all incoming calls. - serializerCancel context.CancelFunc // To close the serializer, accessed only from close(). - - // All incoming (resolver --> gRPC) calls are guaranteed to execute in a - // mutually exclusive manner as they are scheduled on the serializer. - // Fields accessed *only* in these serializer callbacks, can therefore be - // accessed without a mutex. - curState resolver.State + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc + + // The following fields are only accessed within the serializer. + resolver resolver.Resolver - // mu guards access to the below fields. + // The following fields are protected by mu. mu sync.Mutex + curState resolver.State closed bool - resolver resolver.Resolver // Accessed only from outgoing calls. -} - -// ccResolverWrapperOpts wraps the arguments to be passed when creating a new -// ccResolverWrapper. -type ccResolverWrapperOpts struct { - target resolver.Target // User specified dial target to resolve. - builder resolver.Builder // Resolver builder to use. - bOpts resolver.BuildOptions // Resolver build options to use. - channelzID *channelz.Identifier // Channelz identifier for the channel. } // newCCResolverWrapper uses the resolver.Builder to build a Resolver and // returns a ccResolverWrapper object which wraps the newly built resolver. -func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) { - ctx, cancel := context.WithCancel(context.Background()) +func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { + ctx, cancel := context.WithCancel(cc.ctx) ccr := &ccResolverWrapper{ cc: cc, - channelzID: opts.channelzID, - ignoreServiceConfig: opts.bOpts.DisableServiceConfig, - opts: opts, + ignoreServiceConfig: cc.dopts.disableServiceConfig, serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, } - // Cannot hold the lock at build time because the resolver can send an - // update or error inline and these incoming calls grab the lock to schedule - // a callback in the serializer. - r, err := opts.builder.Build(opts.target, ccr, opts.bOpts) - if err != nil { - cancel() + errCh := make(chan error) + ccr.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil { + return + } + opts := resolver.BuildOptions{ + DisableServiceConfig: cc.dopts.disableServiceConfig, + DialCreds: cc.dopts.copts.TransportCredentials, + CredsBundle: cc.dopts.copts.CredsBundle, + Dialer: cc.dopts.copts.Dialer, + } + var err error + ccr.resolver, err = cc.resolverBuilder.Build(cc.parsedTarget, ccr, opts) + errCh <- err + }) + + if err := <-errCh; err != nil { return nil, err } - - // Any error reported by the resolver at build time that leads to a - // re-resolution request from the balancer is dropped by grpc until we - // return from this function. So, we don't have to handle pending resolveNow - // requests here. - ccr.mu.Lock() - ccr.resolver = r - ccr.mu.Unlock() - return ccr, nil } func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { - ccr.mu.Lock() - defer ccr.mu.Unlock() - - // ccr.resolver field is set only after the call to Build() returns. But in - // the process of building, the resolver may send an error update which when - // propagated to the balancer may result in a re-resolution request. - if ccr.closed || ccr.resolver == nil { - return - } - ccr.resolver.ResolveNow(o) + ccr.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil || ccr.resolver == nil { + return + } + ccr.resolver.ResolveNow(o) + }) } func (ccr *ccResolverWrapper) close() { - ccr.mu.Lock() - if ccr.closed { - ccr.mu.Unlock() - return - } - - channelz.Info(logger, ccr.channelzID, "Closing the name resolver") + channelz.Info(logger, ccr.cc.channelzID, "Closing the name resolver") - // Close the serializer to ensure that no more calls from the resolver are - // handled, before actually closing the resolver. + ccr.serializer.Schedule(func(context.Context) { + if ccr.resolver == nil { + return + } + ccr.mu.Lock() + ccr.closed = true + ccr.mu.Unlock() + ccr.resolver.Close() + ccr.resolver = nil + }) ccr.serializerCancel() - ccr.closed = true - r := ccr.resolver - ccr.mu.Unlock() - - // Give enqueued callbacks a chance to finish. - <-ccr.serializer.Done() - - // Spawn a goroutine to close the resolver (since it may block trying to - // cleanup all allocated resources) and return early. - go r.Close() } // UpdateState is called by resolver implementations to report new state to gRPC // which includes addresses and service config. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { - errCh := make(chan error, 1) + ccr.mu.Lock() + if ccr.closed { + return nil + } if s.Endpoints == nil { s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses)) for _, a := range s.Addresses { @@ -152,41 +121,38 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { s.Endpoints = append(s.Endpoints, ep) } } - ok := ccr.serializer.Schedule(func(context.Context) { - ccr.addChannelzTraceEvent(s) - ccr.curState = s - if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState { - errCh <- balancer.ErrBadResolverState - return - } - errCh <- nil - }) - if !ok { - // The only time when Schedule() fail to add the callback to the - // serializer is when the serializer is closed, and this happens only - // when the resolver wrapper is closed. - return nil - } - return <-errCh + ccr.addChannelzTraceEvent(s) + ccr.curState = s + ccr.mu.Unlock() + return ccr.cc.updateResolverState(s, nil) } // ReportError is called by resolver implementations to report errors // encountered during name resolution to gRPC. func (ccr *ccResolverWrapper) ReportError(err error) { - ccr.serializer.Schedule(func(_ context.Context) { - channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) - ccr.cc.updateResolverState(resolver.State{}, err) - }) + ccr.mu.Lock() + if ccr.closed { + ccr.mu.Unlock() + return + } + ccr.mu.Unlock() + channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) + ccr.cc.updateResolverState(resolver.State{}, err) } // NewAddress is called by the resolver implementation to send addresses to // gRPC. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { - ccr.serializer.Schedule(func(_ context.Context) { - ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) - ccr.curState.Addresses = addrs - ccr.cc.updateResolverState(ccr.curState, nil) - }) + ccr.mu.Lock() + if ccr.closed { + ccr.mu.Unlock() + return + } + s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig} + ccr.addChannelzTraceEvent(s) + ccr.curState = s + ccr.mu.Unlock() + ccr.cc.updateResolverState(s, nil) } // ParseServiceConfig is called by resolver implementations to parse a JSON @@ -215,5 +181,5 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) { } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 { updates = append(updates, "resolver returned new addresses") } - channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) + channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) } From f4dc8e99d1a0786228d87b31b1e943ab43d1eae2 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 16 Nov 2023 13:25:07 -0800 Subject: [PATCH 02/21] - Idle manager starts in idle state. - All channel idleness management goes through idle manager -- e.g. Connect(). - Add lock and closed state to balancer_wrapper to prevent a race where the balancer creates a subchannel and we enter idle mode during that process. - Split resolver wrapper into constructor and start method so the serializer is available during resolver.Build. - Add EnterIdleModeForTesting to idleness manager and use idlenessMgr methods in the idleness test instead of methods on the ClientConn which should not be called directly. --- balancer_wrapper.go | 48 +++++++- clientconn.go | 200 ++++++++++++--------------------- internal/idle/idle.go | 115 +++++++++---------- internal/idle/idle_e2e_test.go | 8 +- internal/idle/idle_test.go | 71 +++++++----- internal/internal.go | 2 +- picker_wrapper.go | 19 +--- resolver_balancer_ext_test.go | 71 ++++++++++++ resolver_wrapper.go | 44 ++++---- 9 files changed, 304 insertions(+), 274 deletions(-) create mode 100644 resolver_balancer_ext_test.go diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 88d0fdafe140..64ab94012a61 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -48,15 +48,19 @@ import ( type ccBalancerWrapper struct { // The following fields are initialized when the wrapper is created and are // read-only afterwards, and therefore can be accessed without a mutex. - cc *ClientConn - opts balancer.BuildOptions - balancer *gracefulswitch.Balancer - + cc *ClientConn + opts balancer.BuildOptions + balancer *gracefulswitch.Balancer serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc // The following fields are only accessed within the serializer. curBalancerName string + + // The following fields are protected by mu. Caller must take cc.mu before + // taking mu. + mu sync.Mutex + closed bool } // newCCBalancerWrapper creates a new balancer wrapper in idle state. The @@ -200,10 +204,20 @@ func (ccb *ccBalancerWrapper) exitIdle() { } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + ccb.cc.mu.Lock() + defer ccb.cc.mu.Unlock() + + ccb.mu.Lock() + if ccb.closed { + ccb.mu.Unlock() + return nil, errConnIdling + } + ccb.mu.Unlock() + if len(addrs) == 0 { return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") } - ac, err := ccb.cc.newAddrConn(addrs, opts) + ac, err := ccb.cc.newAddrConnLocked(addrs, opts) if err != nil { channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) return nil, err @@ -232,17 +246,39 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol } func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { + ccb.cc.mu.Lock() + defer ccb.cc.mu.Unlock() + + ccb.mu.Lock() + if ccb.closed { + ccb.mu.Unlock() + return + } + ccb.mu.Unlock() // Update picker before updating state. Even though the ordering here does // not matter, it can lead to multiple calls of Pick in the common start-up // case where we wait for ready and then perform an RPC. If the picker is // updated later, we could call the "connecting" picker when the state is // updated, and then call the "ready" picker after the picker gets updated. + + // Note that there is no need to check if the balancer wrapper was closed, + // as we know the graceful switch LB policy will not call cc if it has been + // closed. ccb.cc.pickerWrapper.updatePicker(s.Picker) ccb.cc.csMgr.updateState(s.ConnectivityState) } func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { - ccb.cc.resolveNow(o) + ccb.cc.mu.RLock() + defer ccb.cc.mu.RUnlock() + + ccb.mu.Lock() + if ccb.closed { + ccb.mu.Unlock() + return + } + ccb.mu.Unlock() + ccb.cc.resolveNowLocked(o) } func (ccb *ccBalancerWrapper) Target() string { diff --git a/clientconn.go b/clientconn.go index 5f44ab0c48ca..d85f66c2b5e7 100644 --- a/clientconn.go +++ b/clientconn.go @@ -120,11 +120,11 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires // newClient returns a new client in idle mode. func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ - target: target, - conns: make(map[*addrConn]struct{}), - dopts: defaultDialOptions(), - czData: new(channelzData), - idlenessState: ccIdlenessStateIdle, + target: target, + conns: make(map[*addrConn]struct{}), + dopts: defaultDialOptions(), + czData: new(channelzData), + idle: true, } cc.retryThrottler.Store((*retryThrottler)(nil)) @@ -190,7 +190,7 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) // Configure idleness support with configured idle timeout or default idle // timeout duration. Idleness can be explicitly disabled by the user, by // setting the dial option to 0. - cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger}) + cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout}) return cc, nil } @@ -234,8 +234,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } }() - // This creates the name resolver, load balancer, blocking picker etc. - if err := cc.exitIdleMode(); err != nil { + // This creates the name resolver, load balancer, etc. + if err := cc.idlenessMgr.ExitIdleMode(); err != nil { return nil, err } @@ -310,8 +310,8 @@ func (cc *ClientConn) addTraceEvent(msg string) { type idler ClientConn -func (i *idler) EnterIdleMode() error { - return (*ClientConn)(i).enterIdleMode() +func (i *idler) EnterIdleMode() { + (*ClientConn)(i).enterIdleMode() } func (i *idler) ExitIdleMode() error { @@ -319,45 +319,25 @@ func (i *idler) ExitIdleMode() error { } // exitIdleMode moves the channel out of idle mode by recreating the name -// resolver and load balancer. -func (cc *ClientConn) exitIdleMode() error { +// resolver and load balancer. This should never be called directly; use +// cc.idlenessMgr.ExitIdleMode instead. +func (cc *ClientConn) exitIdleMode() (err error) { cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() return errConnClosing } - if cc.idlenessState != ccIdlenessStateIdle { - channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState) - cc.mu.Unlock() - return nil - } - - defer func() { - // When Close() and exitIdleMode() race against each other, one of the - // following two can happen: - // - Close() wins the race and runs first. exitIdleMode() runs after, and - // sees that the ClientConn is already closed and hence returns early. - // - exitIdleMode() wins the race and runs first and recreates the balancer - // and releases the lock before recreating the resolver. If Close() runs - // in this window, it will wait for exitIdleMode to complete. - // - // We achieve this synchronization using the below condition variable. - cc.mu.Lock() - cc.idlenessState = ccIdlenessStateActive - cc.mu.Unlock() - }() - - cc.idlenessState = ccIdlenessStateExitingIdle - cc.pickerWrapper.exitIdleMode() + cc.resolverWrapper = newCCResolverWrapper(cc) cc.balancerWrapper = newCCBalancerWrapper(cc) cc.firstResolveEvent = grpcsync.NewEvent() cc.mu.Unlock() // This needs to be called without cc.mu because this builds a new resolver - // which might update state or report error inline which needs to be handled - // by cc.updateResolverState() which also grabs cc.mu. - if err := cc.initResolverWrapper(); err != nil { + // which might update state or report error inline needs to acquire cc.mu. + if err := cc.resolverWrapper.start(); err != nil { + cc.balancerWrapper.close() + cc.resolverWrapper.close() return err } @@ -366,18 +346,14 @@ func (cc *ClientConn) exitIdleMode() error { } // enterIdleMode puts the channel in idle mode, and as part of it shuts down the -// name resolver, load balancer, and any subchannels. -func (cc *ClientConn) enterIdleMode() error { +// name resolver, load balancer, and any subchannels. This should never be +// called directly; use cc.idlenessMgr.EnterIdleMode instead. +func (cc *ClientConn) enterIdleMode() { cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() - return ErrClientConnClosing - } - if cc.idlenessState != ccIdlenessStateActive { - cc.mu.Unlock() - channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState) - return nil + return } // cc.conns == nil is a proxy for the ClientConn being closed. So, instead @@ -388,23 +364,23 @@ func (cc *ClientConn) enterIdleMode() error { rWrapper := cc.resolverWrapper bWrapper := cc.balancerWrapper + cc.resolverWrapper.close() cc.pickerWrapper.enterIdleMode() cc.balancerWrapper.close() cc.csMgr.updateState(connectivity.Idle) - cc.idlenessState = ccIdlenessStateIdle cc.addTraceEvent("entering idle mode") - go func() { - for ac := range conns { - ac.tearDown(errConnIdling) - } - }() - cc.mu.Unlock() + + // Block until the name resolver and LB policy are closed. <-rWrapper.serializer.Done() <-bWrapper.serializer.Done() - return nil + + // Close all subchannels after the LB policy is closed. + for ac := range conns { + ac.tearDown(errConnIdling) + } } // validateTransportCredentials performs a series of checks on the configured @@ -615,7 +591,7 @@ type ClientConn struct { channelzID *channelz.Identifier // Channelz identifier for the channel. resolverBuilder resolver.Builder // See parseTargetAndFindResolver(). balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath. - idlenessMgr idle.Manager + idlenessMgr *idle.Manager // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. @@ -636,43 +612,12 @@ type ClientConn struct { sc *ServiceConfig // Latest service config received from the resolver. conns map[*addrConn]struct{} // Set to nil on close. mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. - idlenessState ccIdlenessState // Tracks idleness state of the channel. + idle bool lceMu sync.Mutex // protects lastConnectionError lastConnectionError error } -// ccIdlenessState tracks the idleness state of the channel. -// -// Channels start off in `active` and move to `idle` after a period of -// inactivity. When moving back to `active` upon an incoming RPC, they -// transition through `exiting_idle`. This state is useful for synchronization -// with Close(). -// -// This state tracking is mostly for self-protection. The idlenessManager is -// expected to keep track of the state as well, and is expected not to call into -// the ClientConn unnecessarily. -type ccIdlenessState int8 - -const ( - ccIdlenessStateActive ccIdlenessState = iota - ccIdlenessStateIdle - ccIdlenessStateExitingIdle -) - -func (s ccIdlenessState) String() string { - switch s { - case ccIdlenessStateActive: - return "active" - case ccIdlenessStateIdle: - return "idle" - case ccIdlenessStateExitingIdle: - return "exitingIdle" - default: - return "unknown" - } -} - // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or // ctx expires. A true value is returned in former case and false in latter. // @@ -712,10 +657,15 @@ func (cc *ClientConn) GetState() connectivity.State { // Notice: This API is EXPERIMENTAL and may be changed or removed in a later // release. func (cc *ClientConn) Connect() { - cc.exitIdleMode() + if err := cc.idlenessMgr.ExitIdleMode(); err != nil { + cc.addTraceEvent(fmt.Sprintf("error exiting idle mode: %v", err)) + return + } // If the ClientConn was not in idle mode, we need to call ExitIdle on the // LB policy so that connections can be created. + cc.mu.Lock() cc.balancerWrapper.exitIdle() + cc.mu.Unlock() } // waitForResolvedAddrs blocks until the resolver has provided addresses or the @@ -749,11 +699,11 @@ func init() { internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() { return cc.csMgr.pubSub.Subscribe(s) } - internal.EnterIdleModeForTesting = func(cc *ClientConn) error { - return cc.enterIdleMode() + internal.EnterIdleModeForTesting = func(cc *ClientConn) { + cc.idlenessMgr.EnterIdleModeForTesting() } internal.ExitIdleModeForTesting = func(cc *ClientConn) error { - return cc.exitIdleMode() + return cc.idlenessMgr.ExitIdleMode() } } @@ -769,9 +719,8 @@ func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { } } -func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { +func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error { defer cc.firstResolveEvent.Fire() - cc.mu.Lock() // Check if the ClientConn is already closed. Some fields (e.g. // balancerWrapper) are set to nil when closing the ClientConn, and could // cause nil pointer panic if we don't have this check. @@ -817,7 +766,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { if cc.sc == nil { // Apply the failing LB only if we haven't received valid service config // from the name resolver in the past. - cc.applyFailingLB(s.ServiceConfig) + cc.applyFailingLBLocked(s.ServiceConfig) cc.mu.Unlock() return ret } @@ -839,15 +788,13 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { return ret } -// applyFailingLB is akin to configuring an LB policy on the channel which +// applyFailingLBLocked is akin to configuring an LB policy on the channel which // always fails RPCs. Here, an actual LB policy is not configured, but an always // erroring picker is configured, which returns errors with information about // what was invalid in the received service config. A config selector with no // service config is configured, and the connectivity state of the channel is // set to TransientFailure. -// -// Caller must hold cc.mu. -func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { +func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) { var err error if sc.Err != nil { err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err) @@ -860,7 +807,9 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { } func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { + cc.mu.Lock() cc.balancerWrapper.updateSubConnState(sc, s, err) + cc.mu.Unlock() } // Makes a copy of the input addresses slice and clears out the balancer @@ -877,10 +826,14 @@ func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Ad return out } -// newAddrConn creates an addrConn for addrs and adds it to cc.conns. +// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns. // // Caller needs to make sure len(addrs) > 0. -func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { +func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { + if cc.conns == nil { + return nil, ErrClientConnClosing + } + ac := &addrConn{ state: connectivity.Idle, cc: cc, @@ -892,12 +845,6 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub stateChan: make(chan struct{}), } ac.ctx, ac.cancel = context.WithCancel(cc.ctx) - // Track ac in cc. This needs to be done before any getTransport(...) is called. - cc.mu.Lock() - defer cc.mu.Unlock() - if cc.conns == nil { - return nil, ErrClientConnClosing - } var err error ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "") @@ -913,6 +860,7 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub }, }) + // Track ac in cc. This needs to be done before any getTransport(...) is called. cc.conns[ac] = struct{}{} return ac, nil } @@ -1161,12 +1109,21 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { cc.mu.RLock() - r := cc.resolverWrapper - cc.mu.RUnlock() - if r == nil { - return + if cc.resolverWrapper != nil { + cc.resolverWrapper.resolveNow(o) } - go r.resolveNow(o) + // r := cc.resolverWrapper + cc.mu.RUnlock() + /* + if r == nil { + return + } + go r.resolveNow(o) + */ +} + +func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) { + cc.resolverWrapper.resolveNow(o) } // ResetConnectBackoff wakes up all subchannels in transient failure and causes @@ -1789,7 +1746,7 @@ func (cc *ClientConn) parseTargetAndFindResolver() error { if err != nil { channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err) } else { - channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) + channelz.Infof(logger, cc.channelzID, "parsed dial target is: %#v", parsedTarget) rb = cc.getResolver(parsedTarget.URL.Scheme) if rb != nil { cc.parsedTarget = parsedTarget @@ -1947,22 +1904,3 @@ func (cc *ClientConn) determineAuthority() error { channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority) return nil } - -// initResolverWrapper creates a ccResolverWrapper, which builds the name -// resolver. This method grabs the lock to assign the newly built resolver -// wrapper to the cc.resolverWrapper field. -func (cc *ClientConn) initResolverWrapper() error { - rw, err := newCCResolverWrapper(cc) - if err != nil { - return fmt.Errorf("failed to build resolver: %v", err) - } - // Resolver implementations may report state update or error inline when - // built (or right after), and this is handled in cc.updateResolverState. - // Also, an error from the resolver might lead to a re-resolution request - // from the balancer, which is handled in resolveNow() where - // `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here. - cc.mu.Lock() - cc.resolverWrapper = rw - cc.mu.Unlock() - return nil -} diff --git a/internal/idle/idle.go b/internal/idle/idle.go index d9ce84f8d93b..94633dbe65af 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -26,8 +26,6 @@ import ( "sync" "sync/atomic" "time" - - "google.golang.org/grpc/grpclog" ) // For overriding in unit tests. @@ -39,27 +37,13 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer { // and exit from idle mode. type Enforcer interface { ExitIdleMode() error - EnterIdleMode() error -} - -// Manager defines the functionality required to track RPC activity on a -// channel. -type Manager interface { - OnCallBegin() error - OnCallEnd() - Close() + EnterIdleMode() } -type noopManager struct{} - -func (noopManager) OnCallBegin() error { return nil } -func (noopManager) OnCallEnd() {} -func (noopManager) Close() {} - -// manager implements the Manager interface. It uses atomic operations to +// Manager implements the Manager interface. It uses atomic operations to // synchronize access to shared state and a mutex to guarantee mutual exclusion // in a critical section. -type manager struct { +type Manager struct { // State accessed atomically. lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed. activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there. @@ -69,8 +53,7 @@ type manager struct { // Can be accessed without atomics or mutex since these are set at creation // time and read-only after that. enforcer Enforcer // Functionality provided by grpc.ClientConn. - timeout int64 // Idle timeout duration nanos stored as an int64. - logger grpclog.LoggerV2 + timeout time.Duration // idleMu is used to guarantee mutual exclusion in two scenarios: // - Opposing intentions: @@ -93,52 +76,51 @@ type manager struct { type ManagerOptions struct { Enforcer Enforcer Timeout time.Duration - Logger grpclog.LoggerV2 } // NewManager creates a new idleness manager implementation for the -// given idle timeout. -func NewManager(opts ManagerOptions) Manager { - if opts.Timeout == 0 { - return noopManager{} +// given idle timeout. It begins in idle mode. +func NewManager(opts ManagerOptions) *Manager { + return &Manager{ + enforcer: opts.Enforcer, + timeout: opts.Timeout, + actuallyIdle: true, + activeCallsCount: -math.MaxInt32, } - - m := &manager{ - enforcer: opts.Enforcer, - timeout: int64(opts.Timeout), - logger: opts.Logger, - } - m.timer = timeAfterFunc(opts.Timeout, m.handleIdleTimeout) - return m } // resetIdleTimer resets the idle timer to the given duration. This method // should only be called from the timer callback. -func (m *manager) resetIdleTimer(d time.Duration) { - m.idleMu.Lock() - defer m.idleMu.Unlock() - - if m.timer == nil { - // Only close sets timer to nil. We are done. +func (m *Manager) resetIdleTimerLocked(d time.Duration) { + if m.isClosed() || m.timeout == 0 { return } // It is safe to ignore the return value from Reset() because this method is // only ever called from the timer callback, which means the timer has // already fired. - m.timer.Reset(d) + if m.timer != nil { + m.timer.Stop() + } + m.timer = timeAfterFunc(d, m.handleIdleTimeout) +} + +func (m *Manager) resetIdleTimer(d time.Duration) { + m.idleMu.Lock() + defer m.idleMu.Unlock() + m.resetIdleTimerLocked(d) } // handleIdleTimeout is the timer callback that is invoked upon expiry of the // configured idle timeout. The channel is considered inactive if there are no // ongoing calls and no RPC activity since the last time the timer fired. -func (m *manager) handleIdleTimeout() { +func (m *Manager) handleIdleTimeout() { if m.isClosed() { return } if atomic.LoadInt32(&m.activeCallsCount) > 0 { - m.resetIdleTimer(time.Duration(m.timeout)) + m.resetIdleTimer(m.timeout) return } @@ -148,7 +130,7 @@ func (m *manager) handleIdleTimeout() { // Set the timer to fire after a duration of idle timeout, calculated // from the time the most recent RPC completed. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0) - m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime) + m.timeout - time.Now().UnixNano())) + m.resetIdleTimer(time.Duration(time.Now().UnixNano()-atomic.LoadInt64(&m.lastCallEndTime)) + m.timeout) return } @@ -160,7 +142,7 @@ func (m *manager) handleIdleTimeout() { // This CAS operation can fail if an RPC started after we checked for // activity at the top of this method, or one was ongoing from before // the last time we were here. In both case, reset the timer and return. - m.resetIdleTimer(time.Duration(m.timeout)) + m.resetIdleTimer(m.timeout) return } @@ -175,7 +157,7 @@ func (m *manager) handleIdleTimeout() { // active, or because of an error from the channel. Undo the attempt to // enter idle, and reset the timer to try again later. atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) - m.resetIdleTimer(time.Duration(m.timeout)) + m.resetIdleTimer(m.timeout) } // tryEnterIdleMode instructs the channel to enter idle mode. But before @@ -185,7 +167,7 @@ func (m *manager) handleIdleTimeout() { // Return value indicates whether or not the channel moved to idle mode. // // Holds idleMu which ensures mutual exclusion with exitIdleMode. -func (m *manager) tryEnterIdleMode() bool { +func (m *Manager) tryEnterIdleMode() bool { m.idleMu.Lock() defer m.idleMu.Unlock() @@ -203,18 +185,19 @@ func (m *manager) tryEnterIdleMode() bool { // No new RPCs have come in since we last set the active calls count value // -math.MaxInt32 in the timer callback. And since we have the lock, it is // safe to enter idle mode now. - if err := m.enforcer.EnterIdleMode(); err != nil { - m.logger.Errorf("Failed to enter idle mode: %v", err) - return false - } + m.enforcer.EnterIdleMode() // Successfully entered idle mode. m.actuallyIdle = true return true } +func (m *Manager) EnterIdleModeForTesting() { + m.tryEnterIdleMode() +} + // OnCallBegin is invoked at the start of every RPC. -func (m *manager) OnCallBegin() error { +func (m *Manager) OnCallBegin() error { if m.isClosed() { return nil } @@ -227,7 +210,7 @@ func (m *manager) OnCallBegin() error { // Channel is either in idle mode or is in the process of moving to idle // mode. Attempt to exit idle mode to allow this RPC. - if err := m.exitIdleMode(); err != nil { + if err := m.ExitIdleMode(); err != nil { // Undo the increment to calls count, and return an error causing the // RPC to fail. atomic.AddInt32(&m.activeCallsCount, -1) @@ -238,28 +221,30 @@ func (m *manager) OnCallBegin() error { return nil } -// exitIdleMode instructs the channel to exit idle mode. -// -// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode. -func (m *manager) exitIdleMode() error { +// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's +// internal state. +func (m *Manager) ExitIdleMode() error { + // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode. m.idleMu.Lock() defer m.idleMu.Unlock() - if !m.actuallyIdle { - // This can happen in two scenarios: + if m.isClosed() || !m.actuallyIdle { + // This can happen in three scenarios: // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called // tryEnterIdleMode(). But before the latter could grab the lock, an RPC // came in and OnCallBegin() noticed that the calls count is negative. // - Channel is in idle mode, and multiple new RPCs come in at the same // time, all of them notice a negative calls count in OnCallBegin and get // here. The first one to get the lock would got the channel to exit idle. + // - Channel is in idle mode, and the user calls Connect which calls + // m.ExitIdleMode. // - // Either way, nothing to do here. + // In any case, there is nothing to do here. return nil } if err := m.enforcer.ExitIdleMode(); err != nil { - return fmt.Errorf("channel failed to exit idle mode: %v", err) + return fmt.Errorf("channel failed to exit idle mode: %w", err) } // Undo the idle entry process. This also respects any new RPC attempts. @@ -267,12 +252,12 @@ func (m *manager) exitIdleMode() error { m.actuallyIdle = false // Start a new timer to fire after the configured idle timeout. - m.timer = timeAfterFunc(time.Duration(m.timeout), m.handleIdleTimeout) + m.resetIdleTimerLocked(m.timeout) return nil } // OnCallEnd is invoked at the end of every RPC. -func (m *manager) OnCallEnd() { +func (m *Manager) OnCallEnd() { if m.isClosed() { return } @@ -287,11 +272,11 @@ func (m *manager) OnCallEnd() { atomic.AddInt32(&m.activeCallsCount, -1) } -func (m *manager) isClosed() bool { +func (m *Manager) isClosed() bool { return atomic.LoadInt32(&m.closed) == 1 } -func (m *manager) Close() { +func (m *Manager) Close() { atomic.StoreInt32(&m.closed, 1) m.idleMu.Lock() diff --git a/internal/idle/idle_e2e_test.go b/internal/idle/idle_e2e_test.go index d2cd9d3e3752..da98c09420dd 100644 --- a/internal/idle/idle_e2e_test.go +++ b/internal/idle/idle_e2e_test.go @@ -586,12 +586,8 @@ func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) { } defer cc.Close() - enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn) error) - enterIdleFunc := func() { - if err := enterIdle(cc); err != nil { - t.Errorf("Failed to enter idle mode: %v", err) - } - } + enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) + enterIdleFunc := func() { enterIdle(cc) } exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error) exitIdleFunc := func() { if err := exitIdle(cc); err != nil { diff --git a/internal/idle/idle_test.go b/internal/idle/idle_test.go index 22bde3ba1422..026d2e4856e7 100644 --- a/internal/idle/idle_test.go +++ b/internal/idle/idle_test.go @@ -26,7 +26,6 @@ import ( "testing" "time" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/grpctest" ) @@ -55,10 +54,8 @@ func (ti *testEnforcer) ExitIdleMode() error { } -func (ti *testEnforcer) EnterIdleMode() error { +func (ti *testEnforcer) EnterIdleMode() { ti.enterIdleCh <- struct{}{} - return nil - } func newTestEnforcer() *testEnforcer { @@ -91,7 +88,7 @@ func overrideNewTimer(t *testing.T) <-chan struct{} { // TestManager_Disabled tests the case where the idleness manager is // disabled by passing an idle_timeout of 0. Verifies the following things: // - timer callback does not fire -// - an RPC does not trigger a call to ExitIdleMode on the ClientConn +// - an RPC triggers a call to ExitIdleMode on the ClientConn // - more calls to RPC termination (as compared to RPC initiation) does not // result in an error log func (s) TestManager_Disabled(t *testing.T) { @@ -100,7 +97,7 @@ func (s) TestManager_Disabled(t *testing.T) { // Create an idleness manager that is disabled because of idleTimeout being // set to `0`. enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(0), Logger: grpclog.Component("test")}) + mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(0)}) // Ensure that the timer callback does not fire within a short deadline. select { @@ -109,13 +106,13 @@ func (s) TestManager_Disabled(t *testing.T) { case <-time.After(defaultTestShortTimeout): } - // The first invocation of OnCallBegin() would lead to a call to - // ExitIdleMode() on the enforcer, unless the idleness manager is disabled. - mgr.OnCallBegin() + // The first invocation of OnCallBegin() should lead to a call to + // ExitIdleMode() on the enforcer. + go mgr.OnCallBegin() select { case <-enforcer.exitIdleCh: - t.Fatalf("ExitIdleMode() called on enforcer when manager is disabled") case <-time.After(defaultTestShortTimeout): + t.Fatal("Timeout waiting for channel to move out of idle mode") } // If the number of calls to OnCallEnd() exceeds the number of calls to @@ -137,8 +134,9 @@ func (s) TestManager_Enabled_TimerFires(t *testing.T) { callbackCh := overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")}) + mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout)}) defer mgr.Close() + mgr.ExitIdleMode() // Ensure that the timer callback fires within a appropriate amount of time. select { @@ -162,8 +160,9 @@ func (s) TestManager_Enabled_OngoingCall(t *testing.T) { callbackCh := overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")}) + mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout)}) defer mgr.Close() + mgr.ExitIdleMode() // Fire up a goroutine that simulates an ongoing RPC that is terminated // after the timer callback fires for the first time. @@ -207,8 +206,9 @@ func (s) TestManager_Enabled_ActiveSinceLastCheck(t *testing.T) { callbackCh := overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")}) + mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout)}) defer mgr.Close() + mgr.ExitIdleMode() // Fire up a goroutine that simulates unary RPCs until the timer callback // fires. @@ -233,6 +233,7 @@ func (s) TestManager_Enabled_ActiveSinceLastCheck(t *testing.T) { case <-callbackCh: close(timerFired) case <-time.After(2 * defaultTestIdleTimeout): + close(timerFired) t.Fatal("Timeout waiting for idle timer callback to fire") } select { @@ -257,9 +258,11 @@ func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) { overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")}) + mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout)}) defer mgr.Close() + mgr.ExitIdleMode() + <-enforcer.exitIdleCh // Ensure that the channel moves to idle since there are no RPCs. select { case <-enforcer.enterIdleCh: @@ -297,7 +300,7 @@ func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) { type racyState int32 const ( - stateInital racyState = iota + stateInitial racyState = iota stateEnteredIdle stateExitedIdle stateActiveRPCs @@ -306,12 +309,22 @@ const ( // racyIdlnessEnforcer is a test idleness enforcer used specifically to test the // race between idle timeout and incoming RPCs. type racyEnforcer struct { - state *racyState // Accessed atomically. + t *testing.T + state *racyState // Accessed atomically. + started bool } // ExitIdleMode sets the internal state to stateExitedIdle. We should only ever // exit idle when we are currently in idle. func (ri *racyEnforcer) ExitIdleMode() error { + // Set only on the initial ExitIdleMode + if ri.started == false { + if *ri.state != stateInitial { + return fmt.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode") + } + ri.started = true + return nil + } if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) { return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier") } @@ -319,38 +332,36 @@ func (ri *racyEnforcer) ExitIdleMode() error { } // EnterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start. -func (ri *racyEnforcer) EnterIdleMode() error { - if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInital), int32(stateEnteredIdle)) { - return fmt.Errorf("idleness enforcer asked to enter idle after rpcs started") +func (ri *racyEnforcer) EnterIdleMode() { + if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateEnteredIdle)) { + ri.t.Errorf("idleness enforcer asked to enter idle after rpcs started") } - return nil } -// TestManager_IdleTimeoutRacesWithOnCallBegin tests the case where -// firing of the idle timeout races with an incoming RPC. The test verifies that -// if the timer callback win the race and puts the channel in idle, the RPCs can -// kick it out of idle. And if the RPCs win the race and keep the channel -// active, then the timer callback should not attempt to put the channel in idle -// mode. +// TestManager_IdleTimeoutRacesWithOnCallBegin tests the case where firing of +// the idle timeout races with an incoming RPC. The test verifies that if the +// timer callback wins the race and puts the channel in idle, the RPCs can kick +// it out of idle. And if the RPCs win the race and keep the channel active, +// then the timer callback should not attempt to put the channel in idle mode. func (s) TestManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) { // Run multiple iterations to simulate different possibilities. for i := 0; i < 20; i++ { t.Run(fmt.Sprintf("iteration=%d", i), func(t *testing.T) { var idlenessState racyState - enforcer := &racyEnforcer{state: &idlenessState} + enforcer := &racyEnforcer{t: t, state: &idlenessState} // Configure a large idle timeout so that we can control the // race between the timer callback and RPCs. - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(10 * time.Minute), Logger: grpclog.Component("test")}) + mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(10 * time.Minute)}) defer mgr.Close() + mgr.ExitIdleMode() var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - m := mgr.(interface{ handleIdleTimeout() }) <-time.After(defaultTestIdleTimeout / 10) - m.handleIdleTimeout() + mgr.handleIdleTimeout() }() for j := 0; j < 100; j++ { wg.Add(1) diff --git a/internal/internal.go b/internal/internal.go index 2eef978c8d30..450417ea2af8 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -182,7 +182,7 @@ var ( GRPCResolverSchemeExtraMetadata string = "xds" // EnterIdleModeForTesting gets the ClientConn to enter IDLE mode. - EnterIdleModeForTesting any // func(*grpc.ClientConn) error + EnterIdleModeForTesting any // func(*grpc.ClientConn) // ExitIdleModeForTesting gets the ClientConn to exit IDLE mode. ExitIdleModeForTesting any // func(*grpc.ClientConn) error diff --git a/picker_wrapper.go b/picker_wrapper.go index 236837f4157c..48b2042e8c37 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -37,7 +37,6 @@ import ( type pickerWrapper struct { mu sync.Mutex done bool - idle bool blockingCh chan struct{} picker balancer.Picker statsHandlers []stats.Handler // to record blocking picker calls @@ -53,11 +52,7 @@ func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper { // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. func (pw *pickerWrapper) updatePicker(p balancer.Picker) { pw.mu.Lock() - if pw.done || pw.idle { - // There is a small window where a picker update from the LB policy can - // race with the channel going to idle mode. If the picker is idle here, - // it is because the channel asked it to do so, and therefore it is sage - // to ignore the update from the LB policy. + if pw.done { pw.mu.Unlock() return } @@ -210,23 +205,15 @@ func (pw *pickerWrapper) close() { close(pw.blockingCh) } +// enterIdleMode clears the pickerWrapper and prepares it for being used again +// when idle mode is exited. func (pw *pickerWrapper) enterIdleMode() { - pw.mu.Lock() - defer pw.mu.Unlock() - if pw.done { - return - } - pw.idle = true -} - -func (pw *pickerWrapper) exitIdleMode() { pw.mu.Lock() defer pw.mu.Unlock() if pw.done { return } pw.blockingCh = make(chan struct{}) - pw.idle = false } // dropError is a wrapper error that indicates the LB policy wishes to drop the diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go new file mode 100644 index 000000000000..5e530c968597 --- /dev/null +++ b/resolver_balancer_ext_test.go @@ -0,0 +1,71 @@ +/* + * + * Copyright 2014 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc_test + +import ( + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" +) + +// TestResolverBalancerInteraction tests: +// 1. resolver.Builder.Build() +// 2. resolver.ClientConn.UpdateState() +// 3. balancer.Balancer.UpdateClientConnState() +// 4. balancer.ClientConn.ResolveNow() +// 5. resolver.Resolver.ResolveNow() +func (s) TestResolverBalancerInteraction(t *testing.T) { + const name = "testrbi" + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + bd.ClientConn.ResolveNow(resolver.ResolveNowOptions{}) + return nil + }, + } + stub.Register(name, bf) + + rb := manual.NewBuilderWithScheme(name) + rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { + sc := cc.ParseServiceConfig(`{"loadBalancingConfig": [{"` + name + `":{}}]}`) + cc.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: "test"}}, + ServiceConfig: sc, + }) + } + rnCh := make(chan struct{}) + rb.ResolveNowCallback = func(resolver.ResolveNowOptions) { close(rnCh) } + resolver.Register(rb) + + cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial error: %v", err) + } + defer cc.Close() + select { + case <-rnCh: + case <-time.After(defaultTestTimeout): + t.Fatalf("timed out waiting for resolver.ResolveNow") + } +} diff --git a/resolver_wrapper.go b/resolver_wrapper.go index 7a2de41a4199..4071d1416bff 100644 --- a/resolver_wrapper.go +++ b/resolver_wrapper.go @@ -42,7 +42,8 @@ type ccResolverWrapper struct { // The following fields are only accessed within the serializer. resolver resolver.Resolver - // The following fields are protected by mu. + // The following fields are protected by mu. Caller must take cc.mu before + // taking mu. mu sync.Mutex curState resolver.State closed bool @@ -50,35 +51,33 @@ type ccResolverWrapper struct { // newCCResolverWrapper uses the resolver.Builder to build a Resolver and // returns a ccResolverWrapper object which wraps the newly built resolver. -func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { +func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper { ctx, cancel := context.WithCancel(cc.ctx) - ccr := &ccResolverWrapper{ + return &ccResolverWrapper{ cc: cc, ignoreServiceConfig: cc.dopts.disableServiceConfig, serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, } +} +func (ccr *ccResolverWrapper) start() error { errCh := make(chan error) ccr.serializer.Schedule(func(ctx context.Context) { if ctx.Err() != nil { return } opts := resolver.BuildOptions{ - DisableServiceConfig: cc.dopts.disableServiceConfig, - DialCreds: cc.dopts.copts.TransportCredentials, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, + DisableServiceConfig: ccr.cc.dopts.disableServiceConfig, + DialCreds: ccr.cc.dopts.copts.TransportCredentials, + CredsBundle: ccr.cc.dopts.copts.CredsBundle, + Dialer: ccr.cc.dopts.copts.Dialer, } var err error - ccr.resolver, err = cc.resolverBuilder.Build(cc.parsedTarget, ccr, opts) + ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts) errCh <- err }) - - if err := <-errCh; err != nil { - return nil, err - } - return ccr, nil + return <-errCh } func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { @@ -92,14 +91,14 @@ func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { func (ccr *ccResolverWrapper) close() { channelz.Info(logger, ccr.cc.channelzID, "Closing the name resolver") + ccr.mu.Lock() + ccr.closed = true + ccr.mu.Unlock() ccr.serializer.Schedule(func(context.Context) { if ccr.resolver == nil { return } - ccr.mu.Lock() - ccr.closed = true - ccr.mu.Unlock() ccr.resolver.Close() ccr.resolver = nil }) @@ -109,8 +108,11 @@ func (ccr *ccResolverWrapper) close() { // UpdateState is called by resolver implementations to report new state to gRPC // which includes addresses and service config. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { + ccr.cc.mu.Lock() ccr.mu.Lock() if ccr.closed { + ccr.mu.Unlock() + ccr.cc.mu.Unlock() return nil } if s.Endpoints == nil { @@ -124,35 +126,39 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { ccr.addChannelzTraceEvent(s) ccr.curState = s ccr.mu.Unlock() - return ccr.cc.updateResolverState(s, nil) + return ccr.cc.updateResolverStateAndUnlock(s, nil) } // ReportError is called by resolver implementations to report errors // encountered during name resolution to gRPC. func (ccr *ccResolverWrapper) ReportError(err error) { + ccr.cc.mu.Lock() ccr.mu.Lock() if ccr.closed { ccr.mu.Unlock() + ccr.cc.mu.Unlock() return } ccr.mu.Unlock() channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) - ccr.cc.updateResolverState(resolver.State{}, err) + ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err) } // NewAddress is called by the resolver implementation to send addresses to // gRPC. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { + ccr.cc.mu.Lock() ccr.mu.Lock() if ccr.closed { ccr.mu.Unlock() + ccr.cc.mu.Unlock() return } s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig} ccr.addChannelzTraceEvent(s) ccr.curState = s ccr.mu.Unlock() - ccr.cc.updateResolverState(s, nil) + ccr.cc.updateResolverStateAndUnlock(s, nil) } // ParseServiceConfig is called by resolver implementations to parse a JSON From 2cbebb0836f6fb44707859a9b3ae60d877abab1a Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 16 Nov 2023 14:37:59 -0800 Subject: [PATCH 03/21] remove commented code; ensure balancerWrapper and resolverWrapper always exist --- clientconn.go | 52 ++++++++++++++++++----------------------------- picker_wrapper.go | 6 +++--- 2 files changed, 23 insertions(+), 35 deletions(-) diff --git a/clientconn.go b/clientconn.go index d85f66c2b5e7..c03b638639da 100644 --- a/clientconn.go +++ b/clientconn.go @@ -187,10 +187,8 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) - // Configure idleness support with configured idle timeout or default idle - // timeout duration. Idleness can be explicitly disabled by the user, by - // setting the dial option to 0. cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout}) + cc.initIdleStateLocked() return cc, nil } @@ -327,17 +325,11 @@ func (cc *ClientConn) exitIdleMode() (err error) { cc.mu.Unlock() return errConnClosing } - - cc.resolverWrapper = newCCResolverWrapper(cc) - cc.balancerWrapper = newCCBalancerWrapper(cc) - cc.firstResolveEvent = grpcsync.NewEvent() cc.mu.Unlock() // This needs to be called without cc.mu because this builds a new resolver // which might update state or report error inline needs to acquire cc.mu. if err := cc.resolverWrapper.start(); err != nil { - cc.balancerWrapper.close() - cc.resolverWrapper.close() return err } @@ -345,6 +337,14 @@ func (cc *ClientConn) exitIdleMode() (err error) { return nil } +// initIdleStateLocked initializes common state to how it should be while idle. +func (cc *ClientConn) initIdleStateLocked() { + cc.resolverWrapper = newCCResolverWrapper(cc) + cc.balancerWrapper = newCCBalancerWrapper(cc) + cc.firstResolveEvent = grpcsync.NewEvent() + cc.conns = make(map[*addrConn]struct{}) +} + // enterIdleMode puts the channel in idle mode, and as part of it shuts down the // name resolver, load balancer, and any subchannels. This should never be // called directly; use cc.idlenessMgr.EnterIdleMode instead. @@ -360,17 +360,17 @@ func (cc *ClientConn) enterIdleMode() { // of setting it to nil here, we recreate the map. This also means that we // don't have to do this when exiting idle mode. conns := cc.conns - cc.conns = make(map[*addrConn]struct{}) rWrapper := cc.resolverWrapper + rWrapper.close() + cc.pickerWrapper.reset() bWrapper := cc.balancerWrapper - - cc.resolverWrapper.close() - cc.pickerWrapper.enterIdleMode() - cc.balancerWrapper.close() + bWrapper.close() cc.csMgr.updateState(connectivity.Idle) cc.addTraceEvent("entering idle mode") + cc.initIdleStateLocked() + cc.mu.Unlock() // Block until the name resolver and LB policy are closed. @@ -1109,17 +1109,8 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { cc.mu.RLock() - if cc.resolverWrapper != nil { - cc.resolverWrapper.resolveNow(o) - } - // r := cc.resolverWrapper + cc.resolverWrapper.resolveNow(o) cc.mu.RUnlock() - /* - if r == nil { - return - } - go r.resolveNow(o) - */ } func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) { @@ -1173,17 +1164,14 @@ func (cc *ClientConn) Close() error { // cc.conns==nil, preventing any further operations on cc. cc.mu.Unlock() - if cc.resolverWrapper != nil { - cc.resolverWrapper.close() - <-cc.resolverWrapper.serializer.Done() - } + cc.resolverWrapper.close() // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. cc.pickerWrapper.close() - if cc.balancerWrapper != nil { - cc.balancerWrapper.close() - <-cc.balancerWrapper.serializer.Done() - } + cc.balancerWrapper.close() + + <-cc.resolverWrapper.serializer.Done() + <-cc.balancerWrapper.serializer.Done() for ac := range conns { ac.tearDown(ErrClientConnClosing) diff --git a/picker_wrapper.go b/picker_wrapper.go index 48b2042e8c37..bf56faa76d3d 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -205,9 +205,9 @@ func (pw *pickerWrapper) close() { close(pw.blockingCh) } -// enterIdleMode clears the pickerWrapper and prepares it for being used again -// when idle mode is exited. -func (pw *pickerWrapper) enterIdleMode() { +// reset clears the pickerWrapper and prepares it for being used again when idle +// mode is exited. +func (pw *pickerWrapper) reset() { pw.mu.Lock() defer pw.mu.Unlock() if pw.done { From b81a0a45fbbe8e2ecd2c326a23ecec44f27e5cbb Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 16 Nov 2023 16:42:28 -0800 Subject: [PATCH 04/21] delete idle field fully --- clientconn.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/clientconn.go b/clientconn.go index c03b638639da..42ba25165067 100644 --- a/clientconn.go +++ b/clientconn.go @@ -124,7 +124,6 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), czData: new(channelzData), - idle: true, } cc.retryThrottler.Store((*retryThrottler)(nil)) @@ -612,7 +611,6 @@ type ClientConn struct { sc *ServiceConfig // Latest service config received from the resolver. conns map[*addrConn]struct{} // Set to nil on close. mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. - idle bool lceMu sync.Mutex // protects lastConnectionError lastConnectionError error From d3c7779567ea081d7aa012e883eb2b3393411be6 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 16 Nov 2023 16:47:07 -0800 Subject: [PATCH 05/21] fix idle time calculation --- internal/idle/idle.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/idle/idle.go b/internal/idle/idle.go index 94633dbe65af..fbee7a125177 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -130,7 +130,7 @@ func (m *Manager) handleIdleTimeout() { // Set the timer to fire after a duration of idle timeout, calculated // from the time the most recent RPC completed. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0) - m.resetIdleTimer(time.Duration(time.Now().UnixNano()-atomic.LoadInt64(&m.lastCallEndTime)) + m.timeout) + m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout) return } From 63b2d0dff1dab87ed3662c11c2db4dbfa81636ec Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 28 Nov 2023 10:57:51 -0800 Subject: [PATCH 06/21] add test for resolver build error on second build; fix EnterIdleModeForTesting --- internal/idle/idle.go | 14 ++++++++-- resolver_balancer_ext_test.go | 51 +++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/internal/idle/idle.go b/internal/idle/idle.go index fbee7a125177..236acf62ce6f 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -193,7 +193,18 @@ func (m *Manager) tryEnterIdleMode() bool { } func (m *Manager) EnterIdleModeForTesting() { - m.tryEnterIdleMode() + if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { + // We have an active RPC and cannot enter idle mode. + return + } + if m.tryEnterIdleMode() { + // Successfully entered idle mode. No further action necessary. + return + } + // Failed to enter idle mode due to a concurrent RPC that kept the channel + // active, or because of an error from the channel. Undo the attempt to + // enter idle, and reset the timer to try again later. + atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) } // OnCallBegin is invoked at the start of every RPC. @@ -201,7 +212,6 @@ func (m *Manager) OnCallBegin() error { if m.isClosed() { return nil } - if atomic.AddInt32(&m.activeCallsCount, 1) > 0 { // Channel is not idle now. Set the activity bit and allow the call. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1) diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go index 5e530c968597..5d9e5d524df1 100644 --- a/resolver_balancer_ext_test.go +++ b/resolver_balancer_ext_test.go @@ -19,12 +19,16 @@ package grpc_test import ( + "context" + "errors" + "strings" "testing" "time" "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -69,3 +73,50 @@ func (s) TestResolverBalancerInteraction(t *testing.T) { t.Fatalf("timed out waiting for resolver.ResolveNow") } } + +type resolverBuilderWithErr struct { + resolver.Resolver + errCh <-chan error + scheme string +} + +func (b *resolverBuilderWithErr) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { + if err := <-b.errCh; err != nil { + return nil, err + } + return b, nil +} + +func (b *resolverBuilderWithErr) Scheme() string { + return b.scheme +} + +func (b *resolverBuilderWithErr) Close() {} + +// TestResolverBuildFailure tests: +// 1. resolver.Builder.Build() passes. +// 2. Channel enters idle mode. +// 3. An RPC happens. +// 4. resolver.Builder.Build() fails. +func (s) TestResolverBuildFailure(t *testing.T) { + enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) + const name = "trbf" + resErrCh := make(chan error, 1) + resolver.Register(&resolverBuilderWithErr{errCh: resErrCh, scheme: name}) + + resErrCh <- nil + cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial error: %v", err) + } + defer cc.Close() + enterIdle(cc) + const errStr = "test error from resolver builder" + t.Log("pushing res err") + resErrCh <- errors.New(errStr) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := cc.Invoke(ctx, "/a/b", nil, nil); err == nil || !strings.Contains(err.Error(), errStr) { + t.Fatalf("Invoke = %v; want %v", err, errStr) + } +} From 7ae0904a0f91af8642f5fda6f4f331dfed385750 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 28 Nov 2023 13:26:57 -0800 Subject: [PATCH 07/21] review comments --- balancer_wrapper.go | 23 ++++++++++----------- clientconn.go | 17 +++++++-------- internal/idle/idle.go | 42 +++++++++++++++++++++----------------- internal/idle/idle_test.go | 12 +++++------ resolver_wrapper.go | 6 +++--- 5 files changed, 52 insertions(+), 48 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 64ab94012a61..d6f80b5889e8 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -37,7 +37,8 @@ import ( // ccBalancerWrapper implements methods corresponding to the ones on the // balancer.Balancer interface. The ClientConn is free to call these methods // concurrently and the ccBalancerWrapper ensures that calls from the ClientConn -// to the Balancer happen synchronously and in order. +// to the Balancer happen in order by performing them in the serializer, without +// any mutexes held. // // ccBalancerWrapper also implements the balancer.ClientConn interface and is // passed to the Balancer implementations. It invokes unexported methods on the @@ -54,10 +55,9 @@ type ccBalancerWrapper struct { serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc - // The following fields are only accessed within the serializer. - curBalancerName string + curBalancerName string // only accessed within the serializer - // The following fields are protected by mu. Caller must take cc.mu before + // The following field is protected by mu. Caller must take cc.mu before // taking mu. mu sync.Mutex closed bool @@ -92,7 +92,7 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat errCh := make(chan error) ok := ccb.serializer.Schedule(func(ctx context.Context) { defer close(errCh) - if ctx.Err() != nil { + if ctx.Err() != nil || ccb.balancer == nil { return } err := ccb.balancer.UpdateClientConnState(*ccs) @@ -111,7 +111,7 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat // underlying balancer. func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { ccb.serializer.Schedule(func(ctx context.Context) { - if ctx.Err() != nil { + if ctx.Err() != nil || ccb.balancer == nil { return } // Even though it is optional for balancers, gracefulswitch ensures @@ -122,11 +122,10 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti } // resolverError is invoked by grpc to push a resolver error to the underlying -// balancer. This is always executed from the serializer, so it is safe to call -// into the balancer here. +// balancer. The call to the balancer is executed from the serializer. func (ccb *ccBalancerWrapper) resolverError(err error) { ccb.serializer.Schedule(func(ctx context.Context) { - if ctx.Err() != nil { + if ctx.Err() != nil || ccb.balancer == nil { return } ccb.balancer.ResolverError(err) @@ -145,7 +144,7 @@ func (ccb *ccBalancerWrapper) resolverError(err error) { // the graceful balancer switching process if the name does not change. func (ccb *ccBalancerWrapper) switchTo(name string) { ccb.serializer.Schedule(func(ctx context.Context) { - if ctx.Err() != nil { + if ctx.Err() != nil || ccb.balancer == nil { return } // TODO: Other languages use case-sensitive balancer registries. We should @@ -193,10 +192,10 @@ func (ccb *ccBalancerWrapper) close() { ccb.serializerCancel() } -// exitIdle invokes the balancer's exitIdle method in the scheduler. +// exitIdle invokes the balancer's exitIdle method in the serializer. func (ccb *ccBalancerWrapper) exitIdle() { ccb.serializer.Schedule(func(ctx context.Context) { - if ctx.Err() != nil { + if ctx.Err() != nil || ccb.balancer == nil { return } ccb.balancer.ExitIdle() diff --git a/clientconn.go b/clientconn.go index 42ba25165067..7062b494ff7d 100644 --- a/clientconn.go +++ b/clientconn.go @@ -186,7 +186,7 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) - cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout}) + cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout) cc.initIdleStateLocked() return cc, nil @@ -327,7 +327,8 @@ func (cc *ClientConn) exitIdleMode() (err error) { cc.mu.Unlock() // This needs to be called without cc.mu because this builds a new resolver - // which might update state or report error inline needs to acquire cc.mu. + // which might update state or report error inline, which would then need to + // acquire cc.mu. if err := cc.resolverWrapper.start(); err != nil { return err } @@ -589,7 +590,6 @@ type ClientConn struct { dopts dialOptions // Default and user specified dial options. channelzID *channelz.Identifier // Channelz identifier for the channel. resolverBuilder resolver.Builder // See parseTargetAndFindResolver(). - balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath. idlenessMgr *idle.Manager // The following provide their own synchronization, and therefore don't @@ -600,17 +600,18 @@ type ClientConn struct { czData *channelzData retryThrottler atomic.Value // Updated from service config. - // firstResolveEvent is used to track whether the name resolver sent us at - // least one update. RPCs block on this event. - firstResolveEvent *grpcsync.Event - // mu protects the following fields. // TODO: split mu so the same mutex isn't used for everything. mu sync.RWMutex - resolverWrapper *ccResolverWrapper // Initialized in Dial; cleared in Close. + resolverWrapper *ccResolverWrapper // Always recreated whenever entering idle to simplify Close. + balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close. sc *ServiceConfig // Latest service config received from the resolver. conns map[*addrConn]struct{} // Set to nil on close. mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. + // firstResolveEvent is used to track whether the name resolver sent us at + // least one update. RPCs block on this event. May be accessed without mu + // if we know we cannot enter idle mode while accessing it. + firstResolveEvent *grpcsync.Event lceMu sync.Mutex // protects lastConnectionError lastConnectionError error diff --git a/internal/idle/idle.go b/internal/idle/idle.go index 236acf62ce6f..a479ef3271cc 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -40,9 +40,8 @@ type Enforcer interface { EnterIdleMode() } -// Manager implements the Manager interface. It uses atomic operations to -// synchronize access to shared state and a mutex to guarantee mutual exclusion -// in a critical section. +// Manager implements idleness detection and calls the configured Enforcer to +// enter/exit idle mode when appropriate. Must be created by NewManager. type Manager struct { // State accessed atomically. lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed. @@ -71,34 +70,26 @@ type Manager struct { timer *time.Timer } -// ManagerOptions is a collection of options used by -// NewManager. -type ManagerOptions struct { - Enforcer Enforcer - Timeout time.Duration -} - // NewManager creates a new idleness manager implementation for the // given idle timeout. It begins in idle mode. -func NewManager(opts ManagerOptions) *Manager { +func NewManager(enforcer Enforcer, timeout time.Duration) *Manager { return &Manager{ - enforcer: opts.Enforcer, - timeout: opts.Timeout, + enforcer: enforcer, + timeout: timeout, actuallyIdle: true, activeCallsCount: -math.MaxInt32, } } -// resetIdleTimer resets the idle timer to the given duration. This method -// should only be called from the timer callback. +// resetIdleTimerLocked resets the idle timer to the given duration. Called +// when exiting idle mode or when the timer fires and we need to reset it. func (m *Manager) resetIdleTimerLocked(d time.Duration) { - if m.isClosed() || m.timeout == 0 { + if m.isClosed() || m.timeout == 0 || m.actuallyIdle { return } // It is safe to ignore the return value from Reset() because this method is - // only ever called from the timer callback, which means the timer has - // already fired. + // only ever called from the timer callback or when exiting idle mode. if m.timer != nil { m.timer.Stop() } @@ -193,6 +184,14 @@ func (m *Manager) tryEnterIdleMode() bool { } func (m *Manager) EnterIdleModeForTesting() { + if m.timeout == 0 { + m.idleMu.Lock() + defer m.idleMu.Unlock() + if !m.actuallyIdle { + m.enforcer.EnterIdleMode() + } + return + } if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { // We have an active RPC and cannot enter idle mode. return @@ -212,6 +211,11 @@ func (m *Manager) OnCallBegin() error { if m.isClosed() { return nil } + if m.timeout == 0 { + // When the manager is disabled (timeout==0), we just exit idle mode if + // needed and return. + return m.ExitIdleMode() + } if atomic.AddInt32(&m.activeCallsCount, 1) > 0 { // Channel is not idle now. Set the activity bit and allow the call. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1) @@ -268,7 +272,7 @@ func (m *Manager) ExitIdleMode() error { // OnCallEnd is invoked at the end of every RPC. func (m *Manager) OnCallEnd() { - if m.isClosed() { + if m.timeout == 0 || m.isClosed() { return } diff --git a/internal/idle/idle_test.go b/internal/idle/idle_test.go index 026d2e4856e7..07a27b2615d8 100644 --- a/internal/idle/idle_test.go +++ b/internal/idle/idle_test.go @@ -97,7 +97,7 @@ func (s) TestManager_Disabled(t *testing.T) { // Create an idleness manager that is disabled because of idleTimeout being // set to `0`. enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(0)}) + mgr := NewManager(enforcer, time.Duration(0)) // Ensure that the timer callback does not fire within a short deadline. select { @@ -134,7 +134,7 @@ func (s) TestManager_Enabled_TimerFires(t *testing.T) { callbackCh := overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout)}) + mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout)) defer mgr.Close() mgr.ExitIdleMode() @@ -160,7 +160,7 @@ func (s) TestManager_Enabled_OngoingCall(t *testing.T) { callbackCh := overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout)}) + mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout)) defer mgr.Close() mgr.ExitIdleMode() @@ -206,7 +206,7 @@ func (s) TestManager_Enabled_ActiveSinceLastCheck(t *testing.T) { callbackCh := overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout)}) + mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout)) defer mgr.Close() mgr.ExitIdleMode() @@ -258,7 +258,7 @@ func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) { overrideNewTimer(t) enforcer := newTestEnforcer() - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout)}) + mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout)) defer mgr.Close() mgr.ExitIdleMode() @@ -352,7 +352,7 @@ func (s) TestManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) { // Configure a large idle timeout so that we can control the // race between the timer callback and RPCs. - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(10 * time.Minute)}) + mgr := NewManager(enforcer, time.Duration(10*time.Minute)) defer mgr.Close() mgr.ExitIdleMode() diff --git a/resolver_wrapper.go b/resolver_wrapper.go index 4071d1416bff..b31b9906b822 100644 --- a/resolver_wrapper.go +++ b/resolver_wrapper.go @@ -33,14 +33,14 @@ import ( // ccResolverWrapper is a wrapper on top of cc for resolvers. // It implements resolver.ClientConn interface. type ccResolverWrapper struct { - // The following fields are read-only. + // The following fields are initialized when the wrapper is created and are + // read-only afterwards, and therefore can be accessed without a mutex. cc *ClientConn ignoreServiceConfig bool serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc - // The following fields are only accessed within the serializer. - resolver resolver.Resolver + resolver resolver.Resolver // only accessed within the serializer // The following fields are protected by mu. Caller must take cc.mu before // taking mu. From 932976e37b044277e551ff8341614f1ccb667812 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 29 Nov 2023 16:05:17 -0800 Subject: [PATCH 08/21] review comments --- balancer_wrapper.go | 6 ++++-- clientconn.go | 7 ++++--- internal/idle/idle.go | 2 +- resolver_balancer_ext_test.go | 10 +++++----- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index d6f80b5889e8..8f56b5398750 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -51,11 +51,13 @@ type ccBalancerWrapper struct { // read-only afterwards, and therefore can be accessed without a mutex. cc *ClientConn opts balancer.BuildOptions - balancer *gracefulswitch.Balancer serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc - curBalancerName string // only accessed within the serializer + // The following fields are only accessed within the serializer or during + // initialization. + curBalancerName string + balancer *gracefulswitch.Balancer // The following field is protected by mu. Caller must take cc.mu before // taking mu. diff --git a/clientconn.go b/clientconn.go index 7062b494ff7d..65af17091535 100644 --- a/clientconn.go +++ b/clientconn.go @@ -186,9 +186,8 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) + cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc. cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout) - cc.initIdleStateLocked() - return cc, nil } @@ -610,7 +609,9 @@ type ClientConn struct { mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. // firstResolveEvent is used to track whether the name resolver sent us at // least one update. RPCs block on this event. May be accessed without mu - // if we know we cannot enter idle mode while accessing it. + // if we know we cannot be asked to enter idle mode while accessing it (e.g. + // when the idle manager has already been closed, or if we are already + // entering idle mode). firstResolveEvent *grpcsync.Event lceMu sync.Mutex // protects lastConnectionError diff --git a/internal/idle/idle.go b/internal/idle/idle.go index a479ef3271cc..10d99b1b9581 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -250,7 +250,7 @@ func (m *Manager) ExitIdleMode() error { // - Channel is in idle mode, and multiple new RPCs come in at the same // time, all of them notice a negative calls count in OnCallBegin and get // here. The first one to get the lock would got the channel to exit idle. - // - Channel is in idle mode, and the user calls Connect which calls + // - Channel is not in idle mode, and the user calls Connect which calls // m.ExitIdleMode. // // In any case, there is nothing to do here. diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go index 5d9e5d524df1..a56a79756193 100644 --- a/resolver_balancer_ext_test.go +++ b/resolver_balancer_ext_test.go @@ -35,11 +35,11 @@ import ( ) // TestResolverBalancerInteraction tests: -// 1. resolver.Builder.Build() -// 2. resolver.ClientConn.UpdateState() -// 3. balancer.Balancer.UpdateClientConnState() -// 4. balancer.ClientConn.ResolveNow() -// 5. resolver.Resolver.ResolveNow() +// 1. resolver.Builder.Build() -> +// 2. resolver.ClientConn.UpdateState() -> +// 3. balancer.Balancer.UpdateClientConnState() -> +// 4. balancer.ClientConn.ResolveNow() -> +// 5. resolver.Resolver.ResolveNow() -> func (s) TestResolverBalancerInteraction(t *testing.T) { const name = "testrbi" bf := stub.BalancerFuncs{ From 2b90b69b99e4934fad523a77bbef8fb4f516070e Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 29 Nov 2023 16:13:27 -0800 Subject: [PATCH 09/21] comments --- resolver_wrapper.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/resolver_wrapper.go b/resolver_wrapper.go index b31b9906b822..f5c4609d5e85 100644 --- a/resolver_wrapper.go +++ b/resolver_wrapper.go @@ -49,8 +49,8 @@ type ccResolverWrapper struct { closed bool } -// newCCResolverWrapper uses the resolver.Builder to build a Resolver and -// returns a ccResolverWrapper object which wraps the newly built resolver. +// newCCResolverWrapper initializes the ccResolverWrapper. It can only be used +// after calling start, which builds the resolver. func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper { ctx, cancel := context.WithCancel(cc.ctx) return &ccResolverWrapper{ @@ -61,6 +61,9 @@ func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper { } } +// start builds the name resolver using the resolver.Builder in cc and returns +// any error encountered. It must always be the first operation performed on +// any newly created ccResolverWrapper, except that close may be called instead. func (ccr *ccResolverWrapper) start() error { errCh := make(chan error) ccr.serializer.Schedule(func(ctx context.Context) { From e46448b3153b07ca48efc1120cf405cd720d5093 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 30 Nov 2023 09:37:10 -0800 Subject: [PATCH 10/21] stop optimizing timeout==0 --- internal/idle/idle.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/internal/idle/idle.go b/internal/idle/idle.go index 10d99b1b9581..62413406f6af 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -184,14 +184,6 @@ func (m *Manager) tryEnterIdleMode() bool { } func (m *Manager) EnterIdleModeForTesting() { - if m.timeout == 0 { - m.idleMu.Lock() - defer m.idleMu.Unlock() - if !m.actuallyIdle { - m.enforcer.EnterIdleMode() - } - return - } if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { // We have an active RPC and cannot enter idle mode. return @@ -211,11 +203,7 @@ func (m *Manager) OnCallBegin() error { if m.isClosed() { return nil } - if m.timeout == 0 { - // When the manager is disabled (timeout==0), we just exit idle mode if - // needed and return. - return m.ExitIdleMode() - } + if atomic.AddInt32(&m.activeCallsCount, 1) > 0 { // Channel is not idle now. Set the activity bit and allow the call. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1) @@ -272,7 +260,7 @@ func (m *Manager) ExitIdleMode() error { // OnCallEnd is invoked at the end of every RPC. func (m *Manager) OnCallEnd() { - if m.timeout == 0 || m.isClosed() { + if m.isClosed() { return } From 0c9b3de3ac62deac71d1bfad522b18be39e8e91f Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 30 Nov 2023 10:31:51 -0800 Subject: [PATCH 11/21] share code better --- internal/idle/idle.go | 54 +++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 33 deletions(-) diff --git a/internal/idle/idle.go b/internal/idle/idle.go index 62413406f6af..3c026d60f256 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -125,20 +125,8 @@ func (m *Manager) handleIdleTimeout() { return } - // This CAS operation is extremely likely to succeed given that there has - // been no activity since the last time we were here. Setting the - // activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the - // channel is either in idle mode or is trying to get there. - if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { - // This CAS operation can fail if an RPC started after we checked for - // activity at the top of this method, or one was ongoing from before - // the last time we were here. In both case, reset the timer and return. - m.resetIdleTimer(m.timeout) - return - } - - // Now that we've set the active calls count to -math.MaxInt32, it's time to - // actually move to idle mode. + // Now that we've checked that there has been no activity, attempt to enter + // idle mode, which is very likely to succeed. if m.tryEnterIdleMode() { // Successfully entered idle mode. No timer needed until we exit idle. return @@ -147,7 +135,6 @@ func (m *Manager) handleIdleTimeout() { // Failed to enter idle mode due to a concurrent RPC that kept the channel // active, or because of an error from the channel. Undo the attempt to // enter idle, and reset the timer to try again later. - atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) m.resetIdleTimer(m.timeout) } @@ -159,43 +146,44 @@ func (m *Manager) handleIdleTimeout() { // // Holds idleMu which ensures mutual exclusion with exitIdleMode. func (m *Manager) tryEnterIdleMode() bool { + // Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() + // that the channel is either in idle mode or is trying to get there. + if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { + // This CAS operation can fail if an RPC started after we checked for + // activity in the timer handler, or one was ongoing from before the + // last time the timer fired, or if a test is attempting to enter idle + // mode without checking. In all cases, abort going into idle mode. + return false + } + // N.B. if we fail to enter idle mode after this, we must re-add + // math.MaxInt32 to m.activeCallsCount. + m.idleMu.Lock() defer m.idleMu.Unlock() if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 { // We raced and lost to a new RPC. Very rare, but stop entering idle. + atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) return false } if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 { - // An very short RPC could have come in (and also finished) after we + // A very short RPC could have come in (and also finished) after we // checked for calls count and activity in handleIdleTimeout(), but // before the CAS operation. So, we need to check for activity again. + atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) return false } - // No new RPCs have come in since we last set the active calls count value - // -math.MaxInt32 in the timer callback. And since we have the lock, it is - // safe to enter idle mode now. + // No new RPCs have come in since we set the active calls count value to + // -math.MaxInt32. And since we have the lock, it is safe to enter idle mode + // unconditionally now. m.enforcer.EnterIdleMode() - - // Successfully entered idle mode. m.actuallyIdle = true return true } func (m *Manager) EnterIdleModeForTesting() { - if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { - // We have an active RPC and cannot enter idle mode. - return - } - if m.tryEnterIdleMode() { - // Successfully entered idle mode. No further action necessary. - return - } - // Failed to enter idle mode due to a concurrent RPC that kept the channel - // active, or because of an error from the channel. Undo the attempt to - // enter idle, and reset the timer to try again later. - atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) + m.tryEnterIdleMode() } // OnCallBegin is invoked at the start of every RPC. From 42abdcd2c93f33012a7ef533c880ff4c3b0201ee Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 30 Nov 2023 13:20:48 -0800 Subject: [PATCH 12/21] atomic --- internal/idle/idle_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/idle/idle_test.go b/internal/idle/idle_test.go index 07a27b2615d8..d5d6ae4696f9 100644 --- a/internal/idle/idle_test.go +++ b/internal/idle/idle_test.go @@ -319,7 +319,7 @@ type racyEnforcer struct { func (ri *racyEnforcer) ExitIdleMode() error { // Set only on the initial ExitIdleMode if ri.started == false { - if *ri.state != stateInitial { + if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateExitedIdle)) { return fmt.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode") } ri.started = true From 1b8d4b0f96b8cb2f8be5b243affa62bce801648e Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 30 Nov 2023 19:36:51 -0800 Subject: [PATCH 13/21] deadlock test --- resolver_balancer_ext_test.go | 48 +++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go index a56a79756193..c15a0733f445 100644 --- a/resolver_balancer_ext_test.go +++ b/resolver_balancer_ext_test.go @@ -21,6 +21,8 @@ package grpc_test import ( "context" "errors" + "fmt" + "runtime" "strings" "testing" "time" @@ -120,3 +122,49 @@ func (s) TestResolverBuildFailure(t *testing.T) { t.Fatalf("Invoke = %v; want %v", err, errStr) } } + +// TestEnterIdleDuringResolverUpdateState tests a scenario that used to deadlock +// while calling UpdateState at the same time as the resolver being closed while +// the channel enters idle mode. +func (s) TestEnterIdleDuringResolverUpdateState(t *testing.T) { + enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) + const name = "testeidrus" + + // Create a manual resolver that spams UpdateState calls until it is closed. + rb := manual.NewBuilderWithScheme(name) + var cancel context.CancelFunc + rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { + var ctx context.Context + ctx, cancel = context.WithCancel(context.Background()) + go func() { + for ctx.Err() == nil { + cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) + } + }() + } + rb.CloseCallback = func() { + cancel() + } + resolver.Register(rb) + + cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial error: %v", err) + } + defer cc.Close() + + // Enter/exit idle mode repeatedly. + for i := 0; i < 2000; i++ { + // Start a timer so we panic out of the deadlock and can see all the + // stack traces to debug the problem. + p := time.AfterFunc(time.Second, func() { + buf := make([]byte, 8192) + buf = buf[0:runtime.Stack(buf, true)] + t.Error("Timed out waiting for enterIdle") + panic(fmt.Sprint("Stack trace:\n", string(buf))) + }) + enterIdle(cc) + p.Stop() + cc.Connect() + } +} From 4696fae47d623d61f075542fc8f7a41cfecc3d3d Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 30 Nov 2023 20:12:03 -0800 Subject: [PATCH 14/21] review comments --- balancer_wrapper.go | 6 ++++++ clientconn.go | 8 ++++---- internal/idle/idle.go | 2 +- resolver_wrapper.go | 3 +++ 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 8f56b5398750..3d5c2e55d6f8 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -182,7 +182,13 @@ func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { ccb.curBalancerName = builder.Name() } +// close initiates async shutdown of the wrapper. To determine the wrapper has +// finished shutting down, the channel should block on ccb.serializer.Done() +// without cc.mu held. func (ccb *ccBalancerWrapper) close() { + ccb.mu.Lock() + ccb.closed = true + ccb.mu.Unlock() channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") ccb.serializer.Schedule(func(context.Context) { if ccb.balancer == nil { diff --git a/clientconn.go b/clientconn.go index 65af17091535..6895d91bf05b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -341,6 +341,9 @@ func (cc *ClientConn) initIdleStateLocked() { cc.resolverWrapper = newCCResolverWrapper(cc) cc.balancerWrapper = newCCBalancerWrapper(cc) cc.firstResolveEvent = grpcsync.NewEvent() + // cc.conns == nil is a proxy for the ClientConn being closed. So, instead + // of setting it to nil here, we recreate the map. This also means that we + // don't have to do this when exiting idle mode. cc.conns = make(map[*addrConn]struct{}) } @@ -355,9 +358,6 @@ func (cc *ClientConn) enterIdleMode() { return } - // cc.conns == nil is a proxy for the ClientConn being closed. So, instead - // of setting it to nil here, we recreate the map. This also means that we - // don't have to do this when exiting idle mode. conns := cc.conns rWrapper := cc.resolverWrapper @@ -658,7 +658,7 @@ func (cc *ClientConn) GetState() connectivity.State { // release. func (cc *ClientConn) Connect() { if err := cc.idlenessMgr.ExitIdleMode(); err != nil { - cc.addTraceEvent(fmt.Sprintf("error exiting idle mode: %v", err)) + cc.addTraceEvent(err.Error()) return } // If the ClientConn was not in idle mode, we need to call ExitIdle on the diff --git a/internal/idle/idle.go b/internal/idle/idle.go index 3c026d60f256..fe49cb74c55a 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -234,7 +234,7 @@ func (m *Manager) ExitIdleMode() error { } if err := m.enforcer.ExitIdleMode(); err != nil { - return fmt.Errorf("channel failed to exit idle mode: %w", err) + return fmt.Errorf("failed to exit idle mode: %w", err) } // Undo the idle entry process. This also respects any new RPC attempts. diff --git a/resolver_wrapper.go b/resolver_wrapper.go index f5c4609d5e85..c79bab12149f 100644 --- a/resolver_wrapper.go +++ b/resolver_wrapper.go @@ -92,6 +92,9 @@ func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { }) } +// close initiates async shutdown of the wrapper. To determine the wrapper has +// finished shutting down, the channel should block on ccr.serializer.Done() +// without cc.mu held. func (ccr *ccResolverWrapper) close() { channelz.Info(logger, ccr.cc.channelzID, "Closing the name resolver") ccr.mu.Lock() From 944f2f548172c753a97e57733d6bc87845923afc Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 1 Dec 2023 09:16:49 -0800 Subject: [PATCH 15/21] add tests for balancer operations after closure --- internal/channelz/funcs.go | 7 +++ internal/internal.go | 2 + resolver_balancer_ext_test.go | 94 +++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+) diff --git a/internal/channelz/funcs.go b/internal/channelz/funcs.go index 5395e77529cd..fc094f3441b8 100644 --- a/internal/channelz/funcs.go +++ b/internal/channelz/funcs.go @@ -31,6 +31,7 @@ import ( "time" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal" ) const ( @@ -58,6 +59,12 @@ func TurnOn() { } } +func init() { + internal.ChannelzTurnOffForTesting = func() { + atomic.StoreInt32(&curState, 0) + } +} + // IsOn returns whether channelz data collection is on. func IsOn() bool { return atomic.LoadInt32(&curState) == 1 diff --git a/internal/internal.go b/internal/internal.go index 450417ea2af8..434b14f063f9 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -186,6 +186,8 @@ var ( // ExitIdleModeForTesting gets the ClientConn to exit IDLE mode. ExitIdleModeForTesting any // func(*grpc.ClientConn) error + + ChannelzTurnOffForTesting func() ) // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go index c15a0733f445..fcbd09943227 100644 --- a/resolver_balancer_ext_test.go +++ b/resolver_balancer_ext_test.go @@ -29,9 +29,11 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" ) @@ -168,3 +170,95 @@ func (s) TestEnterIdleDuringResolverUpdateState(t *testing.T) { cc.Connect() } } + +// TestEnterIdleDuringBalancerUpdateState tests calling UpdateState at the same +// time as the balancer being closed while the channel enters idle mode. +func (s) TestEnterIdleDuringBalancerUpdateState(t *testing.T) { + enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) + const name = "testeidbus" + + // Create a balancer that calls UpdateState once asynchronously, attempting + // to make the channel appear ready even after entering idle. + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + go func() { + bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready}) + }() + return nil + }, + } + stub.Register(name, bf) + + rb := manual.NewBuilderWithScheme(name) + rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { + cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) + } + resolver.Register(rb) + + cc, err := grpc.Dial( + name+":///", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`)) + if err != nil { + t.Fatalf("grpc.Dial error: %v", err) + } + defer cc.Close() + + // Enter/exit idle mode repeatedly. + for i := 0; i < 2000; i++ { + enterIdle(cc) + if got, want := cc.GetState(), connectivity.Idle; got != want { + t.Fatalf("cc state = %v; want %v", got, want) + } + cc.Connect() + } +} + +// TestEnterIdleDuringBalancerNewSubConn tests calling NewSubConn at the same +// time as the balancer being closed while the channel enters idle mode. +func (s) TestEnterIdleDuringBalancerNewSubConn(t *testing.T) { + channelz.TurnOn() + defer internal.ChannelzTurnOffForTesting() + enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) + const name = "testeidbnsc" + + // Create a balancer that calls NewSubConn once asynchronously, attempting + // to create a subchannel after going idle. + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + go func() { + bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "test"}}, balancer.NewSubConnOptions{}) + }() + return nil + }, + } + stub.Register(name, bf) + + rb := manual.NewBuilderWithScheme(name) + rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { + cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) + } + resolver.Register(rb) + + cc, err := grpc.Dial( + name+":///", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`)) + if err != nil { + t.Fatalf("grpc.Dial error: %v", err) + } + defer cc.Close() + + // Enter/exit idle mode repeatedly. + for i := 0; i < 2000; i++ { + enterIdle(cc) + tcs, _ := channelz.GetTopChannels(0, 0) + if len(tcs) != 1 { + t.Fatalf("Found channels: %v; expected 1 entry", tcs) + } + if len(tcs[0].SubChans) != 0 { + t.Fatalf("Found subchannels: %v; expected 0 entries", tcs[0].SubChans) + } + cc.Connect() + } +} From 1d0c2b35dfe0e001a42b7aef8cbe9001a37e79bd Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Sat, 2 Dec 2023 10:39:06 -0800 Subject: [PATCH 16/21] stop calling acbw through cc in teardown; call directly into acbw for the ac being torn down --- balancer_wrapper.go | 2 +- clientconn.go | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 3d5c2e55d6f8..948956904515 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -217,7 +217,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer ccb.mu.Lock() if ccb.closed { ccb.mu.Unlock() - return nil, errConnIdling + return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed.") } ccb.mu.Unlock() diff --git a/clientconn.go b/clientconn.go index 6895d91bf05b..6de13a564fc3 100644 --- a/clientconn.go +++ b/clientconn.go @@ -806,12 +806,6 @@ func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) { cc.csMgr.updateState(connectivity.TransientFailure) } -func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { - cc.mu.Lock() - cc.balancerWrapper.updateSubConnState(sc, s, err) - cc.mu.Unlock() -} - // Makes a copy of the input addresses slice and clears out the balancer // attributes field. Addresses are passed during subconn creation and address // update operations. In both cases, we will clear the balancer attributes by @@ -1192,7 +1186,7 @@ type addrConn struct { cc *ClientConn dopts dialOptions - acbw balancer.SubConn + acbw *acBalancerWrapper scopts balancer.NewSubConnOptions // transport is set when there's a viable transport (note: ac state may not be READY as LB channel @@ -1230,7 +1224,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) } else { channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr) } - ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) + ac.acbw.ccb.updateSubConnState(ac.acbw, s, lastErr) } // adjustParams updates parameters used to create transports upon From bda70dba0c32c89481699aaddea518284013aaf7 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Sat, 2 Dec 2023 10:44:12 -0800 Subject: [PATCH 17/21] more direct --- balancer_wrapper.go | 28 ++++++++++++++-------------- clientconn.go | 2 +- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 948956904515..8880c498be9c 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -109,20 +109,6 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat return <-errCh } -// updateSubConnState is invoked by grpc to push a subConn state update to the -// underlying balancer. -func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { - ccb.serializer.Schedule(func(ctx context.Context) { - if ctx.Err() != nil || ccb.balancer == nil { - return - } - // Even though it is optional for balancers, gracefulswitch ensures - // opts.StateListener is set, so this cannot ever be nil. - // TODO: delete this comment when UpdateSubConnState is removed. - sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) - }) -} - // resolverError is invoked by grpc to push a resolver error to the underlying // balancer. The call to the balancer is executed from the serializer. func (ccb *ccBalancerWrapper) resolverError(err error) { @@ -303,6 +289,20 @@ type acBalancerWrapper struct { producers map[balancer.ProducerBuilder]*refCountedProducer } +// updateState is invoked by grpc to push a subConn state update to the +// underlying balancer. +func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) { + acbw.ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil || acbw.ccb.balancer == nil { + return + } + // Even though it is optional for balancers, gracefulswitch ensures + // opts.StateListener is set, so this cannot ever be nil. + // TODO: delete this comment when UpdateSubConnState is removed. + acbw.stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) + }) +} + func (acbw *acBalancerWrapper) String() string { return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int()) } diff --git a/clientconn.go b/clientconn.go index 6de13a564fc3..e6f2625b6844 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1224,7 +1224,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) } else { channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr) } - ac.acbw.ccb.updateSubConnState(ac.acbw, s, lastErr) + ac.acbw.updateState(s, lastErr) } // adjustParams updates parameters used to create transports upon From 16464a2179d32e3df6839498ca0b9c8d53eeb8da Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Sat, 2 Dec 2023 10:49:22 -0800 Subject: [PATCH 18/21] update err and test --- balancer_wrapper.go | 2 +- balancer_wrapper_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 8880c498be9c..301aedd32b47 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -203,7 +203,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer ccb.mu.Lock() if ccb.closed { ccb.mu.Unlock() - return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed.") + return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed") } ccb.mu.Unlock() diff --git a/balancer_wrapper_test.go b/balancer_wrapper_test.go index 4fd09c145a9f..2892136384fd 100644 --- a/balancer_wrapper_test.go +++ b/balancer_wrapper_test.go @@ -55,7 +55,7 @@ func (s) TestBalancer_StateListenerBeforeConnect(t *testing.T) { t.Error("Unexpected call to StateListener with:", scs) }, }) - if err != nil && !strings.Contains(err.Error(), "connection is closing") && !strings.Contains(err.Error(), "is deleted") && !strings.Contains(err.Error(), "is closed or idle") { + if err != nil && !strings.Contains(err.Error(), "connection is closing") && !strings.Contains(err.Error(), "is deleted") && !strings.Contains(err.Error(), "is closed or idle") && !strings.Contains(err.Error(), "balancer is being closed") { t.Error("Unexpected error creating subconn:", err) } wg.Done() From 89188562f66ec41e668a52c7c6419382470d5451 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 5 Dec 2023 09:26:42 -0800 Subject: [PATCH 19/21] name and date --- resolver_balancer_ext_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go index fcbd09943227..aebc6652fa4b 100644 --- a/resolver_balancer_ext_test.go +++ b/resolver_balancer_ext_test.go @@ -1,6 +1,6 @@ /* * - * Copyright 2014 gRPC authors. + * Copyright 2023 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,7 +45,8 @@ import ( // 4. balancer.ClientConn.ResolveNow() -> // 5. resolver.Resolver.ResolveNow() -> func (s) TestResolverBalancerInteraction(t *testing.T) { - const name = "testrbi" + name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) + fmt.Println(name) bf := stub.BalancerFuncs{ UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { bd.ClientConn.ResolveNow(resolver.ResolveNowOptions{}) @@ -104,7 +105,7 @@ func (b *resolverBuilderWithErr) Close() {} // 4. resolver.Builder.Build() fails. func (s) TestResolverBuildFailure(t *testing.T) { enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) - const name = "trbf" + name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) resErrCh := make(chan error, 1) resolver.Register(&resolverBuilderWithErr{errCh: resErrCh, scheme: name}) @@ -130,7 +131,7 @@ func (s) TestResolverBuildFailure(t *testing.T) { // the channel enters idle mode. func (s) TestEnterIdleDuringResolverUpdateState(t *testing.T) { enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) - const name = "testeidrus" + name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) // Create a manual resolver that spams UpdateState calls until it is closed. rb := manual.NewBuilderWithScheme(name) @@ -175,7 +176,7 @@ func (s) TestEnterIdleDuringResolverUpdateState(t *testing.T) { // time as the balancer being closed while the channel enters idle mode. func (s) TestEnterIdleDuringBalancerUpdateState(t *testing.T) { enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) - const name = "testeidbus" + name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) // Create a balancer that calls UpdateState once asynchronously, attempting // to make the channel appear ready even after entering idle. @@ -220,7 +221,7 @@ func (s) TestEnterIdleDuringBalancerNewSubConn(t *testing.T) { channelz.TurnOn() defer internal.ChannelzTurnOffForTesting() enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) - const name = "testeidbnsc" + name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) // Create a balancer that calls NewSubConn once asynchronously, attempting // to create a subchannel after going idle. From e920eaf1f526fbd8d6a65a13ea897c5c4dc96d73 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 5 Dec 2023 09:26:53 -0800 Subject: [PATCH 20/21] comment --- balancer_wrapper.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 301aedd32b47..b5e30cff0215 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -168,9 +168,9 @@ func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { ccb.curBalancerName = builder.Name() } -// close initiates async shutdown of the wrapper. To determine the wrapper has -// finished shutting down, the channel should block on ccb.serializer.Done() -// without cc.mu held. +// close initiates async shutdown of the wrapper. cc.mu must be held when +// calling this function. To determine the wrapper has finished shutting down, +// the channel should block on ccb.serializer.Done() without cc.mu held. func (ccb *ccBalancerWrapper) close() { ccb.mu.Lock() ccb.closed = true From a4ef6d732e26b69e59a9c6bc64e2e30007bf93bb Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 5 Dec 2023 10:12:47 -0800 Subject: [PATCH 21/21] idle_test fix and tweaks --- internal/idle/idle_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/idle/idle_test.go b/internal/idle/idle_test.go index d5d6ae4696f9..d0fc685d3908 100644 --- a/internal/idle/idle_test.go +++ b/internal/idle/idle_test.go @@ -319,7 +319,7 @@ type racyEnforcer struct { func (ri *racyEnforcer) ExitIdleMode() error { // Set only on the initial ExitIdleMode if ri.started == false { - if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateExitedIdle)) { + if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateInitial)) { return fmt.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode") } ri.started = true @@ -360,16 +360,16 @@ func (s) TestManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - <-time.After(defaultTestIdleTimeout / 10) + <-time.After(defaultTestIdleTimeout / 50) mgr.handleIdleTimeout() }() - for j := 0; j < 100; j++ { + for j := 0; j < 5; j++ { wg.Add(1) go func() { defer wg.Done() // Wait for the configured idle timeout and simulate an RPC to // race with the idle timeout timer callback. - <-time.After(defaultTestIdleTimeout / 10) + <-time.After(defaultTestIdleTimeout / 50) if err := mgr.OnCallBegin(); err != nil { t.Errorf("OnCallBegin() failed: %v", err) }