From bd4b9802367177b4fc75a1215a33b80f38a87676 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 8 Dec 2023 17:33:42 +0000 Subject: [PATCH 1/3] completely delete WatchListener and WatchRouteConfig APIs --- xds/internal/testutils/fakeclient/client.go | 153 ---------- xds/internal/testutils/resource_watcher.go | 2 +- xds/internal/xdsclient/client.go | 6 - xds/internal/xdsclient/clientimpl_watchers.go | 56 ---- .../xdsclient/tests/authority_test.go | 6 - .../xdsclient/tests/cds_watchers_test.go | 111 +++---- xds/internal/xdsclient/tests/dump_test.go | 10 +- .../xdsclient/tests/eds_watchers_test.go | 6 + .../tests/federation_watchers_test.go | 45 ++- .../xdsclient/tests/lds_watchers_test.go | 279 +++++++++--------- .../xdsclient/tests/misc_watchers_test.go | 87 ++++-- .../xdsclient/tests/rds_watchers_test.go | 217 +++++++------- .../xdsclient/tests/resource_update_test.go | 42 +-- .../xdsclient/xdsresource/type_cds.go | 8 - .../xdsclient/xdsresource/type_lds.go | 8 - .../xdsclient/xdsresource/type_rds.go | 8 - 16 files changed, 411 insertions(+), 633 deletions(-) diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index a898d5f4e859..5cdf638f0489 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -26,7 +26,6 @@ import ( "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/load" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) // Client is a fake implementation of an xds client. It exposes a bunch of @@ -38,151 +37,10 @@ type Client struct { xdsclient.XDSClient name string - ldsWatchCh *testutils.Channel - rdsWatchCh *testutils.Channel - cdsWatchCh *testutils.Channel - edsWatchCh *testutils.Channel - ldsCancelCh *testutils.Channel - rdsCancelCh *testutils.Channel - cdsCancelCh *testutils.Channel - edsCancelCh *testutils.Channel loadReportCh *testutils.Channel lrsCancelCh *testutils.Channel loadStore *load.Store bootstrapCfg *bootstrap.Config - - ldsCb func(xdsresource.ListenerUpdate, error) - rdsCbs map[string]func(xdsresource.RouteConfigUpdate, error) - cdsCbs map[string]func(xdsresource.ClusterUpdate, error) - edsCbs map[string]func(xdsresource.EndpointsUpdate, error) -} - -// WatchListener registers a LDS watch. -func (xdsC *Client) WatchListener(serviceName string, callback func(xdsresource.ListenerUpdate, error)) func() { - xdsC.ldsCb = callback - xdsC.ldsWatchCh.Send(serviceName) - return func() { - xdsC.ldsCancelCh.Send(nil) - } -} - -// WaitForWatchListener waits for WatchCluster to be invoked on this client and -// returns the serviceName being watched. -func (xdsC *Client) WaitForWatchListener(ctx context.Context) (string, error) { - val, err := xdsC.ldsWatchCh.Receive(ctx) - if err != nil { - return "", err - } - return val.(string), err -} - -// InvokeWatchListenerCallback invokes the registered ldsWatch callback. -// -// Not thread safe with WatchListener. Only call this after -// WaitForWatchListener. -func (xdsC *Client) InvokeWatchListenerCallback(update xdsresource.ListenerUpdate, err error) { - xdsC.ldsCb(update, err) -} - -// WaitForCancelListenerWatch waits for a LDS watch to be cancelled and returns -// context.DeadlineExceeded otherwise. -func (xdsC *Client) WaitForCancelListenerWatch(ctx context.Context) error { - _, err := xdsC.ldsCancelCh.Receive(ctx) - return err -} - -// WatchRouteConfig registers a RDS watch. -func (xdsC *Client) WatchRouteConfig(routeName string, callback func(xdsresource.RouteConfigUpdate, error)) func() { - xdsC.rdsCbs[routeName] = callback - xdsC.rdsWatchCh.Send(routeName) - return func() { - xdsC.rdsCancelCh.Send(routeName) - } -} - -// WaitForWatchRouteConfig waits for WatchCluster to be invoked on this client and -// returns the routeName being watched. -func (xdsC *Client) WaitForWatchRouteConfig(ctx context.Context) (string, error) { - val, err := xdsC.rdsWatchCh.Receive(ctx) - if err != nil { - return "", err - } - return val.(string), err -} - -// InvokeWatchRouteConfigCallback invokes the registered rdsWatch callback. -// -// Not thread safe with WatchRouteConfig. Only call this after -// WaitForWatchRouteConfig. -func (xdsC *Client) InvokeWatchRouteConfigCallback(name string, update xdsresource.RouteConfigUpdate, err error) { - if len(xdsC.rdsCbs) != 1 { - xdsC.rdsCbs[name](update, err) - return - } - // Keeps functionality with previous usage of this on client side, if single - // callback call that callback. - var routeName string - for route := range xdsC.rdsCbs { - routeName = route - } - xdsC.rdsCbs[routeName](update, err) -} - -// WaitForCancelRouteConfigWatch waits for a RDS watch to be cancelled and returns -// context.DeadlineExceeded otherwise. -func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string, error) { - val, err := xdsC.rdsCancelCh.Receive(ctx) - if err != nil { - return "", err - } - return val.(string), err -} - -// WatchEndpoints registers an EDS watch for provided clusterName. -func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsresource.EndpointsUpdate, error)) (cancel func()) { - xdsC.edsCbs[clusterName] = callback - xdsC.edsWatchCh.Send(clusterName) - return func() { - xdsC.edsCancelCh.Send(clusterName) - } -} - -// WaitForWatchEDS waits for WatchEndpoints to be invoked on this client and -// returns the clusterName being watched. -func (xdsC *Client) WaitForWatchEDS(ctx context.Context) (string, error) { - val, err := xdsC.edsWatchCh.Receive(ctx) - if err != nil { - return "", err - } - return val.(string), err -} - -// InvokeWatchEDSCallback invokes the registered edsWatch callback. -// -// Not thread safe with WatchEndpoints. Only call this after -// WaitForWatchEDS. -func (xdsC *Client) InvokeWatchEDSCallback(name string, update xdsresource.EndpointsUpdate, err error) { - if len(xdsC.edsCbs) != 1 { - // This may panic if name isn't found. But it's fine for tests. - xdsC.edsCbs[name](update, err) - return - } - // Keeps functionality with previous usage of this, if single callback call - // that callback. - for n := range xdsC.edsCbs { - name = n - } - xdsC.edsCbs[name](update, err) -} - -// WaitForCancelEDSWatch waits for a EDS watch to be cancelled and returns -// context.DeadlineExceeded otherwise. -func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) (string, error) { - edsNameReceived, err := xdsC.edsCancelCh.Receive(ctx) - if err != nil { - return "", err - } - return edsNameReceived.(string), err } // ReportLoadArgs wraps the arguments passed to ReportLoad. @@ -247,20 +105,9 @@ func NewClient() *Client { func NewClientWithName(name string) *Client { return &Client{ name: name, - ldsWatchCh: testutils.NewChannelWithSize(10), - rdsWatchCh: testutils.NewChannelWithSize(10), - cdsWatchCh: testutils.NewChannelWithSize(10), - edsWatchCh: testutils.NewChannelWithSize(10), - ldsCancelCh: testutils.NewChannelWithSize(10), - rdsCancelCh: testutils.NewChannelWithSize(10), - cdsCancelCh: testutils.NewChannelWithSize(10), - edsCancelCh: testutils.NewChannelWithSize(10), loadReportCh: testutils.NewChannel(), lrsCancelCh: testutils.NewChannel(), loadStore: load.NewStore(), bootstrapCfg: &bootstrap.Config{ClientDefaultListenerResourceNameTemplate: "%s"}, - rdsCbs: make(map[string]func(xdsresource.RouteConfigUpdate, error)), - cdsCbs: make(map[string]func(xdsresource.ClusterUpdate, error)), - edsCbs: make(map[string]func(xdsresource.EndpointsUpdate, error)), } } diff --git a/xds/internal/testutils/resource_watcher.go b/xds/internal/testutils/resource_watcher.go index aac9c1464774..2b6b5b1ae448 100644 --- a/xds/internal/testutils/resource_watcher.go +++ b/xds/internal/testutils/resource_watcher.go @@ -24,7 +24,7 @@ import "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" // used to receive updates on watches registered with the xDS client, when using // the resource-type agnostic WatchResource API. // -// Tests can the channels provided by this tyep to get access to updates and +// Tests can use the channels provided by this type to get access to updates and // errors sent by the xDS client. type TestResourceWatcher struct { // UpdateCh is the channel on which xDS client updates are delivered. diff --git a/xds/internal/xdsclient/client.go b/xds/internal/xdsclient/client.go index 542c5e025fd1..4665f9385a14 100644 --- a/xds/internal/xdsclient/client.go +++ b/xds/internal/xdsclient/client.go @@ -30,9 +30,6 @@ import ( // (collectively termed as xDS) on a remote management server, to discover // various dynamic resources. type XDSClient interface { - WatchListener(string, func(xdsresource.ListenerUpdate, error)) func() - WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func() - // WatchResource uses xDS to discover the resource associated with the // provided resource name. The resource type implementation determines how // xDS requests are sent out and how responses are deserialized and @@ -47,9 +44,6 @@ type XDSClient interface { // During a race (e.g. an xDS response is received while the user is calling // cancel()), there's a small window where the callback can be called after // the watcher is canceled. Callers need to handle this case. - // - // TODO: Once this generic client API is fully implemented and integrated, - // delete the resource type specific watch APIs on this interface. WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) // DumpResources returns the status of the xDS resources. Returns a map of diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index 5866221e2696..beb46fbc5d0c 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -25,62 +25,6 @@ import ( "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) -// This is only required temporarily, while we modify the -// clientImpl.WatchListener API to be implemented via the wrapper -// WatchListener() API which calls the WatchResource() API. -type listenerWatcher struct { - resourceName string - cb func(xdsresource.ListenerUpdate, error) -} - -func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) { - l.cb(update.Resource, nil) -} - -func (l *listenerWatcher) OnError(err error) { - l.cb(xdsresource.ListenerUpdate{}, err) -} - -func (l *listenerWatcher) OnResourceDoesNotExist() { - err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", l.resourceName) - l.cb(xdsresource.ListenerUpdate{}, err) -} - -// WatchListener uses LDS to discover information about the Listener resource -// identified by resourceName. -func (c *clientImpl) WatchListener(resourceName string, cb func(xdsresource.ListenerUpdate, error)) (cancel func()) { - watcher := &listenerWatcher{resourceName: resourceName, cb: cb} - return xdsresource.WatchListener(c, resourceName, watcher) -} - -// This is only required temporarily, while we modify the -// clientImpl.WatchRouteConfig API to be implemented via the wrapper -// WatchRouteConfig() API which calls the WatchResource() API. -type routeConfigWatcher struct { - resourceName string - cb func(xdsresource.RouteConfigUpdate, error) -} - -func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { - r.cb(update.Resource, nil) -} - -func (r *routeConfigWatcher) OnError(err error) { - r.cb(xdsresource.RouteConfigUpdate{}, err) -} - -func (r *routeConfigWatcher) OnResourceDoesNotExist() { - err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", r.resourceName) - r.cb(xdsresource.RouteConfigUpdate{}, err) -} - -// WatchRouteConfig uses RDS to discover information about the -// RouteConfiguration resource identified by resourceName. -func (c *clientImpl) WatchRouteConfig(resourceName string, cb func(xdsresource.RouteConfigUpdate, error)) (cancel func()) { - watcher := &routeConfigWatcher{resourceName: resourceName, cb: cb} - return xdsresource.WatchRouteConfig(c, resourceName, watcher) -} - // WatchResource uses xDS to discover the resource associated with the provided // resource name. The resource type implementation determines how xDS requests // are sent out and how responses are deserialized and validated. Upon receipt diff --git a/xds/internal/xdsclient/tests/authority_test.go b/xds/internal/xdsclient/tests/authority_test.go index 8800531c1cac..91da91f0b021 100644 --- a/xds/internal/xdsclient/tests/authority_test.go +++ b/xds/internal/xdsclient/tests/authority_test.go @@ -118,12 +118,6 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time. return lisDefault, lisNonDefault, client, close } -type noopClusterWatcher struct{} - -func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {} -func (noopClusterWatcher) OnError(err error) {} -func (noopClusterWatcher) OnResourceDoesNotExist() {} - // TestAuthorityShare tests the authority sharing logic. The test verifies the // following scenarios: // - A watch for a resource name with an authority matching an existing watch diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index e4f8ef5cf33c..e3af2bbfae1a 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -41,6 +41,17 @@ import ( v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) +type noopClusterWatcher struct{} + +func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {} +func (noopClusterWatcher) OnError(err error) {} +func (noopClusterWatcher) OnResourceDoesNotExist() {} + +type clusterUpdateErrTuple struct { + update xdsresource.ClusterUpdate + err error +} + type clusterWatcher struct { updateCh *testutils.Channel } @@ -49,20 +60,20 @@ func newClusterWatcher() *clusterWatcher { return &clusterWatcher{updateCh: testutils.NewChannel()} } -func (ew *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) { - ew.updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update.Resource}) +func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) { + cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource}) } -func (ew *clusterWatcher) OnError(err error) { +func (cw *clusterWatcher) OnError(err error) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have // access to the most recently received error. - ew.updateCh.Replace(xdsresource.ClusterUpdateErrTuple{Err: err}) + cw.updateCh.Replace(clusterUpdateErrTuple{err: err}) } -func (ew *clusterWatcher) OnResourceDoesNotExist() { - ew.updateCh.Replace(xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")}) +func (cw *clusterWatcher) OnResourceDoesNotExist() { + cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")}) } // badClusterResource returns a cluster resource for the given name which @@ -83,19 +94,19 @@ const wantClusterNACKErr = "unsupported config_source_specifier" // // Returns an error if no update is received before the context deadline expires // or the received update does not match the expected one. -func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.ClusterUpdateErrTuple) error { +func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate clusterUpdateErrTuple) error { u, err := updateCh.Receive(ctx) if err != nil { return fmt.Errorf("timeout when waiting for a cluster resource from the management server: %v", err) } - got := u.(xdsresource.ClusterUpdateErrTuple) - if wantUpdate.Err != nil { - if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType { + got := u.(clusterUpdateErrTuple) + if wantUpdate.err != nil { + if gotType, wantType := xdsresource.ErrType(got.err), xdsresource.ErrType(wantUpdate.err); gotType != wantType { return fmt.Errorf("received update with error type %v, want %v", gotType, wantType) } } cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy")} - if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" { + if diff := cmp.Diff(wantUpdate.update, got.update, cmpOpts...); diff != "" { return fmt.Errorf("received unepected diff in the cluster resource update: (-want, got):\n%s", diff) } return nil @@ -133,7 +144,7 @@ func (s) TestCDSWatch(t *testing.T) { watchedResource *v3clusterpb.Cluster // The resource being watched. updatedWatchedResource *v3clusterpb.Cluster // The watched resource after an update. notWatchedResource *v3clusterpb.Cluster // A resource which is not being watched. - wantUpdate xdsresource.ClusterUpdateErrTuple + wantUpdate clusterUpdateErrTuple }{ { desc: "old style resource", @@ -141,8 +152,8 @@ func (s) TestCDSWatch(t *testing.T) { watchedResource: e2e.DefaultCluster(cdsName, edsName, e2e.SecurityLevelNone), updatedWatchedResource: e2e.DefaultCluster(cdsName, "new-eds-resource", e2e.SecurityLevelNone), notWatchedResource: e2e.DefaultCluster("unsubscribed-cds-resource", edsName, e2e.SecurityLevelNone), - wantUpdate: xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate: clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsName, EDSServiceName: edsName, }, @@ -154,8 +165,8 @@ func (s) TestCDSWatch(t *testing.T) { watchedResource: e2e.DefaultCluster(cdsNameNewStyle, edsNameNewStyle, e2e.SecurityLevelNone), updatedWatchedResource: e2e.DefaultCluster(cdsNameNewStyle, "new-eds-resource", e2e.SecurityLevelNone), notWatchedResource: e2e.DefaultCluster("unsubscribed-cds-resource", edsNameNewStyle, e2e.SecurityLevelNone), - wantUpdate: xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate: clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsNameNewStyle, EDSServiceName: edsNameNewStyle, }, @@ -249,22 +260,22 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) { resourceName string watchedResource *v3clusterpb.Cluster // The resource being watched. updatedWatchedResource *v3clusterpb.Cluster // The watched resource after an update. - wantUpdateV1 xdsresource.ClusterUpdateErrTuple - wantUpdateV2 xdsresource.ClusterUpdateErrTuple + wantUpdateV1 clusterUpdateErrTuple + wantUpdateV2 clusterUpdateErrTuple }{ { desc: "old style resource", resourceName: cdsName, watchedResource: e2e.DefaultCluster(cdsName, edsName, e2e.SecurityLevelNone), updatedWatchedResource: e2e.DefaultCluster(cdsName, "new-eds-resource", e2e.SecurityLevelNone), - wantUpdateV1: xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdateV1: clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsName, EDSServiceName: edsName, }, }, - wantUpdateV2: xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdateV2: clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsName, EDSServiceName: "new-eds-resource", }, @@ -275,14 +286,14 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) { resourceName: cdsNameNewStyle, watchedResource: e2e.DefaultCluster(cdsNameNewStyle, edsNameNewStyle, e2e.SecurityLevelNone), updatedWatchedResource: e2e.DefaultCluster(cdsNameNewStyle, "new-eds-resource", e2e.SecurityLevelNone), - wantUpdateV1: xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdateV1: clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsNameNewStyle, EDSServiceName: edsNameNewStyle, }, }, - wantUpdateV2: xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdateV2: clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsNameNewStyle, EDSServiceName: "new-eds-resource", }, @@ -414,14 +425,14 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { } // Verify the contents of the received update for the all watchers. - wantUpdate12 := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate12 := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsName, EDSServiceName: edsName, }, } - wantUpdate3 := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate3 := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsNameNewStyle, EDSServiceName: edsNameNewStyle, }, @@ -492,8 +503,8 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) { } // Verify the contents of the received update. - wantUpdate := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsName, EDSServiceName: edsName, }, @@ -558,7 +569,7 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "") - if err := verifyClusterUpdate(ctx, cw.updateCh, xdsresource.ClusterUpdateErrTuple{Err: wantErr}); err != nil { + if err := verifyClusterUpdate(ctx, cw.updateCh, clusterUpdateErrTuple{err: wantErr}); err != nil { t.Fatal(err) } } @@ -605,8 +616,8 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { } // Verify the contents of the received update. - wantUpdate := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: cdsName, EDSServiceName: edsName, }, @@ -673,14 +684,14 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) { } // Verify the contents of the received update for both watchers. - wantUpdate1 := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate1 := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: resourceName1, EDSServiceName: edsName, }, } - wantUpdate2 := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate2 := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: resourceName2, EDSServiceName: edsNameNewStyle, }, @@ -704,7 +715,7 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) { // The first watcher should receive a resource removed error, while the // second watcher should not receive an update. - if err := verifyClusterUpdate(ctx, cw1.updateCh, xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil { t.Fatal(err) } if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil { @@ -724,8 +735,8 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) { if err := verifyNoClusterUpdate(ctx, cw1.updateCh); err != nil { t.Fatal(err) } - wantUpdate := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: resourceName2, EDSServiceName: "new-eds-resource", }, @@ -773,7 +784,7 @@ func (s) TestCDSWatch_NACKError(t *testing.T) { if err != nil { t.Fatalf("timeout when waiting for a cluster resource from the management server: %v", err) } - gotErr := u.(xdsresource.ClusterUpdateErrTuple).Err + gotErr := u.(clusterUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantClusterNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantClusterNACKErr) } @@ -828,15 +839,15 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) { if err != nil { t.Fatalf("timeout when waiting for a cluster resource from the management server: %v", err) } - gotErr := u.(xdsresource.ClusterUpdateErrTuple).Err + gotErr := u.(clusterUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantClusterNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantClusterNACKErr) } // Verify that the watcher watching the good resource receives a good // update. - wantUpdate := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: goodResourceName, EDSServiceName: edsName, }, @@ -889,8 +900,8 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) { } // Verify the contents of the received update for first watcher. - wantUpdate1 := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate1 := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: resourceName1, EDSServiceName: edsName, }, @@ -919,8 +930,8 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) { } // Verify the contents of the received update for the second watcher. - wantUpdate2 := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate2 := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: resourceName2, EDSServiceName: edsNameNewStyle, }, diff --git a/xds/internal/xdsclient/tests/dump_test.go b/xds/internal/xdsclient/tests/dump_test.go index a85d4fe30080..577973f63c16 100644 --- a/xds/internal/xdsclient/tests/dump_test.go +++ b/xds/internal/xdsclient/tests/dump_test.go @@ -60,12 +60,6 @@ func compareDump(ctx context.Context, client xdsclient.XDSClient, want map[strin } } -type noopEndpointsWatcher struct{} - -func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {} -func (noopEndpointsWatcher) OnError(err error) {} -func (noopEndpointsWatcher) OnResourceDoesNotExist() {} - func (s) TestDumpResources(t *testing.T) { // Initialize the xDS resources to be used in this test. ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"} @@ -119,10 +113,10 @@ func (s) TestDumpResources(t *testing.T) { // Register watches, dump resources and expect configs in requested state. for _, target := range ldsTargets { - client.WatchListener(target, func(xdsresource.ListenerUpdate, error) {}) + xdsresource.WatchListener(client, target, noopListenerWatcher{}) } for _, target := range rdsTargets { - client.WatchRouteConfig(target, func(xdsresource.RouteConfigUpdate, error) {}) + xdsresource.WatchRouteConfig(client, target, noopRouteConfigWatcher{}) } for _, target := range cdsTargets { xdsresource.WatchCluster(client, target, noopClusterWatcher{}) diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index f02130fc7f0c..701ffe6fbe0c 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -52,6 +52,12 @@ const ( edsPort3 = 3 ) +type noopEndpointsWatcher struct{} + +func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {} +func (noopEndpointsWatcher) OnError(err error) {} +func (noopEndpointsWatcher) OnResourceDoesNotExist() {} + type endpointsUpdateErrTuple struct { update xdsresource.EndpointsUpdate err error diff --git a/xds/internal/xdsclient/tests/federation_watchers_test.go b/xds/internal/xdsclient/tests/federation_watchers_test.go index f008439a0b25..f7e533182441 100644 --- a/xds/internal/xdsclient/tests/federation_watchers_test.go +++ b/xds/internal/xdsclient/tests/federation_watchers_test.go @@ -23,7 +23,6 @@ import ( "testing" "github.com/google/uuid" - "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/bootstrap" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/xds/internal" @@ -95,15 +94,11 @@ func (s) TestFederation_ListenerResourceContextParamOrder(t *testing.T) { // Register two watches for listener resources with the same query string, // but context parameters in different order. - updateCh1 := testutils.NewChannel() - ldsCancel1 := client.WatchListener(resourceName1, func(u xdsresource.ListenerUpdate, err error) { - updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, resourceName1, lw1) defer ldsCancel1() - updateCh2 := testutils.NewChannel() - ldsCancel2 := client.WatchListener(resourceName2, func(u xdsresource.ListenerUpdate, err error) { - updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, resourceName2, lw2) defer ldsCancel2() // Configure the management server for the non-default authority to return a @@ -119,17 +114,17 @@ func (s) TestFederation_ListenerResourceContextParamOrder(t *testing.T) { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - wantUpdate := xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: "rds-resource", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } // Verify the contents of the received update. - if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } - if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } } @@ -151,15 +146,11 @@ func (s) TestFederation_RouteConfigResourceContextParamOrder(t *testing.T) { // Register two watches for route configuration resources with the same // query string, but context parameters in different order. - updateCh1 := testutils.NewChannel() - rdsCancel1 := client.WatchRouteConfig(resourceName1, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw1 := newRouteConfigWatcher() + rdsCancel1 := xdsresource.WatchRouteConfig(client, resourceName1, rw1) defer rdsCancel1() - updateCh2 := testutils.NewChannel() - rdsCancel2 := client.WatchRouteConfig(resourceName2, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw2 := newRouteConfigWatcher() + rdsCancel2 := xdsresource.WatchRouteConfig(client, resourceName2, rw2) defer rdsCancel2() // Configure the management server for the non-default authority to return a @@ -175,8 +166,8 @@ func (s) TestFederation_RouteConfigResourceContextParamOrder(t *testing.T) { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - wantUpdate := xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdate := routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{"listener-resource"}, @@ -192,10 +183,10 @@ func (s) TestFederation_RouteConfigResourceContextParamOrder(t *testing.T) { }, } // Verify the contents of the received update. - if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } - if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } } @@ -237,8 +228,8 @@ func (s) TestFederation_ClusterResourceContextParamOrder(t *testing.T) { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - wantUpdate := xdsresource.ClusterUpdateErrTuple{ - Update: xdsresource.ClusterUpdate{ + wantUpdate := clusterUpdateErrTuple{ + update: xdsresource.ClusterUpdate{ ClusterName: "xdstp://non-default-authority/envoy.config.cluster.v3.Cluster/xdsclient-test-cds-resource?a=1&b=2", EDSServiceName: "eds-service-name", }, diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 65d2bf3c1a1f..cb4d4b070d4a 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -72,6 +72,41 @@ const ( edsNameNewStyle = "xdstp:///envoy.config.endpoint.v3.ClusterLoadAssignment/xdsclient-test-eds-resource" ) +type noopListenerWatcher struct{} + +func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {} +func (noopListenerWatcher) OnError(err error) {} +func (noopListenerWatcher) OnResourceDoesNotExist() {} + +type listenerUpdateErrTuple struct { + update xdsresource.ListenerUpdate + err error +} + +type listenerWatcher struct { + updateCh *testutils.Channel +} + +func newListenerWatcher() *listenerWatcher { + return &listenerWatcher{updateCh: testutils.NewChannel()} +} + +func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) { + cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) +} + +func (cw *listenerWatcher) OnError(err error) { + // When used with a go-control-plane management server that continuously + // resends resources which are NACKed by the xDS client, using a `Replace()` + // here and in OnResourceDoesNotExist() simplifies tests which will have + // access to the most recently received error. + cw.updateCh.Replace(listenerUpdateErrTuple{err: err}) +} + +func (cw *listenerWatcher) OnResourceDoesNotExist() { + cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) +} + // badListenerResource returns a listener resource for the given name which does // not contain the `RouteSpecifier` field in the HTTPConnectionManager, and // hence is expected to be NACKed by the client. @@ -115,14 +150,14 @@ func verifyNoListenerUpdate(ctx context.Context, updateCh *testutils.Channel) er // // Returns an error if no update is received before the context deadline expires // or the received update does not match the expected one. -func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.ListenerUpdateErrTuple) error { +func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate listenerUpdateErrTuple) error { u, err := updateCh.Receive(ctx) if err != nil { return fmt.Errorf("timeout when waiting for a listener resource from the management server: %v", err) } - got := u.(xdsresource.ListenerUpdateErrTuple) - if wantUpdate.Err != nil { - if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType { + got := u.(listenerUpdateErrTuple) + if wantUpdate.err != nil { + if gotType, wantType := xdsresource.ErrType(got.err), xdsresource.ErrType(wantUpdate.err); gotType != wantType { return fmt.Errorf("received update with error type %v, want %v", gotType, wantType) } } @@ -131,7 +166,7 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), } - if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" { + if diff := cmp.Diff(wantUpdate.update, got.update, cmpOpts...); diff != "" { return fmt.Errorf("received unepected diff in the listener resource update: (-want, got):\n%s", diff) } return nil @@ -155,7 +190,7 @@ func (s) TestLDSWatch(t *testing.T) { watchedResource *v3listenerpb.Listener // The resource being watched. updatedWatchedResource *v3listenerpb.Listener // The watched resource after an update. notWatchedResource *v3listenerpb.Listener // A resource which is not being watched. - wantUpdate xdsresource.ListenerUpdateErrTuple + wantUpdate listenerUpdateErrTuple }{ { desc: "old style resource", @@ -163,8 +198,8 @@ func (s) TestLDSWatch(t *testing.T) { watchedResource: e2e.DefaultClientListener(ldsName, rdsName), updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"), notWatchedResource: e2e.DefaultClientListener("unsubscribed-lds-resource", rdsName), - wantUpdate: xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate: listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -176,8 +211,8 @@ func (s) TestLDSWatch(t *testing.T) { watchedResource: e2e.DefaultClientListener(ldsNameNewStyle, rdsNameNewStyle), updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"), notWatchedResource: e2e.DefaultClientListener("unsubscribed-lds-resource", rdsNameNewStyle), - wantUpdate: xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate: listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsNameNewStyle, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -199,10 +234,8 @@ func (s) TestLDSWatch(t *testing.T) { // Register a watch for a listener resource and have the watch // callback push the received update on to a channel. - updateCh := testutils.NewChannel() - ldsCancel := client.WatchListener(test.resourceName, func(u xdsresource.ListenerUpdate, err error) { - updateCh.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, test.resourceName, lw) // Configure the management server to return a single listener // resource, corresponding to the one we registered a watch for. @@ -218,7 +251,7 @@ func (s) TestLDSWatch(t *testing.T) { } // Verify the contents of the received update. - if err := verifyListenerUpdate(ctx, updateCh, test.wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw.updateCh, test.wantUpdate); err != nil { t.Fatal(err) } @@ -232,7 +265,7 @@ func (s) TestLDSWatch(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoListenerUpdate(ctx, updateCh); err != nil { + if err := verifyNoListenerUpdate(ctx, lw.updateCh); err != nil { t.Fatal(err) } @@ -247,7 +280,7 @@ func (s) TestLDSWatch(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoListenerUpdate(ctx, updateCh); err != nil { + if err := verifyNoListenerUpdate(ctx, lw.updateCh); err != nil { t.Fatal(err) } }) @@ -273,22 +306,22 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { resourceName string watchedResource *v3listenerpb.Listener // The resource being watched. updatedWatchedResource *v3listenerpb.Listener // The watched resource after an update. - wantUpdateV1 xdsresource.ListenerUpdateErrTuple - wantUpdateV2 xdsresource.ListenerUpdateErrTuple + wantUpdateV1 listenerUpdateErrTuple + wantUpdateV2 listenerUpdateErrTuple }{ { desc: "old style resource", resourceName: ldsName, watchedResource: e2e.DefaultClientListener(ldsName, rdsName), updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"), - wantUpdateV1: xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdateV1: listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, }, - wantUpdateV2: xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdateV2: listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: "new-rds-resource", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -299,14 +332,14 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { resourceName: ldsNameNewStyle, watchedResource: e2e.DefaultClientListener(ldsNameNewStyle, rdsNameNewStyle), updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"), - wantUpdateV1: xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdateV1: listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsNameNewStyle, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, }, - wantUpdateV2: xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdateV2: listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: "new-rds-resource", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, @@ -328,15 +361,11 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { // Register two watches for the same listener resource and have the // callbacks push the received updates on to a channel. - updateCh1 := testutils.NewChannel() - ldsCancel1 := client.WatchListener(test.resourceName, func(u xdsresource.ListenerUpdate, err error) { - updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, test.resourceName, lw1) defer ldsCancel1() - updateCh2 := testutils.NewChannel() - ldsCancel2 := client.WatchListener(test.resourceName, func(u xdsresource.ListenerUpdate, err error) { - updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, test.resourceName, lw2) // Configure the management server to return a single listener // resource, corresponding to the one we registered watches for. @@ -352,10 +381,10 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { } // Verify the contents of the received update. - if err := verifyListenerUpdate(ctx, updateCh1, test.wantUpdateV1); err != nil { + if err := verifyListenerUpdate(ctx, lw1.updateCh, test.wantUpdateV1); err != nil { t.Fatal(err) } - if err := verifyListenerUpdate(ctx, updateCh2, test.wantUpdateV1); err != nil { + if err := verifyListenerUpdate(ctx, lw2.updateCh, test.wantUpdateV1); err != nil { t.Fatal(err) } @@ -366,10 +395,10 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoListenerUpdate(ctx, updateCh1); err != nil { + if err := verifyNoListenerUpdate(ctx, lw1.updateCh); err != nil { t.Fatal(err) } - if err := verifyNoListenerUpdate(ctx, updateCh2); err != nil { + if err := verifyNoListenerUpdate(ctx, lw2.updateCh); err != nil { t.Fatal(err) } @@ -383,10 +412,10 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyListenerUpdate(ctx, updateCh1, test.wantUpdateV2); err != nil { + if err := verifyListenerUpdate(ctx, lw1.updateCh, test.wantUpdateV2); err != nil { t.Fatal(err) } - if err := verifyNoListenerUpdate(ctx, updateCh2); err != nil { + if err := verifyNoListenerUpdate(ctx, lw2.updateCh); err != nil { t.Fatal(err) } }) @@ -413,22 +442,16 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { // Register two watches for the same listener resource and have the // callbacks push the received updates on to a channel. - updateCh1 := testutils.NewChannel() - ldsCancel1 := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) { - updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1) defer ldsCancel1() - updateCh2 := testutils.NewChannel() - ldsCancel2 := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) { - updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) defer ldsCancel2() // Register the third watch for a different listener resource. - updateCh3 := testutils.NewChannel() - ldsCancel3 := client.WatchListener(ldsNameNewStyle, func(u xdsresource.ListenerUpdate, err error) { - updateCh3.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw3 := newListenerWatcher() + ldsCancel3 := xdsresource.WatchListener(client, ldsNameNewStyle, lw3) defer ldsCancel3() // Configure the management server to return two listener resources, @@ -450,19 +473,19 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { // Verify the contents of the received update for the all watchers. The two // resources returned differ only in the resource name. Therefore the // expected update is the same for all the watchers. - wantUpdate := xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } - if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } - if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } - if err := verifyListenerUpdate(ctx, updateCh3, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw3.updateCh, wantUpdate); err != nil { t.Fatal(err) } } @@ -504,10 +527,8 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) { // Register a watch for a listener resource and have the watch // callback push the received update on to a channel. - updateCh1 := testutils.NewChannel() - ldsCancel1 := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) { - updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1) defer ldsCancel1() // Configure the management server to return a single listener @@ -524,13 +545,13 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) { } // Verify the contents of the received update. - wantUpdate := xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } - if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } select { @@ -541,12 +562,10 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) { // Register another watch for the same resource. This should get the update // from the cache. - updateCh2 := testutils.NewChannel() - ldsCancel2 := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) { - updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) defer ldsCancel2() - if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } // No request should get sent out as part of this watch. @@ -581,10 +600,8 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { // Register a watch for a resource which is expected to fail with an error // after the watch expiry timer fires. - updateCh := testutils.NewChannel() - ldsCancel := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) { - updateCh.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, ldsName, lw) defer ldsCancel() // Wait for the watch expiry timer to fire. @@ -594,7 +611,7 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "") - if err := verifyListenerUpdate(ctx, updateCh, xdsresource.ListenerUpdateErrTuple{Err: wantErr}); err != nil { + if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantErr}); err != nil { t.Fatal(err) } } @@ -623,10 +640,8 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { // Register a watch for a listener resource and have the watch // callback push the received update on to a channel. - updateCh := testutils.NewChannel() - ldsCancel := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) { - updateCh.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, ldsName, lw) defer ldsCancel() // Configure the management server to return a single listener @@ -643,20 +658,20 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { } // Verify the contents of the received update. - wantUpdate := xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } - if err := verifyListenerUpdate(ctx, updateCh, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { t.Fatal(err) } // Wait for the watch expiry timer to fire, and verify that the callback is // not invoked. <-time.After(defaultTestWatchExpiryTimeout) - if err := verifyNoListenerUpdate(ctx, updateCh); err != nil { + if err := verifyNoListenerUpdate(ctx, lw.updateCh); err != nil { t.Fatal(err) } } @@ -686,17 +701,13 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { // Register two watches for two listener resources and have the // callbacks push the received updates on to a channel. resourceName1 := ldsName - updateCh1 := testutils.NewChannel() - ldsCancel1 := client.WatchListener(resourceName1, func(u xdsresource.ListenerUpdate, err error) { - updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, resourceName1, lw1) defer ldsCancel1() resourceName2 := ldsNameNewStyle - updateCh2 := testutils.NewChannel() - ldsCancel2 := client.WatchListener(resourceName2, func(u xdsresource.ListenerUpdate, err error) { - updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, resourceName2, lw2) defer ldsCancel2() // Configure the management server to return two listener resources, @@ -718,16 +729,16 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { // Verify the contents of the received update for both watchers. The two // resources returned differ only in the resource name. Therefore the // expected update is the same for both watchers. - wantUpdate := xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } - if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } - if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } @@ -743,12 +754,12 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { // The first watcher should receive a resource removed error, while the // second watcher should not see an update. - if err := verifyListenerUpdate(ctx, updateCh1, xdsresource.ListenerUpdateErrTuple{ - Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, ""), + if err := verifyListenerUpdate(ctx, lw1.updateCh, listenerUpdateErrTuple{ + err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, ""), }); err != nil { t.Fatal(err) } - if err := verifyNoListenerUpdate(ctx, updateCh2); err != nil { + if err := verifyNoListenerUpdate(ctx, lw2.updateCh); err != nil { t.Fatal(err) } @@ -762,16 +773,16 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoListenerUpdate(ctx, updateCh1); err != nil { + if err := verifyNoListenerUpdate(ctx, lw1.updateCh); err != nil { t.Fatal(err) } - wantUpdate = xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate = listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: "new-rds-resource", HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } - if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } } @@ -792,12 +803,8 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { // Register a watch for a listener resource and have the watch // callback push the received update on to a channel. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - updateCh := testutils.NewChannel() - ldsCancel := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) { - updateCh.SendContext(ctx, xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, ldsName, lw) defer ldsCancel() // Configure the management server to return a single listener resource @@ -807,16 +814,18 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { Listeners: []*v3listenerpb.Listener{badListenerResource(t, ldsName)}, SkipValidation: true, } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } // Verify that the expected error is propagated to the watcher. - u, err := updateCh.Receive(ctx) + u, err := lw.updateCh.Receive(ctx) if err != nil { t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) } - gotErr := u.(xdsresource.ListenerUpdateErrTuple).Err + gotErr := u.(listenerUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) } @@ -844,16 +853,12 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() badResourceName := ldsName - updateCh1 := testutils.NewChannel() - ldsCancel1 := client.WatchListener(badResourceName, func(u xdsresource.ListenerUpdate, err error) { - updateCh1.SendContext(ctx, xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, badResourceName, lw1) defer ldsCancel1() goodResourceName := ldsNameNewStyle - updateCh2 := testutils.NewChannel() - ldsCancel2 := client.WatchListener(goodResourceName, func(u xdsresource.ListenerUpdate, err error) { - updateCh2.SendContext(ctx, xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, goodResourceName, lw2) defer ldsCancel2() // Configure the management server with two listener resources. One of these @@ -872,24 +877,24 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { // Verify that the expected error is propagated to the watcher which // requested for the bad resource. - u, err := updateCh1.Receive(ctx) + u, err := lw1.updateCh.Receive(ctx) if err != nil { t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err) } - gotErr := u.(xdsresource.ListenerUpdateErrTuple).Err + gotErr := u.(listenerUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr) } // Verify that the watcher watching the good resource receives a good // update. - wantUpdate := xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } - if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } } @@ -915,17 +920,13 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) { // Register two watches for two listener resources and have the // callbacks push the received updates on to a channel. resourceName1 := ldsName - updateCh1 := testutils.NewChannel() - ldsCancel1 := client.WatchListener(resourceName1, func(u xdsresource.ListenerUpdate, err error) { - updateCh1.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, resourceName1, lw1) defer ldsCancel1() resourceName2 := ldsNameNewStyle - updateCh2 := testutils.NewChannel() - ldsCancel2 := client.WatchListener(resourceName2, func(u xdsresource.ListenerUpdate, err error) { - updateCh2.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err}) - }) + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, resourceName2, lw2) defer ldsCancel2() // Configure the management server to return only one of the two listener @@ -944,18 +945,18 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) { } // Verify the contents of the received update for first watcher. - wantUpdate1 := xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate1 := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } - if err := verifyListenerUpdate(ctx, updateCh1, wantUpdate1); err != nil { + if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate1); err != nil { t.Fatal(err) } // Verify that the second watcher does not get an update with an error. - if err := verifyNoListenerUpdate(ctx, updateCh2); err != nil { + if err := verifyNoListenerUpdate(ctx, lw2.updateCh); err != nil { t.Fatal(err) } @@ -974,19 +975,19 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) { } // Verify the contents of the received update for the second watcher. - wantUpdate2 := xdsresource.ListenerUpdateErrTuple{ - Update: xdsresource.ListenerUpdate{ + wantUpdate2 := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ RouteConfigName: rdsName, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, } - if err := verifyListenerUpdate(ctx, updateCh2, wantUpdate2); err != nil { + if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate2); err != nil { t.Fatal(err) } // Verify that the first watcher gets no update, as the first resource did // not change. - if err := verifyNoListenerUpdate(ctx, updateCh1); err != nil { + if err := verifyNoListenerUpdate(ctx, lw1.updateCh); err != nil { t.Fatal(err) } } diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 4846f9f1e61e..55a006848217 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -47,6 +47,53 @@ var ( routeConfigResourceType = internal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type) ) +// This route configuration watcher +// callback of the first resource, register two more watches (one for the +// same resource name, which would be satisfied from the cache, and another +// for a different resource name, which would be satisfied from the server). +type routeConfigWatcherWithWatchFromCallback struct { + client xdsclient.XDSClient + name1, name2 string + rcw1, rcw2 *routeConfigWatcher + cancel1, cancel2 func() + updateCh *testutils.Channel +} + +func newRouteConfigWatcherWithWatchFromCallback(client xdsclient.XDSClient, name1, name2 string) *routeConfigWatcherWithWatchFromCallback { + return &routeConfigWatcherWithWatchFromCallback{ + client: client, + name1: name1, + name2: name2, + rcw1: newRouteConfigWatcher(), + rcw2: newRouteConfigWatcher(), + updateCh: testutils.NewChannel(), + } +} + +func (rw *routeConfigWatcherWithWatchFromCallback) OnUpdate(update *xdsresource.RouteConfigResourceData) { + rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) + + rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1) + rw.cancel2 = xdsresource.WatchRouteConfig(rw.client, rw.name2, rw.rcw2) +} + +func (rw *routeConfigWatcherWithWatchFromCallback) OnError(err error) { + // When used with a go-control-plane management server that continuously + // resends resources which are NACKed by the xDS client, using a `Replace()` + // here and in OnResourceDoesNotExist() simplifies tests which will have + // access to the most recently received error. + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) +} + +func (rw *routeConfigWatcherWithWatchFromCallback) OnResourceDoesNotExist() { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) +} + +func (rw *routeConfigWatcherWithWatchFromCallback) cancel() { + rw.cancel1() + rw.cancel2() +} + // TestWatchCallAnotherWatch tests the scenario where a watch is registered for // a resource, and more watches are registered from the first watch's callback. // The test verifies that this scenario does not lead to a deadlock. @@ -78,32 +125,14 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - // Start a watch for one route configuration resource. From the watch - // callback of the first resource, register two more watches (one for the - // same resource name, which would be satisfied from the cache, and another - // for a different resource name, which would be satisfied from the server). - updateCh1 := testutils.NewChannel() - updateCh2 := testutils.NewChannel() - updateCh3 := testutils.NewChannel() - rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - - // Watch for the same resource name. - rdsCancel2 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) - t.Cleanup(rdsCancel2) - // Watch for a different resource name. - rdsCancel3 := client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) - t.Cleanup(rdsCancel3) - }) - t.Cleanup(rdsCancel1) + rw := newRouteConfigWatcherWithWatchFromCallback(client, rdsName, rdsNameNewStyle) + defer rw.cancel() + rdsCancel := xdsresource.WatchRouteConfig(client, rdsName, rw) + defer rdsCancel() // Verify the contents of the received update for the all watchers. - wantUpdate12 := xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdate12 := routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsName}, @@ -118,8 +147,8 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { }, }, } - wantUpdate3 := xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdate3 := routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsNameNewStyle}, @@ -134,13 +163,13 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { }, }, } - if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate12); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw.updateCh, wantUpdate12); err != nil { t.Fatal(err) } - if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate12); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw.rcw1.updateCh, wantUpdate12); err != nil { t.Fatal(err) } - if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate3); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw.rcw2.updateCh, wantUpdate3); err != nil { t.Fatal(err) } } diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index d8a239056a02..0a196130d7c0 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -42,6 +42,41 @@ import ( v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) +type noopRouteConfigWatcher struct{} + +func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {} +func (noopRouteConfigWatcher) OnError(err error) {} +func (noopRouteConfigWatcher) OnResourceDoesNotExist() {} + +type routeConfigUpdateErrTuple struct { + update xdsresource.RouteConfigUpdate + err error +} + +type routeConfigWatcher struct { + updateCh *testutils.Channel +} + +func newRouteConfigWatcher() *routeConfigWatcher { + return &routeConfigWatcher{updateCh: testutils.NewChannel()} +} + +func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { + rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) +} + +func (rw *routeConfigWatcher) OnError(err error) { + // When used with a go-control-plane management server that continuously + // resends resources which are NACKed by the xDS client, using a `Replace()` + // here and in OnResourceDoesNotExist() simplifies tests which will have + // access to the most recently received error. + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) +} + +func (rw *routeConfigWatcher) OnResourceDoesNotExist() { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) +} + // badRouteConfigResource returns a RouteConfiguration resource for the given // routeName which contains a retry config with num_retries set to `0`. This is // expected to be NACK'ed by the xDS client. @@ -72,19 +107,19 @@ const wantRouteConfigNACKErr = "received route is invalid: retry_policy.num_retr // // Returns an error if no update is received before the context deadline expires // or the received update does not match the expected one. -func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.RouteConfigUpdateErrTuple) error { +func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate routeConfigUpdateErrTuple) error { u, err := updateCh.Receive(ctx) if err != nil { return fmt.Errorf("timeout when waiting for a route configuration resource from the management server: %v", err) } - got := u.(xdsresource.RouteConfigUpdateErrTuple) - if wantUpdate.Err != nil { - if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType { + got := u.(routeConfigUpdateErrTuple) + if wantUpdate.err != nil { + if gotType, wantType := xdsresource.ErrType(got.err), xdsresource.ErrType(wantUpdate.err); gotType != wantType { return fmt.Errorf("received update with error type %v, want %v", gotType, wantType) } } cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw")} - if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" { + if diff := cmp.Diff(wantUpdate.update, got.update, cmpOpts...); diff != "" { return fmt.Errorf("received unepected diff in the route configuration resource update: (-want, got):\n%s", diff) } return nil @@ -123,7 +158,7 @@ func (s) TestRDSWatch(t *testing.T) { watchedResource *v3routepb.RouteConfiguration // The resource being watched. updatedWatchedResource *v3routepb.RouteConfiguration // The watched resource after an update. notWatchedResource *v3routepb.RouteConfiguration // A resource which is not being watched. - wantUpdate xdsresource.RouteConfigUpdateErrTuple + wantUpdate routeConfigUpdateErrTuple }{ { desc: "old style resource", @@ -131,8 +166,8 @@ func (s) TestRDSWatch(t *testing.T) { watchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, cdsName), updatedWatchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, "new-cds-resource"), notWatchedResource: e2e.DefaultRouteConfig("unsubscribed-rds-resource", ldsName, cdsName), - wantUpdate: xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdate: routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsName}, @@ -154,8 +189,8 @@ func (s) TestRDSWatch(t *testing.T) { watchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsNameNewStyle), updatedWatchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, "new-cds-resource"), notWatchedResource: e2e.DefaultRouteConfig("unsubscribed-rds-resource", ldsNameNewStyle, cdsNameNewStyle), - wantUpdate: xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdate: routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsNameNewStyle}, @@ -187,10 +222,8 @@ func (s) TestRDSWatch(t *testing.T) { // Register a watch for a route configuration resource and have the // watch callback push the received update on to a channel. - updateCh := testutils.NewChannel() - rdsCancel := client.WatchRouteConfig(test.resourceName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw := newRouteConfigWatcher() + rdsCancel := xdsresource.WatchRouteConfig(client, test.resourceName, rw) // Configure the management server to return a single route // configuration resource, corresponding to the one being watched. @@ -206,7 +239,7 @@ func (s) TestRDSWatch(t *testing.T) { } // Verify the contents of the received update. - if err := verifyRouteConfigUpdate(ctx, updateCh, test.wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw.updateCh, test.wantUpdate); err != nil { t.Fatal(err) } @@ -220,7 +253,7 @@ func (s) TestRDSWatch(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoRouteConfigUpdate(ctx, updateCh); err != nil { + if err := verifyNoRouteConfigUpdate(ctx, rw.updateCh); err != nil { t.Fatal(err) } @@ -235,7 +268,7 @@ func (s) TestRDSWatch(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoRouteConfigUpdate(ctx, updateCh); err != nil { + if err := verifyNoRouteConfigUpdate(ctx, rw.updateCh); err != nil { t.Fatal(err) } }) @@ -261,16 +294,16 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { resourceName string watchedResource *v3routepb.RouteConfiguration // The resource being watched. updatedWatchedResource *v3routepb.RouteConfiguration // The watched resource after an update. - wantUpdateV1 xdsresource.RouteConfigUpdateErrTuple - wantUpdateV2 xdsresource.RouteConfigUpdateErrTuple + wantUpdateV1 routeConfigUpdateErrTuple + wantUpdateV2 routeConfigUpdateErrTuple }{ { desc: "old style resource", resourceName: rdsName, watchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, cdsName), updatedWatchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, "new-cds-resource"), - wantUpdateV1: xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdateV1: routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsName}, @@ -285,8 +318,8 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { }, }, }, - wantUpdateV2: xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdateV2: routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsName}, @@ -307,8 +340,8 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { resourceName: rdsNameNewStyle, watchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsNameNewStyle), updatedWatchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, "new-cds-resource"), - wantUpdateV1: xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdateV1: routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsNameNewStyle}, @@ -323,8 +356,8 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { }, }, }, - wantUpdateV2: xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdateV2: routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsNameNewStyle}, @@ -356,15 +389,11 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { // Register two watches for the same route configuration resource // and have the callbacks push the received updates on to a channel. - updateCh1 := testutils.NewChannel() - rdsCancel1 := client.WatchRouteConfig(test.resourceName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw1 := newRouteConfigWatcher() + rdsCancel1 := xdsresource.WatchRouteConfig(client, test.resourceName, rw1) defer rdsCancel1() - updateCh2 := testutils.NewChannel() - rdsCancel2 := client.WatchRouteConfig(test.resourceName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw2 := newRouteConfigWatcher() + rdsCancel2 := xdsresource.WatchRouteConfig(client, test.resourceName, rw2) // Configure the management server to return a single route // configuration resource, corresponding to the one being watched. @@ -380,10 +409,10 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { } // Verify the contents of the received update. - if err := verifyRouteConfigUpdate(ctx, updateCh1, test.wantUpdateV1); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, test.wantUpdateV1); err != nil { t.Fatal(err) } - if err := verifyRouteConfigUpdate(ctx, updateCh2, test.wantUpdateV1); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, test.wantUpdateV1); err != nil { t.Fatal(err) } @@ -394,10 +423,10 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoRouteConfigUpdate(ctx, updateCh1); err != nil { + if err := verifyNoRouteConfigUpdate(ctx, rw1.updateCh); err != nil { t.Fatal(err) } - if err := verifyNoRouteConfigUpdate(ctx, updateCh2); err != nil { + if err := verifyNoRouteConfigUpdate(ctx, rw2.updateCh); err != nil { t.Fatal(err) } @@ -411,10 +440,10 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyRouteConfigUpdate(ctx, updateCh1, test.wantUpdateV2); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, test.wantUpdateV2); err != nil { t.Fatal(err) } - if err := verifyNoRouteConfigUpdate(ctx, updateCh2); err != nil { + if err := verifyNoRouteConfigUpdate(ctx, rw2.updateCh); err != nil { t.Fatal(err) } }) @@ -441,22 +470,16 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { // Register two watches for the same route configuration resource // and have the callbacks push the received updates on to a channel. - updateCh1 := testutils.NewChannel() - rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw1 := newRouteConfigWatcher() + rdsCancel1 := xdsresource.WatchRouteConfig(client, rdsName, rw1) defer rdsCancel1() - updateCh2 := testutils.NewChannel() - rdsCancel2 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw2 := newRouteConfigWatcher() + rdsCancel2 := xdsresource.WatchRouteConfig(client, rdsName, rw2) defer rdsCancel2() // Register the third watch for a different route configuration resource. - updateCh3 := testutils.NewChannel() - rdsCancel3 := client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw3 := newRouteConfigWatcher() + rdsCancel3 := xdsresource.WatchRouteConfig(client, rdsNameNewStyle, rw3) defer rdsCancel3() // Configure the management server to return two route configuration @@ -478,8 +501,8 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { // Verify the contents of the received update for the all watchers. The two // resources returned differ only in the resource name. Therefore the // expected update is the same for all the watchers. - wantUpdate := xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdate := routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsName}, @@ -494,13 +517,13 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { }, }, } - if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } - if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } - if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw3.updateCh, wantUpdate); err != nil { t.Fatal(err) } } @@ -542,10 +565,8 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) { // Register a watch for a route configuration resource and have the watch // callback push the received update on to a channel. - updateCh1 := testutils.NewChannel() - rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw1 := newRouteConfigWatcher() + rdsCancel1 := xdsresource.WatchRouteConfig(client, rdsName, rw1) defer rdsCancel1() // Configure the management server to return a single route configuration @@ -562,8 +583,8 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) { } // Verify the contents of the received update. - wantUpdate := xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdate := routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsName}, @@ -578,7 +599,7 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) { }, }, } - if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } select { @@ -589,12 +610,10 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) { // Register another watch for the same resource. This should get the update // from the cache. - updateCh2 := testutils.NewChannel() - rdsCancel2 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw2 := newRouteConfigWatcher() + rdsCancel2 := xdsresource.WatchRouteConfig(client, rdsName, rw2) defer rdsCancel2() - if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } // No request should get sent out as part of this watch. @@ -630,10 +649,8 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { // Register a watch for a resource which is expected to fail with an error // after the watch expiry timer fires. - updateCh := testutils.NewChannel() - rdsCancel := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw := newRouteConfigWatcher() + rdsCancel := xdsresource.WatchRouteConfig(client, rdsName, rw) defer rdsCancel() // Wait for the watch expiry timer to fire. @@ -643,7 +660,7 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "") - if err := verifyRouteConfigUpdate(ctx, updateCh, xdsresource.RouteConfigUpdateErrTuple{Err: wantErr}); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw.updateCh, routeConfigUpdateErrTuple{err: wantErr}); err != nil { t.Fatal(err) } } @@ -672,10 +689,8 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { // Register a watch for a route configuration resource and have the watch // callback push the received update on to a channel. - updateCh := testutils.NewChannel() - rdsCancel := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw := newRouteConfigWatcher() + rdsCancel := xdsresource.WatchRouteConfig(client, rdsName, rw) defer rdsCancel() // Configure the management server to return a single route configuration @@ -692,8 +707,8 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { } // Verify the contents of the received update. - wantUpdate := xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdate := routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsName}, @@ -708,14 +723,14 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { }, }, } - if err := verifyRouteConfigUpdate(ctx, updateCh, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw.updateCh, wantUpdate); err != nil { t.Fatal(err) } // Wait for the watch expiry timer to fire, and verify that the callback is // not invoked. <-time.After(defaultTestWatchExpiryTimeout) - if err := verifyNoRouteConfigUpdate(ctx, updateCh); err != nil { + if err := verifyNoRouteConfigUpdate(ctx, rw.updateCh); err != nil { t.Fatal(err) } } @@ -736,12 +751,8 @@ func (s) TestRDSWatch_NACKError(t *testing.T) { // Register a watch for a route configuration resource and have the watch // callback push the received update on to a channel. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - updateCh := testutils.NewChannel() - rdsCancel := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh.SendContext(ctx, xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw := newRouteConfigWatcher() + rdsCancel := xdsresource.WatchRouteConfig(client, rdsName, rw) defer rdsCancel() // Configure the management server to return a single route configuration @@ -751,16 +762,18 @@ func (s) TestRDSWatch_NACKError(t *testing.T) { Routes: []*v3routepb.RouteConfiguration{badRouteConfigResource(rdsName, ldsName, cdsName)}, SkipValidation: true, } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } // Verify that the expected error is propagated to the watcher. - u, err := updateCh.Receive(ctx) + u, err := rw.updateCh.Receive(ctx) if err != nil { t.Fatalf("timeout when waiting for a route configuration resource from the management server: %v", err) } - gotErr := u.(xdsresource.RouteConfigUpdateErrTuple).Err + gotErr := u.(routeConfigUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantRouteConfigNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantRouteConfigNACKErr) } @@ -788,16 +801,12 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() badResourceName := rdsName - updateCh1 := testutils.NewChannel() - rdsCancel1 := client.WatchRouteConfig(badResourceName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh1.SendContext(ctx, xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw1 := newRouteConfigWatcher() + rdsCancel1 := xdsresource.WatchRouteConfig(client, badResourceName, rw1) defer rdsCancel1() goodResourceName := rdsNameNewStyle - updateCh2 := testutils.NewChannel() - rdsCancel2 := client.WatchRouteConfig(goodResourceName, func(u xdsresource.RouteConfigUpdate, err error) { - updateCh2.SendContext(ctx, xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) - }) + rw2 := newRouteConfigWatcher() + rdsCancel2 := xdsresource.WatchRouteConfig(client, goodResourceName, rw2) defer rdsCancel2() // Configure the management server to return two route configuration @@ -816,19 +825,19 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) { // Verify that the expected error is propagated to the watcher which // requested for the bad resource. - u, err := updateCh1.Receive(ctx) + u, err := rw1.updateCh.Receive(ctx) if err != nil { t.Fatalf("timeout when waiting for a route configuration resource from the management server: %v", err) } - gotErr := u.(xdsresource.RouteConfigUpdateErrTuple).Err + gotErr := u.(routeConfigUpdateErrTuple).err if gotErr == nil || !strings.Contains(gotErr.Error(), wantRouteConfigNACKErr) { t.Fatalf("update received with error: %v, want %q", gotErr, wantRouteConfigNACKErr) } // Verify that the watcher watching the good resource receives a good // update. - wantUpdate := xdsresource.RouteConfigUpdateErrTuple{ - Update: xdsresource.RouteConfigUpdate{ + wantUpdate := routeConfigUpdateErrTuple{ + update: xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{ldsName}, @@ -843,7 +852,7 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) { }, }, } - if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyRouteConfigUpdate(ctx, rw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } } diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 7fa066065c33..5f47a6692568 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -251,18 +251,9 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { defer close() t.Logf("Created xDS client to %s", mgmtServer.Address) - // A wrapper struct to wrap the update and the associated error, as - // received by the resource watch callback. - type updateAndErr struct { - update xdsresource.ListenerUpdate - err error - } - updateAndErrCh := testutils.NewChannel() - // Register a watch, and push the results on to a channel. - client.WatchListener(test.resourceName, func(update xdsresource.ListenerUpdate, err error) { - updateAndErrCh.Send(updateAndErr{update: update, err: err}) - }) + lw := newListenerWatcher() + xdsresource.WatchListener(client, test.resourceName, lw) t.Logf("Registered a watch for Listener %q", test.resourceName) // Wait for the discovery request to be sent out. @@ -288,12 +279,12 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { // Wait for an update from the xDS client and compare with expected // update. - val, err = updateAndErrCh.Receive(ctx) + val, err = lw.updateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout when waiting for watch callback to invoked after response from management server: %v", err) } - gotUpdate := val.(updateAndErr).update - gotErr := val.(updateAndErr).err + gotUpdate := val.(listenerUpdateErrTuple).update + gotErr := val.(listenerUpdateErrTuple).err if (gotErr != nil) != (test.wantErr != "") { t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr) } @@ -513,18 +504,9 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { defer close() t.Logf("Created xDS client to %s", mgmtServer.Address) - // A wrapper struct to wrap the update and the associated error, as - // received by the resource watch callback. - type updateAndErr struct { - update xdsresource.RouteConfigUpdate - err error - } - updateAndErrCh := testutils.NewChannel() - // Register a watch, and push the results on to a channel. - client.WatchRouteConfig(test.resourceName, func(update xdsresource.RouteConfigUpdate, err error) { - updateAndErrCh.Send(updateAndErr{update: update, err: err}) - }) + rw := newRouteConfigWatcher() + xdsresource.WatchRouteConfig(client, test.resourceName, rw) t.Logf("Registered a watch for Route Configuration %q", test.resourceName) // Wait for the discovery request to be sent out. @@ -550,12 +532,12 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { // Wait for an update from the xDS client and compare with expected // update. - val, err = updateAndErrCh.Receive(ctx) + val, err = rw.updateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout when waiting for watch callback to invoked after response from management server: %v", err) } - gotUpdate := val.(updateAndErr).update - gotErr := val.(updateAndErr).err + gotUpdate := val.(routeConfigUpdateErrTuple).update + gotErr := val.(routeConfigUpdateErrTuple).err if (gotErr != nil) != (test.wantErr != "") { t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr) } @@ -770,8 +752,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { if err != nil { t.Fatalf("Timeout when waiting for watch callback to invoked after response from management server: %v", err) } - gotUpdate := val.(xdsresource.ClusterUpdateErrTuple).Update - gotErr := val.(xdsresource.ClusterUpdateErrTuple).Err + gotUpdate := val.(clusterUpdateErrTuple).update + gotErr := val.(clusterUpdateErrTuple).err if (gotErr != nil) != (test.wantErr != "") { t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr) } diff --git a/xds/internal/xdsclient/xdsresource/type_cds.go b/xds/internal/xdsclient/xdsresource/type_cds.go index 269d9ebdae15..b59eb9c33883 100644 --- a/xds/internal/xdsclient/xdsresource/type_cds.go +++ b/xds/internal/xdsclient/xdsresource/type_cds.go @@ -86,11 +86,3 @@ type ClusterUpdate struct { // Raw is the resource from the xds response. Raw *anypb.Any } - -// ClusterUpdateErrTuple is a tuple with the update and error. It contains the -// results from unmarshal functions. It's used to pass unmarshal results of -// multiple resources together, e.g. in maps like `map[string]{Update,error}`. -type ClusterUpdateErrTuple struct { - Update ClusterUpdate - Err error -} diff --git a/xds/internal/xdsclient/xdsresource/type_lds.go b/xds/internal/xdsclient/xdsresource/type_lds.go index a2742fb4371a..a71e38ea9a52 100644 --- a/xds/internal/xdsclient/xdsresource/type_lds.go +++ b/xds/internal/xdsclient/xdsresource/type_lds.go @@ -77,11 +77,3 @@ type InboundListenerConfig struct { // FilterChains is the list of filter chains associated with this listener. FilterChains *FilterChainManager } - -// ListenerUpdateErrTuple is a tuple with the update and error. It contains the -// results from unmarshal functions. It's used to pass unmarshal results of -// multiple resources together, e.g. in maps like `map[string]{Update,error}`. -type ListenerUpdateErrTuple struct { - Update ListenerUpdate - Err error -} diff --git a/xds/internal/xdsclient/xdsresource/type_rds.go b/xds/internal/xdsclient/xdsresource/type_rds.go index ad59209163e7..45d84d67063f 100644 --- a/xds/internal/xdsclient/xdsresource/type_rds.go +++ b/xds/internal/xdsclient/xdsresource/type_rds.go @@ -246,11 +246,3 @@ func (sc *SecurityConfig) Equal(other *SecurityConfig) bool { } return true } - -// RouteConfigUpdateErrTuple is a tuple with the update and error. It contains -// the results from unmarshal functions. It's used to pass unmarshal results of -// multiple resources together, e.g. in maps like `map[string]{Update,error}`. -type RouteConfigUpdateErrTuple struct { - Update RouteConfigUpdate - Err error -} From 41f6ba07540a90e51ed1d013e628c276bfa7766b Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 13 Dec 2023 19:39:40 +0000 Subject: [PATCH 2/3] review comments pass 1 --- .../xdsclient/tests/misc_watchers_test.go | 28 +++++++++++-------- .../xdsclient/tests/resource_update_test.go | 13 +++++---- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 55a006848217..403e20c7112c 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -47,11 +47,9 @@ var ( routeConfigResourceType = internal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type) ) -// This route configuration watcher -// callback of the first resource, register two more watches (one for the -// same resource name, which would be satisfied from the cache, and another -// for a different resource name, which would be satisfied from the server). -type routeConfigWatcherWithWatchFromCallback struct { +// This route configuration watcher registers two +// more watches from the OnUpdate callback of the original resource for which it was created. +type testRouteConfigWatcher struct { client xdsclient.XDSClient name1, name2 string rcw1, rcw2 *routeConfigWatcher @@ -59,8 +57,8 @@ type routeConfigWatcherWithWatchFromCallback struct { updateCh *testutils.Channel } -func newRouteConfigWatcherWithWatchFromCallback(client xdsclient.XDSClient, name1, name2 string) *routeConfigWatcherWithWatchFromCallback { - return &routeConfigWatcherWithWatchFromCallback{ +func newTestRouteConfigWatcher(client xdsclient.XDSClient, name1, name2 string) *testRouteConfigWatcher { + return &testRouteConfigWatcher{ client: client, name1: name1, name2: name2, @@ -70,14 +68,14 @@ func newRouteConfigWatcherWithWatchFromCallback(client xdsclient.XDSClient, name } } -func (rw *routeConfigWatcherWithWatchFromCallback) OnUpdate(update *xdsresource.RouteConfigResourceData) { +func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1) rw.cancel2 = xdsresource.WatchRouteConfig(rw.client, rw.name2, rw.rcw2) } -func (rw *routeConfigWatcherWithWatchFromCallback) OnError(err error) { +func (rw *testRouteConfigWatcher) OnError(err error) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -85,11 +83,11 @@ func (rw *routeConfigWatcherWithWatchFromCallback) OnError(err error) { rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) } -func (rw *routeConfigWatcherWithWatchFromCallback) OnResourceDoesNotExist() { +func (rw *testRouteConfigWatcher) OnResourceDoesNotExist() { rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) } -func (rw *routeConfigWatcherWithWatchFromCallback) cancel() { +func (rw *testRouteConfigWatcher) cancel() { rw.cancel1() rw.cancel2() } @@ -125,7 +123,13 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - rw := newRouteConfigWatcherWithWatchFromCallback(client, rdsName, rdsNameNewStyle) + // Create a route configuration watcher that registers two more watches from + // the OnUpdate callback: + // - one for the same resource name as this watch, which would be + // satisfied from xdsClient cache + // - the other for a different resource name, which would be + // satisfied from the server + rw := newTestRouteConfigWatcher(client, rdsName, rdsNameNewStyle) defer rw.cancel() rdsCancel := xdsresource.WatchRouteConfig(client, rdsName, rw) defer rdsCancel() diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 5f47a6692568..33282f964a6f 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -253,7 +253,8 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { // Register a watch, and push the results on to a channel. lw := newListenerWatcher() - xdsresource.WatchListener(client, test.resourceName, lw) + cancel := xdsresource.WatchListener(client, test.resourceName, lw) + defer cancel() t.Logf("Registered a watch for Listener %q", test.resourceName) // Wait for the discovery request to be sent out. @@ -506,7 +507,8 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { // Register a watch, and push the results on to a channel. rw := newRouteConfigWatcher() - xdsresource.WatchRouteConfig(client, test.resourceName, rw) + cancel := xdsresource.WatchRouteConfig(client, test.resourceName, rw) + defer cancel() t.Logf("Registered a watch for Route Configuration %q", test.resourceName) // Wait for the discovery request to be sent out. @@ -722,7 +724,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { // Register a watch, and push the results on to a channel. cw := newClusterWatcher() - xdsresource.WatchCluster(client, test.resourceName, cw) + cancel := xdsresource.WatchCluster(client, test.resourceName, cw) + defer cancel() t.Logf("Registered a watch for Cluster %q", test.resourceName) // Wait for the discovery request to be sent out. @@ -1034,8 +1037,8 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { // Register a watch, and push the results on to a channel. ew := newEndpointsWatcher() - edsCancel := xdsresource.WatchEndpoints(client, test.resourceName, ew) - defer edsCancel() + cancel := xdsresource.WatchEndpoints(client, test.resourceName, ew) + defer cancel() t.Logf("Registered a watch for Endpoint %q", test.resourceName) // Wait for the discovery request to be sent out. From 61e0c718a8e3fe76f9c75f6f3b43002d4f6249d3 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 15 Dec 2023 00:07:04 +0000 Subject: [PATCH 3/3] last review comment --- xds/internal/xdsclient/tests/misc_watchers_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 403e20c7112c..d6f7bc9e6484 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -47,8 +47,8 @@ var ( routeConfigResourceType = internal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type) ) -// This route configuration watcher registers two -// more watches from the OnUpdate callback of the original resource for which it was created. +// This route configuration watcher registers two watches corresponding to the +// names passed in at creation time on a valid update. type testRouteConfigWatcher struct { client xdsclient.XDSClient name1, name2 string