diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go
index c2e1253c8f4d..6e9eaa5cbab9 100644
--- a/xds/internal/balancer/clusterimpl/balancer_test.go
+++ b/xds/internal/balancer/clusterimpl/balancer_test.go
@@ -943,6 +943,72 @@ func (s) TestFailedToParseChildPolicyConfig(t *testing.T) {
 	}
 }
 
+// Tests that the picker is updated synchronously upon receipt of a
+// configuration update.
+func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
+	// Override newPickerUpdated to be notified when the picker is updated.
+	pickerUpdated := make(chan struct{}, 1)
+	origNewPickerUpdated := newPickerUpdated
+	newPickerUpdated = func() {
+		t.Logf("Sending picker update.")
+		pickerUpdated <- struct{}{}
+	}
+	defer func() { newPickerUpdated = origNewPickerUpdated }()
+
+	// Override clientConnUpdateHook to know when the client conn update completes.
+	clientConnUpdateDone := make(chan struct{}, 1)
+	origClientConnUpdateHook := clientConnUpdateHook
+	clientConnUpdateHook = func() {
+		// Verify that the picker was updated before the client conn
+		// update completes.
+		<-pickerUpdated
+		clientConnUpdateDone <- struct{}{}
+	}
+	defer func() { clientConnUpdateHook = origClientConnUpdateHook }()
+
+	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
+	xdsC := fakeclient.NewClient()
+
+	builder := balancer.Get(Name)
+	cc := testutils.NewBalancerClientConn(t)
+	b := builder.Build(cc, balancer.BuildOptions{})
+	defer b.Close()
+
+	// Register a stub balancer as the child policy, which sends a dummy
+	// picker update upon receipt of a configuration update from the
+	// cluster_impl policy.
+	const childPolicyName = "stubBalancer-PickerUpdatedSynchronouslyOnConfigUpdate"
+	stub.Register(childPolicyName, stub.BalancerFuncs{
+		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
+			bd.ClientConn.UpdateState(balancer.State{
+				Picker: base.NewErrPicker(errors.New("dummy error picker")),
+			})
+			return nil
+		},
+	})
+
+	if err := b.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
+		BalancerConfig: &LBConfig{
+			Cluster:        testClusterName,
+			EDSServiceName: testServiceName,
+			ChildPolicy: &internalserviceconfig.BalancerConfig{
+				Name: childPolicyName,
+			},
+		},
+	}); err != nil {
+		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	select {
+	case <-clientConnUpdateDone:
+	case <-ctx.Done():
+		t.Fatal("Timed out waiting for client conn update to be completed.")
+	}
+}
+
 func assertString(f func() (string, error)) string {
 	s, err := f()
 	if err != nil {
diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go
index bbd754c3f4c0..e752f64417ea 100644
--- a/xds/internal/balancer/clusterimpl/clusterimpl.go
+++ b/xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -56,6 +56,10 @@ const (
 var (
 	connectedAddress  = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
 	errBalancerClosed = fmt.Errorf("%s LB policy is closed", Name)
+	// Below functions are no-ops in actual code, but can be overridden in
+	// tests to give tests visibility into exactly when certain events happen.
+	clientConnUpdateHook = func() {}
+	newPickerUpdated     = func() {}
 )
 
 func init() {
@@ -102,12 +106,20 @@ type clusterImplBalancer struct {
 	lrsServer   *bootstrap.ServerConfig
 	loadWrapper *loadstore.Wrapper
 
+	// Set during UpdateClientConnState when pushing updates to the child
+	// policy. Prevents state updates from the child policy from causing new
+	// pickers to be sent up the channel. Cleared after the child policy has
+	// processed the update, after which a new picker is sent up the channel.
+	inhibitPickerUpdates bool
+
 	clusterNameMu sync.Mutex
 	clusterName   string
 
 	serializer       *grpcsync.CallbackSerializer
 	serializerCancel context.CancelFunc
 
+	mu sync.Mutex // guards childState and inhibitPickerUpdates
+
 	// childState/drops/requestCounter keeps the state used by the most recently
 	// generated picker.
 	childState balancer.State
@@ -199,6 +211,8 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
 }
 
 func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState) error {
+	defer clientConnUpdateHook()
+
 	if b.logger.V(2) {
 		b.logger.Infof("Received configuration: %s", pretty.ToJSON(s.BalancerConfig))
 	}
@@ -242,24 +256,32 @@ func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState)
 	}
 	b.config = newConfig
 
+	// Addresses and sub-balancer config are sent to sub-balancer.
+	err = b.child.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState:  s.ResolverState,
+		BalancerConfig: parsedCfg,
+	})
+
+	b.mu.Lock()
+	b.inhibitPickerUpdates = false
+	childState := b.childState
+	b.mu.Unlock()
+
 	b.telemetryLabels = newConfig.TelemetryLabels
 	dc := b.handleDropAndRequestCount(newConfig)
-	if dc != nil && b.childState.Picker != nil {
+	if dc != nil && childState.Picker != nil {
 		b.ClientConn.UpdateState(balancer.State{
-			ConnectivityState: b.childState.ConnectivityState,
+			ConnectivityState: childState.ConnectivityState,
 			Picker:            b.newPicker(dc),
 		})
 	}
-
-	// Addresses and sub-balancer config are sent to sub-balancer.
-	return b.child.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState:  s.ResolverState,
-		BalancerConfig: parsedCfg,
-	})
+	newPickerUpdated()
+	return err
 }
 
 func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	b.mu.Lock()
+	b.inhibitPickerUpdates = true
+	b.mu.Unlock()
+
 	// Handle the update in a blocking fashion.
 	errCh := make(chan error, 1)
 	callback := func(context.Context) {
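
Reviewer note: for readers unfamiliar with the hook-based synchronization the new test relies on, the standalone sketch below distills the handshake between the two hooks. It is a minimal illustration and not code from this change: the package-level hooks mirror clientConnUpdateHook and newPickerUpdated, while processUpdate is a hypothetical stand-in for the balancer's update path.

package main

import (
	"fmt"
	"time"
)

// No-op hooks, mirroring the ones added to clusterimpl.go. Production code
// leaves them untouched; tests override them for visibility into events.
var (
	clientConnUpdateHook = func() {}
	newPickerUpdated     = func() {}
)

// processUpdate is a hypothetical stand-in for updateClientConnState: the
// child policy reacts to the config by producing a picker, and only after
// that does the client conn update complete.
func processUpdate() {
	defer clientConnUpdateHook() // runs once the whole update is done
	newPickerUpdated()           // the new picker has been sent
}

func main() {
	pickerUpdated := make(chan struct{}, 1)
	updateDone := make(chan struct{}, 1)

	// Test-style overrides: completion of the client conn update refuses to
	// proceed until a picker update has already been observed, which is the
	// exact ordering the new test asserts.
	newPickerUpdated = func() { pickerUpdated <- struct{}{} }
	clientConnUpdateHook = func() {
		<-pickerUpdated
		updateDone <- struct{}{}
	}

	go processUpdate()

	select {
	case <-updateDone:
		fmt.Println("picker updated before the config update completed")
	case <-time.After(time.Second):
		fmt.Println("timed out: picker update did not happen first")
	}
}

Because the deferred clientConnUpdateHook blocks until newPickerUpdated has fired, this sketch, like the test, deadlocks and times out if the picker update were ever deferred past the completion of the config update.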