Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: simplify initialization and cleanup a bit #6798

Merged
merged 6 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,14 @@ type ccBalancerWrapper struct {
mode ccbMode // Tracks the current mode of the wrapper.
}

// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
// is not created until the switchTo() method is invoked.
// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(context.Background())
ccb := &ccBalancerWrapper{
cc: cc,
opts: bopts,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
cc: cc,
opts: bopts,
mode: ccbModeIdle,
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
return ccb
}

Expand Down Expand Up @@ -258,7 +255,7 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {
// exitIdleMode(), and since we just created a new serializer, we can be
// sure that the below function will be scheduled.
done := make(chan struct{})
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(context.Context) {
defer close(done)

ccb.mu.Lock()
Expand All @@ -271,7 +268,11 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {

// Gracefulswitch balancer does not support a switchTo operation after
// being closed. Hence we need to create a new one here.
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
opts := ccb.opts
if c := opts.DialCreds; c != nil {
opts.DialCreds = c.Clone()
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, opts)
ccb.mode = ccbModeActive
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")

Expand Down Expand Up @@ -337,7 +338,7 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.
ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.pickerWrapper.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}

Expand Down
104 changes: 45 additions & 59 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.exitIdleCond = sync.NewCond(&cc.mu)

// Apply dial options
dfawley marked this conversation as resolved.
Show resolved Hide resolved
disableGlobalOpts := false
for _, opt := range opts {
if _, ok := opt.(*disableGlobalDialOptions); ok {
Expand All @@ -177,21 +178,9 @@
for _, opt := range opts {
opt.apply(&cc.dopts)
}

chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)

defer func() {
if err != nil {
cc.Close()
}
}()

// Register ClientConn with channelz.
cc.channelzRegistration(target)

cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)

if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
Expand All @@ -211,6 +200,37 @@
cc.dopts.copts.UserAgent = grpcUA
}

// Register ClientConn with channelz.
cc.channelzRegistration(target)

// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
channelz.RemoveEntry(cc.channelzID)
return nil, err
}

Check warning on line 210 in clientconn.go

View check run for this annotation

Codecov / codecov/patch

clientconn.go#L208-L210

Added lines #L208 - L210 were not covered by tests
if err = cc.determineAuthority(); err != nil {
channelz.RemoveEntry(cc.channelzID)
return nil, err
}

cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: cc.dopts.copts.TransportCredentials,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})

defer func() {
if err != nil {
cc.Close()
}
}()

if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
Expand All @@ -235,14 +255,6 @@
cc.dopts.bs = backoff.DefaultExponential
}

// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
return nil, err
}
if err = cc.determineAuthority(); err != nil {
return nil, err
}

if cc.dopts.scChan != nil {
// Blocking wait for the initial service config.
select {
Expand Down Expand Up @@ -359,31 +371,13 @@
}()

cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
}
cc.pickerWrapper.exitIdleMode()

var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
if cc.balancerWrapper == nil {
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})
} else {
cc.balancerWrapper.exitIdleMode()
}
cc.balancerWrapper.exitIdleMode()
cc.firstResolveEvent = grpcsync.NewEvent()
cc.mu.Unlock()

Expand All @@ -394,9 +388,7 @@
return err
}

if exitedIdle {
cc.addTraceEvent("exiting idle mode")
}
cc.addTraceEvent("exiting idle mode")
return nil
}

Expand Down Expand Up @@ -427,7 +419,7 @@
// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
// try to do the same for the balancer and picker wrappers too.
cc.resolverWrapper.close()
cc.blockingpicker.enterIdleMode()
cc.pickerWrapper.enterIdleMode()
cc.balancerWrapper.enterIdleMode()
cc.csMgr.updateState(connectivity.Idle)
cc.idlenessState = ccIdlenessStateIdle
Expand Down Expand Up @@ -655,7 +647,7 @@
// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
csMgr *connectivityStateManager
blockingpicker *pickerWrapper
pickerWrapper *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
czData *channelzData
retryThrottler atomic.Value // Updated from service config.
Expand Down Expand Up @@ -910,7 +902,7 @@
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
}

Expand Down Expand Up @@ -1174,7 +1166,7 @@
}

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
Expand Down Expand Up @@ -1267,24 +1259,18 @@
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)

pWrapper := cc.blockingpicker
rWrapper := cc.resolverWrapper
bWrapper := cc.balancerWrapper
idlenessMgr := cc.idlenessMgr
// We can safely unlock and continue to access all fields now as
// cc.conns==nil, preventing any further operations on cc.
cc.mu.Unlock()

// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
if pWrapper != nil {
pWrapper.close()
}
if bWrapper != nil {
bWrapper.close()
}
if rWrapper != nil {
cc.pickerWrapper.close()
cc.balancerWrapper.close()
if rWrapper := cc.resolverWrapper; rWrapper != nil {
rWrapper.close()
}
if idlenessMgr != nil {
if idlenessMgr := cc.idlenessMgr; idlenessMgr != nil {
idlenessMgr.Close()
}

Expand Down
Loading