diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go index b746acfab9..44d6043010 100644 --- a/pkg/server/config/config.go +++ b/pkg/server/config/config.go @@ -1,15 +1,19 @@ package config +import "github.com/envoyproxy/go-control-plane/pkg/cache/types" + // Opts for individual xDS implementations that can be // utilized through the functional opts pattern. type Opts struct { // If true respond to ADS requests with a guaranteed resource ordering - Ordered bool + Ordered bool + DistinctResourceTypes int } func NewOpts() Opts { return Opts{ - Ordered: false, + Ordered: false, + DistinctResourceTypes: int(types.UnknownType), } } diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index b570b19b27..fbfafc1788 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -37,6 +37,14 @@ type Callbacks interface { var deltaErrorResponse = &cache.RawDeltaResponse{} +// WithDistinctResourceTypes overrides the default number of resource types, so that the server can serve Configs with +// more distinct resource types without getting into a deadlock. +func WithDistinctResourceTypes(n int) config.XDSOption { + return func(o *config.Opts) { + o.DistinctResourceTypes = n + } +} + type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -72,7 +80,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De var streamNonce int64 // a collection of stack allocated watches per request type - watches := newWatches() + watches := newWatches(s.opts.DistinctResourceTypes) node := &core.Node{} diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index 63c4c2d38d..1a4c1c54a1 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -1,7 +1,6 @@ package delta import ( - "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -15,15 +14,15 @@ type watches struct { } // newWatches creates and initializes watches. -func newWatches() watches { +func newWatches(distinctResourceTypes int) watches { // deltaMuxedResponses needs a buffer to release go-routines populating it // // because deltaMuxedResponses can be populated by an update from the cache // and a request from the client, we need to create the channel with a buffer // size of 2x the number of types to avoid deadlocks. return watches{ - deltaWatches: make(map[string]watch, int(types.UnknownType)), - deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2), + deltaWatches: make(map[string]watch, distinctResourceTypes), + deltaMuxedResponses: make(chan cache.DeltaResponse, distinctResourceTypes*2), } } diff --git a/pkg/server/delta/v3/watches_test.go b/pkg/server/delta/v3/watches_test.go index cee0985ebd..48aafeca84 100644 --- a/pkg/server/delta/v3/watches_test.go +++ b/pkg/server/delta/v3/watches_test.go @@ -5,11 +5,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/envoyproxy/go-control-plane/pkg/cache/types" ) func TestDeltaWatches(t *testing.T) { t.Run("watches response channels are properly closed when the watches are canceled", func(t *testing.T) { - watches := newWatches() + watches := newWatches(int(types.UnknownType)) cancelCount := 0 // create a few watches, and ensure that the cancel function are called and the channels are closed