Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: simplify initialization and cleanup a bit #6798

Merged
merged 6 commits into from
Nov 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
@@ -76,17 +76,14 @@ type ccBalancerWrapper struct {
mode ccbMode // Tracks the current mode of the wrapper.
}

// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
// is not created until the switchTo() method is invoked.
// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(context.Background())
ccb := &ccBalancerWrapper{
cc: cc,
opts: bopts,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
cc: cc,
opts: bopts,
mode: ccbModeIdle,
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
return ccb
}

@@ -258,7 +255,7 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {
// exitIdleMode(), and since we just created a new serializer, we can be
// sure that the below function will be scheduled.
done := make(chan struct{})
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(context.Context) {
defer close(done)

ccb.mu.Lock()
@@ -271,7 +268,11 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {

// Gracefulswitch balancer does not support a switchTo operation after
// being closed. Hence we need to create a new one here.
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
opts := ccb.opts
if c := opts.DialCreds; c != nil {
opts.DialCreds = c.Clone()
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, opts)
ccb.mode = ccbModeActive
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")

@@ -337,7 +338,7 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.
ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.pickerWrapper.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}

104 changes: 45 additions & 59 deletions clientconn.go
Original file line number Diff line number Diff line change
@@ -160,6 +160,7 @@
cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.exitIdleCond = sync.NewCond(&cc.mu)

// Apply dial options.
disableGlobalOpts := false
for _, opt := range opts {
if _, ok := opt.(*disableGlobalDialOptions); ok {
@@ -177,21 +178,9 @@
for _, opt := range opts {
opt.apply(&cc.dopts)
}

chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)

defer func() {
if err != nil {
cc.Close()
}
}()

// Register ClientConn with channelz.
cc.channelzRegistration(target)

cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)

if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
@@ -211,6 +200,37 @@
cc.dopts.copts.UserAgent = grpcUA
}

// Register ClientConn with channelz.
cc.channelzRegistration(target)

// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
channelz.RemoveEntry(cc.channelzID)
return nil, err
}

Check warning on line 210 in clientconn.go

Codecov / codecov/patch

clientconn.go#L208-L210

Added lines #L208 - L210 were not covered by tests
if err = cc.determineAuthority(); err != nil {
channelz.RemoveEntry(cc.channelzID)
return nil, err
}

cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: cc.dopts.copts.TransportCredentials,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})

defer func() {
if err != nil {
cc.Close()
}
}()

if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
@@ -235,14 +255,6 @@
cc.dopts.bs = backoff.DefaultExponential
}

// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
return nil, err
}
if err = cc.determineAuthority(); err != nil {
return nil, err
}

if cc.dopts.scChan != nil {
// Blocking wait for the initial service config.
select {
@@ -359,31 +371,13 @@
}()

cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
}
cc.pickerWrapper.exitIdleMode()

var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
if cc.balancerWrapper == nil {
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})
} else {
cc.balancerWrapper.exitIdleMode()
}
cc.balancerWrapper.exitIdleMode()
cc.firstResolveEvent = grpcsync.NewEvent()
cc.mu.Unlock()

@@ -394,9 +388,7 @@
return err
}

if exitedIdle {
cc.addTraceEvent("exiting idle mode")
}
cc.addTraceEvent("exiting idle mode")
return nil
}

@@ -427,7 +419,7 @@
// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
// try to do the same for the balancer and picker wrappers too.
cc.resolverWrapper.close()
cc.blockingpicker.enterIdleMode()
cc.pickerWrapper.enterIdleMode()
cc.balancerWrapper.enterIdleMode()
cc.csMgr.updateState(connectivity.Idle)
cc.idlenessState = ccIdlenessStateIdle
@@ -655,7 +647,7 @@
// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
csMgr *connectivityStateManager
blockingpicker *pickerWrapper
pickerWrapper *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
czData *channelzData
retryThrottler atomic.Value // Updated from service config.
@@ -910,7 +902,7 @@
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
}

@@ -1174,7 +1166,7 @@
}

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
@@ -1267,24 +1259,18 @@
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)

pWrapper := cc.blockingpicker
rWrapper := cc.resolverWrapper
bWrapper := cc.balancerWrapper
idlenessMgr := cc.idlenessMgr
// We can safely unlock and continue to access all fields now as
// cc.conns==nil, preventing any further operations on cc.
cc.mu.Unlock()

// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
if pWrapper != nil {
pWrapper.close()
}
if bWrapper != nil {
bWrapper.close()
}
if rWrapper != nil {
cc.pickerWrapper.close()
cc.balancerWrapper.close()
if rWrapper := cc.resolverWrapper; rWrapper != nil {
rWrapper.close()
}
if idlenessMgr != nil {
if idlenessMgr := cc.idlenessMgr; idlenessMgr != nil {
idlenessMgr.Close()
}