Skip to content

Commit

Permalink
Fix panic in case the resourceType is not know by the caches (#1036)
Browse files Browse the repository at this point in the history
* Fix panic in case the resourceType is not know by the caches

Signed-off-by: Alan Diego dos Santos <alandiegosantos@gmail.com>
Signed-off-by: Alan Diego <alandiegosantos@gmail.com>
Co-authored-by: Alan Diego <alan.diego@booking.com>
  • Loading branch information
alandiegosantos and Alan Diego authored Nov 2, 2024
1 parent b9e8c8a commit 6c7a557
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/server/sotw/v3/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,13 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
sw.watches.recompute(s.ctx, reqCh)
default:
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
if !ok {
// nil is used to close the streams in the caches
if value.IsNil() || !ok {
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
}

// If a non cache.Response arrived here, there are serious issues
res := value.Interface().(cache.Response)
nonce, err := sw.send(res)
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions pkg/server/v3/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -52,6 +53,11 @@ type mockConfigWatcher struct {

func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ stream.StreamState, out chan cache.Response) func() {
config.counts[req.GetTypeUrl()] = config.counts[req.GetTypeUrl()] + 1

if strings.Contains(req.GetTypeUrl(), nilType) {
out <- nil
}

if len(config.responses[req.GetTypeUrl()]) > 0 {
out <- config.responses[req.GetTypeUrl()][0]
config.responses[req.GetTypeUrl()] = config.responses[req.GetTypeUrl()][1:]
Expand Down Expand Up @@ -163,6 +169,7 @@ var (
extensionConfig = resource.MakeExtensionConfig(resource.Ads, extensionConfigName, routeName)
opaque = &core.Address{}
opaqueType = "unknown-type"
nilType = "nil-stream-type" // This type will force the close of the connection
testTypes = []string{
rsrc.EndpointType,
rsrc.ClusterType,
Expand Down Expand Up @@ -659,6 +666,24 @@ func TestOpaqueRequestsChannelMuxing(t *testing.T) {
assert.Equal(t, 0, config.watches)
}

func TestNilPropagationOverResponseChannelShouldCloseTheStream(t *testing.T) {
config := makeMockConfigWatcher()
resp := makeMockStream(t)
for i := 0; i < 10; i++ {
resp.recv <- &discovery.DiscoveryRequest{
Node: node,
TypeUrl: nilType,
// each subsequent request is assumed to supercede the previous request
ResourceNames: []string{fmt.Sprintf("%d", i)},
}
}
close(resp.recv)
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
err := s.StreamAggregatedResources(resp)
require.Error(t, err)
assert.Equal(t, 0, config.watches)
}

func TestCallbackError(t *testing.T) {
for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
Expand Down

0 comments on commit 6c7a557

Please sign in to comment.