xds/clusterimpl: update UpdateClientConnState to handle updates synchronously (#7533)
aranjans committed Aug 30, 2024
1 parent 093e099 commit 00514a7
Showing 1 changed file with 87 additions and 119 deletions.
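In short, this change removes the clusterImplBalancer's run() goroutine and its buffer.Unbounded update channel, routing all updates through a grpcsync.CallbackSerializer instead, so UpdateClientConnState now blocks until the update has actually been handled (or fails with errBalancerClosed after Close()). The sketch below is a minimal, self-contained approximation of that pattern, not the real grpcsync.CallbackSerializer; the toySerializer type, channel size, and function names are illustrative stand-ins.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// toySerializer runs scheduled callbacks one at a time on a single goroutine,
// standing in for the role grpcsync.CallbackSerializer plays in this commit.
type toySerializer struct {
	callbacks chan func(context.Context)
	done      chan struct{}
}

func newToySerializer(ctx context.Context) *toySerializer {
	s := &toySerializer{
		callbacks: make(chan func(context.Context), 16),
		done:      make(chan struct{}),
	}
	go func() {
		defer close(s.done)
		for {
			select {
			case cb := <-s.callbacks:
				cb(ctx)
			case <-ctx.Done():
				return
			}
		}
	}()
	return s
}

// scheduleOr runs cb on the serializer goroutine, or onFailure once the
// serializer has shut down. (Simplified: the real serializer also drains
// callbacks that were already scheduled before shutdown.)
func (s *toySerializer) scheduleOr(cb func(context.Context), onFailure func()) {
	select {
	case s.callbacks <- cb:
	case <-s.done:
		onFailure()
	}
}

// updateClientConnState blocks until the scheduled callback has run, which is
// how the reworked UpdateClientConnState stays synchronous.
func updateClientConnState(s *toySerializer, cfg string) error {
	errCh := make(chan error, 1)
	s.scheduleOr(func(context.Context) {
		// Real code validates the config and forwards it to the child policy.
		fmt.Println("applying config:", cfg)
		errCh <- nil
	}, func() { errCh <- errors.New("balancer is closed") })
	return <-errCh
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s := newToySerializer(ctx)
	fmt.Println("err:", updateClientConnState(s, "round_robin child"))
}
```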
206 changes: 87 additions & 119 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -24,6 +24,7 @@
package clusterimpl

import (
"context"
"encoding/json"
"fmt"
"sync"
@@ -33,7 +34,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
@@ -53,7 +53,10 @@ const (
defaultRequestCountMax = 1024
)

var connectedAddress = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
var (
connectedAddress = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
errBalancerClosed = fmt.Errorf("%s LB policy is closed", Name)
)

func init() {
balancer.Register(bb{})
@@ -62,18 +65,17 @@ func init() {
type bb struct{}

func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
ctx, cancel := context.WithCancel(context.Background())
b := &clusterImplBalancer{
ClientConn: cc,
bOpts: bOpts,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
loadWrapper: loadstore.NewWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
ClientConn: cc,
bOpts: bOpts,
loadWrapper: loadstore.NewWrapper(),
requestCountMax: defaultRequestCountMax,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
b.logger = prefixLogger(b)
b.child = gracefulswitch.NewBalancer(b, bOpts)
go b.run()
b.logger.Infof("Created")
return b
}
@@ -89,18 +91,6 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
type clusterImplBalancer struct {
balancer.ClientConn

// mu guarantees mutual exclusion between Close() and handling of picker
// update to the parent ClientConn in run(). It's to make sure that the
// run() goroutine doesn't send picker update to parent after the balancer
// is closed.
//
// It's only used by the run() goroutine, but not the other exported
// functions. Because the exported functions are guaranteed to be
// synchronized with Close().
mu sync.Mutex
closed *grpcsync.Event
done *grpcsync.Event

bOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
xdsClient xdsclient.XDSClient
@@ -115,10 +105,11 @@ type clusterImplBalancer struct {
clusterNameMu sync.Mutex
clusterName string

serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc

// childState/drops/requestCounter keeps the state used by the most recently
// generated picker. All fields can only be accessed in run(). And run() is
// the only goroutine that sends picker to the parent ClientConn. All
// requests to update picker need to be sent to pickerUpdateCh.
// generated picker.
childState balancer.State
dropCategories []DropConfig // The categories for drops.
drops []*dropper
@@ -127,7 +118,6 @@ type clusterImplBalancer struct {
requestCounter *xdsclient.ClusterRequestsCounter
requestCountMax uint32
telemetryLabels map[string]string
pickerUpdateCh *buffer.Unbounded
}

// updateLoadStore checks the config for load store, and decides whether it
@@ -208,14 +198,9 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
return nil
}

func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if b.closed.HasFired() {
b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
return nil
}

func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState) error {
if b.logger.V(2) {
b.logger.Infof("Received update from resolver, balancer config: %s", pretty.ToJSON(s.BalancerConfig))
b.logger.Infof("Received configuration: %s", pretty.ToJSON(s.BalancerConfig))
}
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
@@ -227,7 +212,7 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
// it.
bb := balancer.Get(newConfig.ChildPolicy.Name)
if bb == nil {
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
return fmt.Errorf("child policy %q not registered", newConfig.ChildPolicy.Name)
}

if b.xdsClient == nil {
@@ -253,9 +238,14 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
}
b.config = newConfig

// Notify run() of this new config, in case drop and request counter need
// update (which means a new picker needs to be generated).
b.pickerUpdateCh.Put(newConfig)
b.telemetryLabels = newConfig.TelemetryLabels
dc := b.handleDropAndRequestCount(newConfig)
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(dc),
})
}

// Addresses and sub-balancer config are sent to sub-balancer.
return b.child.UpdateClientConnState(balancer.ClientConnState{
@@ -264,20 +254,28 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
})
}

func (b *clusterImplBalancer) ResolverError(err error) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
return
func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// Handle the update in a blocking fashion.
errCh := make(chan error, 1)
callback := func(context.Context) {
errCh <- b.updateClientConnState(s)
}
b.child.ResolverError(err)
onFailure := func() {
// An attempt to schedule callback fails only when an update is received
// after Close().
errCh <- errBalancerClosed
}
b.serializer.ScheduleOr(callback, onFailure)
return <-errCh
}

func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
return
}
func (b *clusterImplBalancer) ResolverError(err error) {
b.serializer.TrySchedule(func(context.Context) {
b.child.ResolverError(err)
})
}

func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
// Trigger re-resolution when a SubConn turns transient failure. This is
// necessary for the LogicalDNS in cluster_resolver policy to re-resolve.
//
@@ -299,26 +297,40 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer
}

func (b *clusterImplBalancer) Close() {
b.mu.Lock()
b.closed.Fire()
b.mu.Unlock()

b.child.Close()
b.childState = balancer.State{}
b.pickerUpdateCh.Close()
<-b.done.Done()
b.logger.Infof("Shutdown")
b.serializer.TrySchedule(func(ctx context.Context) {
b.child.Close()
b.childState = balancer.State{}

if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
b.logger.Infof("Shutdown")
})
b.serializerCancel()
<-b.serializer.Done()
}

func (b *clusterImplBalancer) ExitIdle() {
b.child.ExitIdle()
b.serializer.TrySchedule(func(context.Context) {
b.child.ExitIdle()
})
}

// Override methods to accept updates from the child LB.

func (b *clusterImplBalancer) UpdateState(state balancer.State) {
// Instead of updating parent ClientConn inline, send state to run().
b.pickerUpdateCh.Put(state)
b.serializer.TrySchedule(func(context.Context) {
b.childState = state
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(&dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
}),
})
})
}

func (b *clusterImplBalancer) setClusterName(n string) {
@@ -370,21 +382,23 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
scw := &scWrapper{}
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) {
b.updateSubConnState(sc, state, oldListener)
if state.ConnectivityState != connectivity.Ready {
return
}
// Read connected address and call updateLocalityID() based on the connected
// address's locality. https://github.com/grpc/grpc-go/issues/7339
addr := connectedAddress(state)
lID := xdsinternal.GetLocalityID(addr)
if lID.Empty() {
if b.logger.V(2) {
b.logger.Infof("Locality ID for %s unexpectedly empty", addr)
b.serializer.TrySchedule(func(context.Context) {
b.updateSubConnState(sc, state, oldListener)
if state.ConnectivityState != connectivity.Ready {
return
}
return
}
scw.updateLocalityID(lID)
// Read connected address and call updateLocalityID() based on the connected
// address's locality. https://github.com/grpc/grpc-go/issues/7339
addr := connectedAddress(state)
lID := xdsinternal.GetLocalityID(addr)
if lID.Empty() {
if b.logger.V(2) {
b.logger.Infof("Locality ID for %s unexpectedly empty", addr)
}
return
}
scw.updateLocalityID(lID)
})
}
sc, err := b.ClientConn.NewSubConn(newAddrs, opts)
if err != nil {
@@ -464,49 +478,3 @@ func (b *clusterImplBalancer) handleDropAndRequestCount(newConfig *LBConfig) *dr
requestCountMax: b.requestCountMax,
}
}

func (b *clusterImplBalancer) run() {
defer b.done.Fire()
for {
select {
case update, ok := <-b.pickerUpdateCh.Get():
if !ok {
return
}
b.pickerUpdateCh.Load()
b.mu.Lock()
if b.closed.HasFired() {
b.mu.Unlock()
return
}
switch u := update.(type) {
case balancer.State:
b.childState = u
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(&dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
}),
})
case *LBConfig:
b.telemetryLabels = u.TelemetryLabels
dc := b.handleDropAndRequestCount(u)
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(dc),
})
}
}
b.mu.Unlock()
case <-b.closed.Done():
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
return
}
}
}
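One more aspect of the new flow is worth calling out: Close() schedules the child shutdown and load-report cancellation on the serializer, cancels the serializer's context, and then waits on serializer.Done(), so no callback can run against a half-closed balancer. Below is a hedged, self-contained analogy of that ordering using plain channels; closing the work channel stands in for serializerCancel(), and nothing here is grpc-go API.

```go
package main

import "fmt"

func main() {
	work := make(chan func(), 8) // scheduled callbacks
	done := make(chan struct{})  // closed once the worker goroutine exits

	go func() {
		defer close(done)
		for f := range work { // run everything already scheduled, then exit
			f()
		}
	}()

	// Close(): schedule the cleanup work...
	work <- func() { fmt.Println("child.Close(); cancelLoadReport()") }
	// ...stop accepting new work (analogous to serializerCancel())...
	close(work)
	// ...and wait for the worker to finish (analogous to <-serializer.Done()).
	<-done
	fmt.Println("balancer fully shut down")
}
```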
