From 44d23bace671548f193fd2c3cbc19294276172e8 Mon Sep 17 00:00:00 2001 From: huabing zhao Date: Tue, 15 Aug 2023 18:37:26 +0800 Subject: [PATCH] Add WithOrededADS option to delta xds server Signed-off-by: huabing zhao --- pkg/server/delta/v3/server.go | 32 ++++++++++++++++++++++++++++++-- pkg/server/delta/v3/watches.go | 5 ----- pkg/server/v3/delta_test.go | 5 ++++- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 9152ab2d96..7267d5109a 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -11,6 +11,7 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/config" @@ -37,6 +38,13 @@ type Callbacks interface { var deltaErrorResponse = &cache.RawDeltaResponse{} +// WithOrderedADS enables the internal flag to order responses strictly. +func WithOrderedADS() config.XDSOption { + return func(o *config.Opts) { + o.Ordered = true + } +} + type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -65,7 +73,10 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba return s } -func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { +func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { + // create a sharedChan for the watches to send responses to + sharedChan := make(chan cache.DeltaResponse, types.UnknownType) + streamID := atomic.AddInt64(&s.streamCount, 1) // streamNonce holds a unique nonce for req-resp pairs per xDS stream. @@ -78,6 +89,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De defer func() { watches.Cancel() + close(sharedChan) if s.callbacks != nil { s.callbacks.OnDeltaStreamClosed(streamID, node) } @@ -156,11 +168,15 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De req.Node = node } + ordered := false // type URL is required for ADS but is implicit for any other xDS stream if defaultTypeURL == resource.AnyType { if req.TypeUrl == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } + if s.opts.Ordered { + ordered = true + } } else if req.TypeUrl == "" { req.TypeUrl = defaultTypeURL } @@ -184,8 +200,20 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) - watch.responses = make(chan cache.DeltaResponse, 1) + if ordered { + // Use the shared channel for ordered responses + watch.responses = sharedChan + } else { + watch.responses = make(chan cache.DeltaResponse, 1) + } watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) + if !ordered { + watch.cancel = func() { + watch.cancel() + close(watch.responses) + } + } + watches.deltaWatches[typeURL] = watch go func() { diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index c88548388a..d95bc7dc54 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -44,9 +44,4 @@ func (w *watch) Cancel() { if w.cancel != nil { w.cancel() } - if w.responses != nil { - // w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here - // This is needed to release resources taken by goroutines watching this channel - close(w.responses) - } } diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index f8429f2997..876f275f87 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -15,6 +15,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" @@ -345,7 +346,9 @@ func TestDeltaAggregatedHandlers(t *testing.T) { resp.recv <- r } - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + // We create the server with the optional ordered ADS flag so we guarantee resource + // ordering over the stream. + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, delta.WithOrderedADS()) go func() { err := s.DeltaAggregatedResources(resp) assert.NoError(t, err)