From f7dfca16d0ab6da2033e9aaa50ffa9bf854e4ff4 Mon Sep 17 00:00:00 2001 From: huabing zhao Date: Fri, 8 Sep 2023 17:58:35 +0800 Subject: [PATCH] make sure responses are processed prior to new requests to avoid deadlock Signed-off-by: huabing zhao --- pkg/server/delta/v3/server.go | 39 ++++++++--------------------- pkg/server/delta/v3/watches.go | 24 +++--------------- pkg/server/delta/v3/watches_test.go | 8 ------ pkg/server/v3/delta_test.go | 3 +-- 4 files changed, 15 insertions(+), 59 deletions(-) diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 395ff3865c..74d8af57b4 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -37,13 +37,6 @@ type Callbacks interface { var deltaErrorResponse = &cache.RawDeltaResponse{} -// WithOrderedADS enables the internal flag to order responses strictly. -func WithOrderedADS() config.XDSOption { - return func(o *config.Opts) { - o.Ordered = true - } -} - type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -72,7 +65,7 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba return s } -func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { +func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { streamID := atomic.AddInt64(&s.streamCount, 1) // streamNonce holds a unique nonce for req-resp pairs per xDS stream. @@ -149,6 +142,14 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De return status.Errorf(codes.Unavailable, "empty request") } + // make sure responses are processed prior to new requests to avoid deadlock + if len(watches.deltaMuxedResponses) > 0 { + go func() { + reqCh <- req + }() + break + } + if s.callbacks != nil { if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil { return err @@ -163,15 +164,11 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De req.Node = node } - ordered := false // type URL is required for ADS but is implicit for any other xDS stream if defaultTypeURL == resource.AnyType { if req.TypeUrl == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - if s.opts.Ordered { - ordered = true - } } else if req.TypeUrl == "" { req.TypeUrl = defaultTypeURL } @@ -195,25 +192,9 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) - if ordered { - // Use the shared channel to keep the order of responses. - watch.UseSharedResponseChan(watches.deltaMuxedResponses) - } else { - watch.MakeResponseChan() - } + watch.responses = watches.deltaMuxedResponses watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) watches.deltaWatches[typeURL] = watch - - // just handle normal non-ordered responses here - // all ordered responses are sent to the muxedResponses channel directly - if !watch.useSharedChan { - go func() { - resp, more := <-watch.responses - if more { - watches.deltaMuxedResponses <- resp - } - }() - } } } } diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index d9df5c32dc..839d323211 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -19,7 +19,7 @@ func newWatches() watches { // deltaMuxedResponses needs a buffer to release go-routines populating it return watches{ deltaWatches: make(map[string]watch, int(types.UnknownType)), - deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)), + deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2), } } @@ -32,32 +32,16 @@ func (w *watches) Cancel() { // watch contains the necessary modifiables for receiving resource responses type watch struct { - responses chan cache.DeltaResponse - useSharedChan bool // is this watch using a shared channel - cancel func() - nonce string + responses chan cache.DeltaResponse + cancel func() + nonce string state stream.StreamState } -func (w *watch) MakeResponseChan() { - w.responses = make(chan cache.DeltaResponse, 1) - w.useSharedChan = false -} - -func (w *watch) UseSharedResponseChan(sharedChan chan cache.DeltaResponse) { - w.responses = sharedChan - w.useSharedChan = true -} - // Cancel calls terminate and cancel func (w *watch) Cancel() { if w.cancel != nil { w.cancel() } - if w.responses != nil && !w.useSharedChan { - // w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here - // This is needed to release resources taken by goroutines watching this channel - close(w.responses) - } } diff --git a/pkg/server/delta/v3/watches_test.go b/pkg/server/delta/v3/watches_test.go index 104b979be3..6113498707 100644 --- a/pkg/server/delta/v3/watches_test.go +++ b/pkg/server/delta/v3/watches_test.go @@ -30,13 +30,5 @@ func TestDeltaWatches(t *testing.T) { watches.Cancel() assert.Equal(t, 3, cancelCount) - for _, channel := range channels { - select { - case _, ok := <-channel: - assert.False(t, ok, "a channel was not closed") - default: - assert.Fail(t, "a channel was not closed") - } - } }) } diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 876f275f87..870b0a85fd 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -15,7 +15,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" @@ -348,7 +347,7 @@ func TestDeltaAggregatedHandlers(t *testing.T) { // We create the server with the optional ordered ADS flag so we guarantee resource // ordering over the stream. - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, delta.WithOrderedADS()) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) go func() { err := s.DeltaAggregatedResources(resp) assert.NoError(t, err)