From 159cde6d820c3312a600fd6923fda313c77ad3b6 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 1 Oct 2024 23:54:53 +0000 Subject: [PATCH 1/4] xdsclient: invoke watch callback when new update matches cached one, but previous update was NACKed --- xds/internal/xdsclient/authority.go | 9 +- .../tests/ads_stream_ack_nack_test.go | 482 ++++++++++++++++ .../transport/transport_ack_nack_test.go | 521 ------------------ .../xdsclient/xdsresource/unmarshal_lds.go | 4 +- 4 files changed, 490 insertions(+), 526 deletions(-) create mode 100644 xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go delete mode 100644 xds/internal/xdsclient/transport/transport_ack_nack_test.go diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 3251737f181e..7c6766a77f00 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -247,9 +247,12 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty state.deletionIgnored = false a.logger.Infof("A valid update was received for resource %q of type %q after previously ignoring a deletion", name, rType.TypeName()) } - // Notify watchers only if this is a first time update or it is different - // from the one currently cached. - if state.cache == nil || !state.cache.Equal(uErr.resource) { + // Notify watchers if any of these conditions are met: + // - this is the first update for this resource + // - this update is different from the one currently cached + // - the previous update for this resource was NACKed, but the update + // before that was the same as this update. + if state.cache == nil || !state.cache.Equal(uErr.resource) || state.md.ErrState != nil { for watcher := range state.watchers { watcher := watcher resource := uErr.resource diff --git a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go new file mode 100644 index 000000000000..527e4edb3941 --- /dev/null +++ b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go @@ -0,0 +1,482 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient_test + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" +) + +// Tests simple ACK and NACK scenarios on the ADS stream: +// 1. When a good response is received, i.e. once that is expected to be ACKed, +// the test verifies that an ACK is sent matching the version and nonce from +// the response. +// 2. When a subsequent bad response is received, i.e. once is expected to be +// NACKed, the test verifies that a NACK is sent matching the previously +// ACKed version and current nonce from the response. +// 3. When a subsequent goos response is received, the test verifies that an +// ACK is sent matching the version and nonce from the current response. +func (s) TestADS_ACK_NACK_Simple(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server listening on a local port. Configure the + // request and response handlers to push on channels that are inspected by + // the test goroutine to verify ack version and nonce. + streamRequestCh := testutils.NewChannel() + streamResponseCh := testutils.NewChannel() + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + streamRequestCh.Send(req) + return nil + }, + OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { + // The go-control-plane management server continuously resends the + // same resource if NACKed by the client. Hence, we need to use + // `Replace` here instead of `Send`. + streamResponseCh.Replace(resp) + }, + }) + + // Create a listener resource on the management server. + const listenerName = "listener" + const routeConfigName = "route-config" + nodeID := uuid.New().String() + listenerResource := e2e.DefaultClientListener(listenerName, routeConfigName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client with bootstrap pointing to the above server. + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + client := createXDSClient(t, bc) + + // Register a watch for a listener resource. + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, listenerName, lw) + defer ldsCancel() + + // Verify that the initial discovery request matches expectation. + r, err := streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the initial discovery request") + } + gotReq := r.(*v3discoverypb.DiscoveryRequest) + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Capture the version and nonce from the response. + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the discovery response from client") + } + gotResp := r.(*v3discoverypb.DiscoveryResponse) + + // Verify that the ACK contains the appropriate version and nonce. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantReq.VersionInfo = gotResp.GetVersionInfo() + wantReq.ResponseNonce = gotResp.GetNonce() + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Verify the update received by the watcher. + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ + RouteConfigName: routeConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + } + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Update the management server with a listener resource that contains an + // empty HTTP connection manager within the apiListener, which will cause + // the resource to be NACKed. + badListener := proto.Clone(listenerResource).(*v3listenerpb.Listener) + badListener.ApiListener.ApiListener = nil + mgmtServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{badListener}, + SkipValidation: true, + }) + + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the discovery response from client") + } + gotResp = r.(*v3discoverypb.DiscoveryResponse) + + var wantNackErr = errors.New("unexpected http connection manager resource type") + if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantNackErr}); err != nil { + t.Fatal(err) + } + + // Verify that the NACK contains the appropriate version, nonce and error. + // We expect the version to not change as this is a NACK. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + if gotNonce, wantNonce := gotReq.GetResponseNonce(), gotResp.GetNonce(); gotNonce != wantNonce { + t.Errorf("Unexpected nonce in discovery request, got: %v, want: %v", gotNonce, wantNonce) + } + if gotErr := gotReq.GetErrorDetail(); gotErr == nil || !strings.Contains(gotErr.GetMessage(), wantNackErr.Error()) { + t.Fatalf("Unexpected error in discovery request, got: %v, want: %v", gotErr.GetMessage(), wantNackErr) + } + + // Update the management server to send a good resource again. + mgmtServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource}, + SkipValidation: true, + }) + + // The envoy-go-control-plane management server keeps resending the same + // resource as long as we keep NACK'ing it. So, we will see the bad resource + // sent to us a few times here, before receiving the good resource. + for { + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the discovery response from client") + } + gotResp = r.(*v3discoverypb.DiscoveryResponse) + + // Verify that the ACK contains the appropriate version and nonce. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantReq.VersionInfo = gotResp.GetVersionInfo() + wantReq.ResponseNonce = gotResp.GetNonce() + wantReq.ErrorDetail = nil + diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()) + if diff == "" { + break + } + t.Logf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Verify the update received by the watcher. + for ; ctx.Err() == nil; <-time.After(100 * time.Millisecond) { + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { + t.Logf("Failed to verify listener update, err: %v", err) + continue + } + break + } + if ctx.Err() != nil { + t.Fatal("Timeout when waiting for listener update") + } +} + +// Tests the case where the first response is invalid. The test verifies that +// the NACK contains an empty version string. +func (s) TestADS_ACK_NACK_InvalidFirstResponse(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server listening on a local port. Configure the + // request and response handlers to push on channels that are inspected by + // the test goroutine to verify ack version and nonce. + streamRequestCh := testutils.NewChannel() + streamResponseCh := testutils.NewChannel() + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + streamRequestCh.Send(req) + return nil + }, + OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { + // The go-control-plane management server continuously resends the + // same resource if NACKed by the client. Hence, we need to use + // `Replace` here instead of `Send`. + streamResponseCh.Replace(resp) + }, + }) + + // Create a listener resource on the management server that is expected to + // be NACKed by the xDS client. + const listenerName = "listener" + const routeConfigName = "route-config" + nodeID := uuid.New().String() + listenerResource := e2e.DefaultClientListener(listenerName, routeConfigName) + listenerResource.ApiListener.ApiListener = nil + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client with bootstrap pointing to the above server. + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + client := createXDSClient(t, bc) + + // Register a watch for a listener resource. + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, listenerName, lw) + defer ldsCancel() + + // Verify that the initial discovery request matches expectation. + r, err := streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the initial discovery request") + } + gotReq := r.(*v3discoverypb.DiscoveryRequest) + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Capture the version and nonce from the response. + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the discovery response from client") + } + gotResp := r.(*v3discoverypb.DiscoveryResponse) + + // Verify that the error is propagated to the watcher. + var wantNackErr = errors.New("unexpected http connection manager resource type") + if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantNackErr}); err != nil { + t.Fatal(err) + } + + // NACK should contain the appropriate error, nonce, but empty version. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + if gotVersion, wantVersion := gotReq.GetVersionInfo(), ""; gotVersion != wantVersion { + t.Errorf("Unexpected version in discovery request, got: %v, want: %v", gotVersion, wantVersion) + } + if gotNonce, wantNonce := gotReq.GetResponseNonce(), gotResp.GetNonce(); gotNonce != wantNonce { + t.Errorf("Unexpected nonce in discovery request, got: %v, want: %v", gotNonce, wantNonce) + } + if gotErr := gotReq.GetErrorDetail(); gotErr == nil || !strings.Contains(gotErr.GetMessage(), wantNackErr.Error()) { + t.Fatalf("Unexpected error in discovery request, got: %v, want: %v", gotErr.GetMessage(), wantNackErr) + } +} + +// Tests the scenario where the xDS client is no longer interested in a +// resource. The following sequence of events are tested: +// 1. A resource is requested and a good response is received. The test verifies +// that an ACK is sent for this resource. +// 2. The previously requested resource is no longer requested. The test +// verifies that a request with no resource names is sent out. +// 3. The same resource is requested again. The test verifies that the request +// is sent with the previously ACKed version. +func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server listening on a local port. Configure the + // request and response handlers to push on channels that are inspected by + // the test goroutine to verify ack version and nonce. + streamRequestCh := testutils.NewChannel() + streamResponseCh := testutils.NewChannel() + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + streamRequestCh.Send(req) + return nil + }, + OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { + // The go-control-plane management server continuously resends the + // same resource if NACKed by the client. Hence, we need to use + // `Replace` here instead of `Send`. + streamResponseCh.Replace(resp) + }, + }) + + // Create a listener resource on the management server. + const listenerName = "listener" + const routeConfigName = "route-config" + nodeID := uuid.New().String() + listenerResource := e2e.DefaultClientListener(listenerName, routeConfigName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client with bootstrap pointing to the above server. + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Register a watch for a listener resource. + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, listenerName, lw) + defer ldsCancel() + + // Verify that the initial discovery request matches expectation. + r, err := streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the initial discovery request") + } + gotReq := r.(*v3discoverypb.DiscoveryRequest) + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Capture the version and nonce from the response. + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the discovery response from client") + } + gotResp := r.(*v3discoverypb.DiscoveryResponse) + + // Verify that the ACK contains the appropriate version and nonce. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantReq.VersionInfo = gotResp.GetVersionInfo() + wantReq.ResponseNonce = gotResp.GetNonce() + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Verify the update received by the watcher. + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ + RouteConfigName: routeConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + } + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Cancel the watch on the listener resource. This should result in a + // discovery request with no resource names. + ldsCancel() + + // Verify that the discovery request matches expectation. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for discovery request") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantReq.ResourceNames = nil + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Register a watch for the same listener resource. + lw = newListenerWatcher() + ldsCancel = xdsresource.WatchListener(client, listenerName, lw) + defer ldsCancel() + + // Verify that the discovery request contains the version from the + // previously received response. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for discovery request") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantReq.ResourceNames = []string{listenerName} + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // TODO(https://github.com/envoyproxy/go-control-plane/issues/1002): Once + // this bug is fixed, we need to verify that the update is received by the + // watcher. +} diff --git a/xds/internal/xdsclient/transport/transport_ack_nack_test.go b/xds/internal/xdsclient/transport/transport_ack_nack_test.go deleted file mode 100644 index 73b2635eff6a..000000000000 --- a/xds/internal/xdsclient/transport/transport_ack_nack_test.go +++ /dev/null @@ -1,521 +0,0 @@ -/* - * - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package transport_test - -import ( - "context" - "errors" - "fmt" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/google/uuid" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/internal/testutils" - "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" - "google.golang.org/grpc/xds/internal/xdsclient/transport" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/wrapperspb" - - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" - v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - statuspb "google.golang.org/genproto/googleapis/rpc/status" -) - -var ( - errWantNack = errors.New("unsupported field 'use_original_dst' is present and set to true") - - // A simple update handler for listener resources which validates only the - // `use_original_dst` field. - dataModelValidator = func(update transport.ResourceUpdate, onDone func()) error { - defer onDone() - for _, r := range update.Resources { - inner := &v3discoverypb.Resource{} - if err := proto.Unmarshal(r.GetValue(), inner); err != nil { - return fmt.Errorf("failed to unmarshal DiscoveryResponse: %v", err) - } - lis := &v3listenerpb.Listener{} - if err := proto.Unmarshal(r.GetValue(), lis); err != nil { - return fmt.Errorf("failed to unmarshal DiscoveryResponse: %v", err) - } - if useOrigDst := lis.GetUseOriginalDst(); useOrigDst != nil && useOrigDst.GetValue() { - return errWantNack - } - } - return nil - } -) - -// TestSimpleAckAndNack tests simple ACK and NACK scenarios. -// 1. When the data model layer likes a received response, the test verifies -// that an ACK is sent matching the version and nonce from the response. -// 2. When a subsequent response is disliked by the data model layer, the test -// verifies that a NACK is sent matching the previously ACKed version and -// current nonce from the response. -// 3. When a subsequent response is liked by the data model layer, the test -// verifies that an ACK is sent matching the version and nonce from the -// current response. -func (s) TestSimpleAckAndNack(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Create an xDS management server listening on a local port. Configure the - // request and response handlers to push on channels which are inspected by - // the test goroutine to verify ack version and nonce. - streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) - streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - select { - case streamRequestCh <- req: - case <-ctx.Done(): - } - return nil - }, - OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - select { - case streamResponseCh <- resp: - case <-ctx.Done(): - } - }, - }) - - // Configure the management server with appropriate resources. - apiListener := &v3listenerpb.ApiListener{ - ApiListener: func() *anypb.Any { - return testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ - Rds: &v3httppb.Rds{ - ConfigSource: &v3corepb.ConfigSource{ - ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, - }, - RouteConfigName: "route-configuration-name", - }, - }, - }) - }(), - } - const resourceName = "resource name 1" - listenerResource := &v3listenerpb.Listener{ - Name: resourceName, - ApiListener: apiListener, - } - nodeID := uuid.New().String() - mgmtServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listenerResource}, - SkipValidation: true, - }) - - serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) - if err != nil { - t.Fatalf("Failed to create server config for testing: %v", err) - } - - // Create a new transport. - tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - OnRecvHandler: dataModelValidator, - OnErrorHandler: func(err error) {}, - OnSendHandler: func(*transport.ResourceSendInfo) {}, - NodeProto: &v3corepb.Node{Id: nodeID}, - }) - if err != nil { - t.Fatalf("Failed to create xDS transport: %v", err) - } - defer tr.Close() - - // Send a discovery request through the transport. - tr.SendRequest(version.V3ListenerURL, []string{resourceName}) - - // Verify that the initial discovery request matches expectation. - var gotReq *v3discoverypb.DiscoveryRequest - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - wantReq := &v3discoverypb.DiscoveryRequest{ - VersionInfo: "", - Node: &v3corepb.Node{Id: nodeID}, - ResourceNames: []string{resourceName}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - ResponseNonce: "", - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - // Capture the version and nonce from the response. - var gotResp *v3discoverypb.DiscoveryResponse - select { - case gotResp = <-streamResponseCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery response on the stream") - } - - // Verify that the ACK contains the appropriate version and nonce. - wantReq.VersionInfo = gotResp.GetVersionInfo() - wantReq.ResponseNonce = gotResp.GetNonce() - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for the discovery request ACK on the stream") - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - // Update the management server's copy of the resource to include a field - // which will cause the resource to be NACKed. - badListener := proto.Clone(listenerResource).(*v3listenerpb.Listener) - badListener.UseOriginalDst = &wrapperspb.BoolValue{Value: true} - mgmtServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{badListener}, - SkipValidation: true, - }) - - select { - case gotResp = <-streamResponseCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery response on the stream") - } - - // Verify that the NACK contains the appropriate version, nonce and error. - // We expect the version to not change as this is a NACK. - wantReq.ResponseNonce = gotResp.GetNonce() - wantReq.ErrorDetail = &statuspb.Status{ - Code: int32(codes.InvalidArgument), - Message: errWantNack.Error(), - } - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for the discovery request ACK on the stream") - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - // Update the management server to send a good resource again. - mgmtServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listenerResource}, - SkipValidation: true, - }) - - // The envoy-go-control-plane management server keeps resending the same - // resource as long as we keep NACK'ing it. So, we will see the bad resource - // sent to us a few times here, before receiving the good resource. - for { - select { - case gotResp = <-streamResponseCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery response on the stream") - } - - // Verify that the ACK contains the appropriate version and nonce. - wantReq.VersionInfo = gotResp.GetVersionInfo() - wantReq.ResponseNonce = gotResp.GetNonce() - wantReq.ErrorDetail = nil - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for the discovery request ACK on the stream") - } - diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)) - if diff == "" { - break - } - t.Logf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } -} - -// TestInvalidFirstResponse tests the case where the first response is invalid. -// The test verifies that the NACK contains an empty version string. -func (s) TestInvalidFirstResponse(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Create an xDS management server listening on a local port. Configure the - // request and response handlers to push on channels which are inspected by - // the test goroutine to verify ack version and nonce. - streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) - streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - select { - case streamRequestCh <- req: - case <-ctx.Done(): - } - return nil - }, - OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - select { - case streamResponseCh <- resp: - case <-ctx.Done(): - } - }, - }) - - // Configure the management server with appropriate resources. - apiListener := &v3listenerpb.ApiListener{ - ApiListener: func() *anypb.Any { - return testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ - Rds: &v3httppb.Rds{ - ConfigSource: &v3corepb.ConfigSource{ - ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, - }, - RouteConfigName: "route-configuration-name", - }, - }, - }) - }(), - } - const resourceName = "resource name 1" - listenerResource := &v3listenerpb.Listener{ - Name: resourceName, - ApiListener: apiListener, - UseOriginalDst: &wrapperspb.BoolValue{Value: true}, // This will cause the resource to be NACKed. - } - nodeID := uuid.New().String() - mgmtServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listenerResource}, - SkipValidation: true, - }) - - serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) - if err != nil { - t.Fatalf("Failed to create server config for testing: %v", err) - } - - // Create a new transport. - tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - NodeProto: &v3corepb.Node{Id: nodeID}, - OnRecvHandler: dataModelValidator, - OnErrorHandler: func(err error) {}, - OnSendHandler: func(*transport.ResourceSendInfo) {}, - }) - if err != nil { - t.Fatalf("Failed to create xDS transport: %v", err) - } - defer tr.Close() - - // Send a discovery request through the transport. - tr.SendRequest(version.V3ListenerURL, []string{resourceName}) - - // Verify that the initial discovery request matches expectation. - var gotReq *v3discoverypb.DiscoveryRequest - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - wantReq := &v3discoverypb.DiscoveryRequest{ - Node: &v3corepb.Node{Id: nodeID}, - ResourceNames: []string{resourceName}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - var gotResp *v3discoverypb.DiscoveryResponse - select { - case gotResp = <-streamResponseCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery response on the stream") - } - - // NACK should contain the appropriate error, nonce, but empty version. - wantReq.VersionInfo = "" - wantReq.ResponseNonce = gotResp.GetNonce() - wantReq.ErrorDetail = &statuspb.Status{ - Code: int32(codes.InvalidArgument), - Message: errWantNack.Error(), - } - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for the discovery request ACK on the stream") - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } -} - -// TestResourceIsNotRequestedAnymore tests the scenario where the xDS client is -// no longer interested in a resource. The following sequence of events are -// tested: -// 1. A resource is requested and a good response is received. The test verifies -// that an ACK is sent for this resource. -// 2. The previously requested resource is no longer requested. The test -// verifies that a request with no resource names is sent out. -// 3. The same resource is requested again. The test verifies that the request -// is sent with the previously ACKed version. -func (s) TestResourceIsNotRequestedAnymore(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Create an xDS management server listening on a local port. Configure the - // request and response handlers to push on channels which are inspected by - // the test goroutine to verify ack version and nonce. - streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) - streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - select { - case streamRequestCh <- req: - case <-ctx.Done(): - } - return nil - }, - OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - select { - case streamResponseCh <- resp: - case <-ctx.Done(): - } - }, - }) - - // Configure the management server with appropriate resources. - apiListener := &v3listenerpb.ApiListener{ - ApiListener: func() *anypb.Any { - return testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ - Rds: &v3httppb.Rds{ - ConfigSource: &v3corepb.ConfigSource{ - ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, - }, - RouteConfigName: "route-configuration-name", - }, - }, - }) - }(), - } - const resourceName = "resource name 1" - listenerResource := &v3listenerpb.Listener{ - Name: resourceName, - ApiListener: apiListener, - } - nodeID := uuid.New().String() - mgmtServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listenerResource}, - SkipValidation: true, - }) - - serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) - if err != nil { - t.Fatalf("Failed to create server config for testing: %v", err) - } - - // Create a new transport. - tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - NodeProto: &v3corepb.Node{Id: nodeID}, - OnRecvHandler: dataModelValidator, - OnErrorHandler: func(err error) {}, - OnSendHandler: func(*transport.ResourceSendInfo) {}, - }) - if err != nil { - t.Fatalf("Failed to create xDS transport: %v", err) - } - defer tr.Close() - - // Send a discovery request through the transport. - tr.SendRequest(version.V3ListenerURL, []string{resourceName}) - - // Verify that the initial discovery request matches expectation. - var gotReq *v3discoverypb.DiscoveryRequest - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - wantReq := &v3discoverypb.DiscoveryRequest{ - VersionInfo: "", - Node: &v3corepb.Node{Id: nodeID}, - ResourceNames: []string{resourceName}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - ResponseNonce: "", - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - // Capture the version and nonce from the response. - var gotResp *v3discoverypb.DiscoveryResponse - select { - case gotResp = <-streamResponseCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery response on the stream") - } - - // Verify that the ACK contains the appropriate version and nonce. - wantReq.VersionInfo = gotResp.GetVersionInfo() - wantReq.ResponseNonce = gotResp.GetNonce() - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for the discovery request ACK on the stream") - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - // Send a discovery request with no resource names. - tr.SendRequest(version.V3ListenerURL, []string{}) - - // Verify that the discovery request matches expectation. - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - wantReq.ResourceNames = nil - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - // Send a discovery request for the same resource requested earlier. - tr.SendRequest(version.V3ListenerURL, []string{resourceName}) - - // Verify that the discovery request contains the version from the - // previously received response. - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - wantReq.ResourceNames = []string{resourceName} - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } -} diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_lds.go b/xds/internal/xdsclient/xdsresource/unmarshal_lds.go index e9a29b938756..1b0d4599f106 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_lds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_lds.go @@ -39,7 +39,7 @@ func unmarshalListenerResource(r *anypb.Any) (string, ListenerUpdate, error) { } if !IsListenerResource(r.GetTypeUrl()) { - return "", ListenerUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl()) + return "", ListenerUpdate{}, fmt.Errorf("unexpected listener resource type: %q ", r.GetTypeUrl()) } lis := &v3listenerpb.Listener{} if err := proto.Unmarshal(r.GetValue(), lis); err != nil { @@ -68,7 +68,7 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err apiLisAny := lis.GetApiListener().GetApiListener() if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) { - return nil, fmt.Errorf("unexpected resource type: %q", apiLisAny.GetTypeUrl()) + return nil, fmt.Errorf("unexpected http connection manager resource type: %q", apiLisAny.GetTypeUrl()) } apiLis := &v3httppb.HttpConnectionManager{} if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil { From 7c9f57816e67c4ff523bdc09f87ce11621c0bf1b Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 2 Oct 2024 12:57:04 +0000 Subject: [PATCH 2/4] fix a flake in the test --- .../xdsclient/tests/ads_stream_ack_nack_test.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go index 527e4edb3941..f0221110eef3 100644 --- a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go @@ -246,10 +246,7 @@ func (s) TestADS_ACK_NACK_InvalidFirstResponse(t *testing.T) { return nil }, OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - // The go-control-plane management server continuously resends the - // same resource if NACKed by the client. Hence, we need to use - // `Replace` here instead of `Send`. - streamResponseCh.Replace(resp) + streamResponseCh.Send(resp) }, }) @@ -354,10 +351,7 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) { return nil }, OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - // The go-control-plane management server continuously resends the - // same resource if NACKed by the client. Hence, we need to use - // `Replace` here instead of `Send`. - streamResponseCh.Replace(resp) + streamResponseCh.Send(resp) }, }) From a41f75bc5f2c8d5027fa3653feaadc68b77af292 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 2 Oct 2024 13:06:00 +0000 Subject: [PATCH 3/4] use SendContext to ensure no goroutine leaks --- .../xdsclient/tests/ads_stream_ack_nack_test.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go index f0221110eef3..5c4b9d74f2d3 100644 --- a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go @@ -60,14 +60,11 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) { streamResponseCh := testutils.NewChannel() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - streamRequestCh.Send(req) + streamRequestCh.SendContext(ctx, req) return nil }, OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - // The go-control-plane management server continuously resends the - // same resource if NACKed by the client. Hence, we need to use - // `Replace` here instead of `Send`. - streamResponseCh.Replace(resp) + streamResponseCh.SendContext(ctx, resp) }, }) @@ -242,11 +239,11 @@ func (s) TestADS_ACK_NACK_InvalidFirstResponse(t *testing.T) { streamResponseCh := testutils.NewChannel() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - streamRequestCh.Send(req) + streamRequestCh.SendContext(ctx, req) return nil }, OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - streamResponseCh.Send(resp) + streamResponseCh.SendContext(ctx, resp) }, }) @@ -347,11 +344,11 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) { streamResponseCh := testutils.NewChannel() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - streamRequestCh.Send(req) + streamRequestCh.SendContext(ctx, req) return nil }, OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - streamResponseCh.Send(resp) + streamResponseCh.SendContext(ctx, resp) }, }) From 333e5d8f016810cd9acf7fdea2a11ca1230d419c Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 4 Oct 2024 21:28:56 +0000 Subject: [PATCH 4/4] first round of review comments from zasweq --- .../xdsclient/tests/ads_stream_ack_nack_test.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go index 5c4b9d74f2d3..f7ef094ddf2f 100644 --- a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go @@ -47,7 +47,7 @@ import ( // 2. When a subsequent bad response is received, i.e. once is expected to be // NACKed, the test verifies that a NACK is sent matching the previously // ACKed version and current nonce from the response. -// 3. When a subsequent goos response is received, the test verifies that an +// 3. When a subsequent good response is received, the test verifies that an // ACK is sent matching the version and nonce from the current response. func (s) TestADS_ACK_NACK_Simple(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -55,7 +55,7 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) { // Create an xDS management server listening on a local port. Configure the // request and response handlers to push on channels that are inspected by - // the test goroutine to verify ack version and nonce. + // the test goroutine to verify ACK version and nonce. streamRequestCh := testutils.NewChannel() streamResponseCh := testutils.NewChannel() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ @@ -161,7 +161,7 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) { } gotResp = r.(*v3discoverypb.DiscoveryResponse) - var wantNackErr = errors.New("unexpected http connection manager resource type") + wantNackErr := errors.New("unexpected http connection manager resource type") if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantNackErr}); err != nil { t.Fatal(err) } @@ -170,7 +170,7 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) { // We expect the version to not change as this is a NACK. r, err = streamRequestCh.Receive(ctx) if err != nil { - t.Fatal("Timeout when waiting for ACK") + t.Fatal("Timeout when waiting for NACK") } gotReq = r.(*v3discoverypb.DiscoveryRequest) if gotNonce, wantNonce := gotReq.GetResponseNonce(), gotResp.GetNonce(); gotNonce != wantNonce { @@ -214,15 +214,16 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) { } // Verify the update received by the watcher. + var lastErr error for ; ctx.Err() == nil; <-time.After(100 * time.Millisecond) { if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { - t.Logf("Failed to verify listener update, err: %v", err) + lastErr = err continue } break } if ctx.Err() != nil { - t.Fatal("Timeout when waiting for listener update") + t.Fatalf("Timeout when waiting for listener update. Last seen error: %v", lastErr) } } @@ -234,7 +235,7 @@ func (s) TestADS_ACK_NACK_InvalidFirstResponse(t *testing.T) { // Create an xDS management server listening on a local port. Configure the // request and response handlers to push on channels that are inspected by - // the test goroutine to verify ack version and nonce. + // the test goroutine to verify ACK version and nonce. streamRequestCh := testutils.NewChannel() streamResponseCh := testutils.NewChannel() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ @@ -339,7 +340,7 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) { // Create an xDS management server listening on a local port. Configure the // request and response handlers to push on channels that are inspected by - // the test goroutine to verify ack version and nonce. + // the test goroutine to verify ACK version and nonce. streamRequestCh := testutils.NewChannel() streamResponseCh := testutils.NewChannel() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{