diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 395ff3865c..9f608586ec 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -3,6 +3,7 @@ package delta import ( "context" "errors" + "log" "strconv" "sync/atomic" @@ -37,13 +38,6 @@ type Callbacks interface { var deltaErrorResponse = &cache.RawDeltaResponse{} -// WithOrderedADS enables the internal flag to order responses strictly. -func WithOrderedADS() config.XDSOption { - return func(o *config.Opts) { - o.Ordered = true - } -} - type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -116,30 +110,44 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De } } - for { - select { - case <-s.ctx.Done(): - return nil - case resp, more := <-watches.deltaMuxedResponses: - if !more { - break - } + done := make(chan struct{}, 1) + go func() { + for { + select { + case <-done: + return + case resp, more := <-watches.deltaMuxedResponses: + if !more { + break + } - typ := resp.GetDeltaRequest().GetTypeUrl() - if resp == deltaErrorResponse { - return status.Errorf(codes.Unavailable, typ+" watch failed") - } + typ := resp.GetDeltaRequest().GetTypeUrl() + if resp == deltaErrorResponse { + log.Printf("%s watch failed\n", typ) + break + } - nonce, err := send(resp) - if err != nil { - return err - } + nonce, err := send(resp) + if err != nil { + log.Printf("failed to send response %+v\n", err) + break + } - watch := watches.deltaWatches[typ] - watch.nonce = nonce + watches.mu.Lock() + watch := watches.deltaWatches[typ] + watch.nonce = nonce + watch.state.SetResourceVersions(resp.GetNextVersionMap()) + watches.deltaWatches[typ] = watch + watches.mu.Unlock() + } + } + }() - watch.state.SetResourceVersions(resp.GetNextVersionMap()) - watches.deltaWatches[typ] = watch + for { + select { + case <-s.ctx.Done(): + done <- struct{}{} + return nil case req, more := <-reqCh: // input stream ended or errored out if !more { @@ -163,15 +171,11 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De req.Node = node } - ordered := false // type URL is required for ADS but is implicit for any other xDS stream if defaultTypeURL == resource.AnyType { if req.TypeUrl == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - if s.opts.Ordered { - ordered = true - } } else if req.TypeUrl == "" { req.TypeUrl = defaultTypeURL } @@ -179,6 +183,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De typeURL := req.GetTypeUrl() // cancel existing watch to (re-)request a newer version + watches.mu.Lock() watch, ok := watches.deltaWatches[typeURL] if !ok { // Initialize the state of the stream. @@ -195,25 +200,10 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) - if ordered { - // Use the shared channel to keep the order of responses. - watch.UseSharedResponseChan(watches.deltaMuxedResponses) - } else { - watch.MakeResponseChan() - } + watch.responses = watches.deltaMuxedResponses watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) watches.deltaWatches[typeURL] = watch - - // just handle normal non-ordered responses here - // all ordered responses are sent to the muxedResponses channel directly - if !watch.useSharedChan { - go func() { - resp, more := <-watch.responses - if more { - watches.deltaMuxedResponses <- resp - } - }() - } + watches.mu.Unlock() } } } diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index d9df5c32dc..7de1380022 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -1,6 +1,8 @@ package delta import ( + "sync" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" @@ -12,6 +14,8 @@ type watches struct { // Opaque resources share a muxed channel deltaMuxedResponses chan cache.DeltaResponse + + mu sync.RWMutex } // newWatches creates and initializes watches. @@ -28,36 +32,21 @@ func (w *watches) Cancel() { for _, watch := range w.deltaWatches { watch.Cancel() } + close(w.deltaMuxedResponses) } // watch contains the necessary modifiables for receiving resource responses type watch struct { - responses chan cache.DeltaResponse - useSharedChan bool // is this watch using a shared channel - cancel func() - nonce string + responses chan cache.DeltaResponse + cancel func() + nonce string state stream.StreamState } -func (w *watch) MakeResponseChan() { - w.responses = make(chan cache.DeltaResponse, 1) - w.useSharedChan = false -} - -func (w *watch) UseSharedResponseChan(sharedChan chan cache.DeltaResponse) { - w.responses = sharedChan - w.useSharedChan = true -} - // Cancel calls terminate and cancel func (w *watch) Cancel() { if w.cancel != nil { w.cancel() } - if w.responses != nil && !w.useSharedChan { - // w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here - // This is needed to release resources taken by goroutines watching this channel - close(w.responses) - } } diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 876f275f87..f8429f2997 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -15,7 +15,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" @@ -346,9 +345,7 @@ func TestDeltaAggregatedHandlers(t *testing.T) { resp.recv <- r } - // We create the server with the optional ordered ADS flag so we guarantee resource - // ordering over the stream. - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, delta.WithOrderedADS()) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) go func() { err := s.DeltaAggregatedResources(resp) assert.NoError(t, err)