Skip to content

Commit

Permalink
address nits, verify caching without version number
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Nov 20, 2024
1 parent 8ed2a3a commit df88c19
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
5 changes: 4 additions & 1 deletion xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,10 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
}
// If the last update was NACKed, notify the new watcher of the error
// immediately as well.
if state.md.Status == xdsresource.ServiceStatusNACKed && state.md.ErrState != nil {
if state.md.Status == xdsresource.ServiceStatusNACKed {
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q was NACKed: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}

Check warning on line 650 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L649-L650

Added lines #L649 - L650 were not covered by tests
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
}
cleanup = a.unwatchResource(rType, resourceName, watcher)
Expand Down
48 changes: 32 additions & 16 deletions xds/internal/xdsclient/tests/lds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,8 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
}

// TestLDSWatch_NACKError covers the case where an update from the management
// server is NACK'ed by the xdsclient. The test verifies that the error is
// propagated to the existing watcher. After NACK, If a new watcher registers
// server is NACKed by the xdsclient. The test verifies that the error is
// propagated to the existing watcher. After NACK, if a new watcher registers
// for the resource, error is propagated to the new watcher as well.
func (s) TestLDSWatch_NACKError(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
Expand Down Expand Up @@ -937,11 +937,11 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
// Verify that the expected error is propagated to the watcher.
u, err := lw.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
t.Fatalf("Timeout when waiting for a listener resource from the management server: %v", err)
}
gotErr := u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}

// Verify that the expected error is propagated to the new watcher as well.
Expand All @@ -950,11 +950,11 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
defer ldsCancel2()
u, err = lw2.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
t.Fatalf("Timeout when waiting for a listener resource from the management server: %v", err)
}
gotErr = u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}
}

Expand All @@ -963,18 +963,31 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
// good update as well as the latest NACK error. The test verifies that the new
// watcher receives both the good update and the error without a new resource request being
// sent to the management server.
func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
firstRequestReceived := false
firstAckReceived := grpcsync.NewEvent()
secondAckReceived := grpcsync.NewEvent()
secondRequestReceived := grpcsync.NewEvent()

mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
// If the version is "2", it means that a second request has been
// received (after an initial request and ack). The client should
// not send a second request if the resource is already cached.
if req.GetVersionInfo() == "2" {
secondRequestReceived.Fire()
// The first request has an empty version string.
if !firstRequestReceived && req.GetVersionInfo() == "" {
firstRequestReceived = true
return nil
}
// The first ack has a non-empty version string.
if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" {
firstAckReceived.Fire()
return nil
}
// The second ack has a non-empty version string.
if !secondAckReceived.HasFired() && req.GetVersionInfo() != "" {
secondAckReceived.Fire()
return nil
}
// Any requests after the first request and two acks are not expected.
secondRequestReceived.Fire()
return nil
},
})
Expand All @@ -997,6 +1010,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1)
defer ldsCancel1()

// Configure the management server to return a single listener
// resource, corresponding to the one we registered a watch for.
resources := e2e.UpdateOptions{
Expand All @@ -1009,6 +1023,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Verify the contents of the received update.
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
Expand All @@ -1030,14 +1045,15 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Verify that the expected error is propagated to the existing watcher.
u, err := lw1.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
}
gotErr := u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}

// Register another watch for the same resource. This should get the update
Expand All @@ -1050,11 +1066,11 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
}
u, err = lw2.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
t.Fatalf("Timeout when waiting for a listener resource from the management server: %v", err)
}
gotErr = u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
t.Fatalf("Update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}
// No request should get sent out as part of this watch.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
Expand All @@ -1069,7 +1085,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {

// TestLDSWatch_PartialValid covers the case where a response from the
// management server contains both valid and invalid resources and is expected
// to be NACK'ed by the xdsclient. The test verifies that watchers corresponding
// to be NACKed by the xdsclient. The test verifies that watchers corresponding
// to the valid resource receive the update, while watchers corresponding to the
// invalid resource receive an error.
func (s) TestLDSWatch_PartialValid(t *testing.T) {
Expand Down

0 comments on commit df88c19

Please sign in to comment.