Skip to content

Commit

Permalink
clusterimpl: send single picker update synchronously on receipt of co…
Browse files Browse the repository at this point in the history
…nfig update
  • Loading branch information
aranjans committed Oct 15, 2024
1 parent ad81c20 commit 865c319
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 8 deletions.
86 changes: 86 additions & 0 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,92 @@ func (s) TestFailedToParseChildPolicyConfig(t *testing.T) {
}
}

// Test verify that the case picker is updated synchronously on receipt of
// configuration update.
func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
// Override the newPickerUpdated to be notified that picker was updated.
pickerUpdated := make(chan struct{}, 1)
origNewPickerUpdated := newPickerUpdated
newPickerUpdated = func() {
t.Logf("Sending picker update.")
pickerUpdated <- struct{}{}
}
defer func() { newPickerUpdated = origNewPickerUpdated }()

// Override the newConfigHook to ensure picker was updated.
clientConnUpdateDone := make(chan struct{}, 1)
origClientConnUpdateHook := clientConnUpdateHook
clientConnUpdateHook = func() {
// Verify that picker was updated before the completion of
// client conn update.
<-pickerUpdated
clientConnUpdateDone <- struct{}{}
}
defer func() { clientConnUpdateHook = origClientConnUpdateHook }()

defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()

builder := balancer.Get(Name)
cc := testutils.NewBalancerClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()

// Create a stub balancer which waits for the cluster_impl policy to be
// closed before sending a picker update (upon receipt of a subConn state
// change).
const childPolicyName = "stubBalancer-PickerUpdatedSynchronouslyOnConfigUpdate"
stub.Register(childPolicyName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
bd.ClientConn.UpdateState(balancer.State{
Picker: base.NewErrPicker(errors.New("dummy error picker")),
})
return nil
},
})

const (
dropReason = "test-dropping-category"

Check failure on line 991 in xds/internal/balancer/clusterimpl/balancer_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

const dropReason is unused (U1000)

Check failure on line 991 in xds/internal/balancer/clusterimpl/balancer_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

const dropReason is unused (U1000)
dropNumerator = 1

Check failure on line 992 in xds/internal/balancer/clusterimpl/balancer_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

const dropNumerator is unused (U1000)

Check failure on line 992 in xds/internal/balancer/clusterimpl/balancer_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

const dropNumerator is unused (U1000)
dropDenominator = 2

Check failure on line 993 in xds/internal/balancer/clusterimpl/balancer_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

const dropDenominator is unused (U1000)

Check failure on line 993 in xds/internal/balancer/clusterimpl/balancer_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

const dropDenominator is unused (U1000)
)
_, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
URI: "trafficdirector.googleapis.com:443",
ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
})
if err != nil {
t.Fatalf("Failed to create LRS server config for testing: %v", err)
}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: childPolicyName,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
//select {
//case <-pickerUpdated:
//case <-clientConnUpdateDone:
// t.Fatal("Client conn update completed before picker update.")
//case <-ctx.Done():
// t.Fatal("Timed out waiting for the picker update on receipt of configuration update.")
//}

select {
case <-clientConnUpdateDone:
case <-ctx.Done():
t.Fatal("Timed out waiting for client conn update to be completed.")
}
}

func assertString(f func() (string, error)) string {
s, err := f()
if err != nil {
Expand Down
38 changes: 30 additions & 8 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const (
var (
	// connectedAddress extracts the connected address from a SubConnState via
	// the internal package hook.
	connectedAddress = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
	errBalancerClosed = fmt.Errorf("%s LB policy is closed", Name)
	// The following functions are no-ops in production code, but can be
	// overridden in tests to give them visibility into exactly when certain
	// events happen.
	clientConnUpdateHook = func() {}
	newPickerUpdated = func() {}
)

func init() {
Expand Down Expand Up @@ -102,12 +106,20 @@ type clusterImplBalancer struct {
lrsServer *bootstrap.ServerConfig
loadWrapper *loadstore.Wrapper

// Set during UpdateClientConnState when pushing updates to child policy.
// Prevents state updates from child policy causing new pickers to be sent
// up the channel. Cleared after child policy have processed the
// updates sent to them, after which a new picker is sent up the channel.
inhibitPickerUpdates bool

clusterNameMu sync.Mutex
clusterName string

serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc

mu sync.Mutex // guards childState and inhibitPickerUpdates

// childState/drops/requestCounter keeps the state used by the most recently
// generated picker.
childState balancer.State
Expand Down Expand Up @@ -199,6 +211,8 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
}

func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState) error {
defer clientConnUpdateHook()

if b.logger.V(2) {
b.logger.Infof("Received configuration: %s", pretty.ToJSON(s.BalancerConfig))
}
Expand Down Expand Up @@ -242,24 +256,32 @@ func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState)
}

b.config = newConfig
// Addresses and sub-balancer config are sent to sub-balancer.
err = b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: parsedCfg,
})

b.mu.Lock()
b.inhibitPickerUpdates = false
childState := b.childState
b.mu.Unlock()
b.telemetryLabels = newConfig.TelemetryLabels
dc := b.handleDropAndRequestCount(newConfig)
if dc != nil && b.childState.Picker != nil {
if dc != nil && childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
ConnectivityState: childState.ConnectivityState,
Picker: b.newPicker(dc),
})
}

// Addresses and sub-balancer config are sent to sub-balancer.
return b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: parsedCfg,
})
newPickerUpdated()
return err
}

func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.mu.Lock()
b.inhibitPickerUpdates = true
b.mu.Unlock()
// Handle the update in a blocking fashion.
errCh := make(chan error, 1)
callback := func(context.Context) {
Expand Down

0 comments on commit 865c319

Please sign in to comment.