Skip to content

Commit

Permalink
handle responses in a seperate go routine to avoid deadlock
Browse files Browse the repository at this point in the history
Signed-off-by: huabing zhao <zhaohuabing@gmail.com>
  • Loading branch information
zhaohuabing committed Aug 29, 2023
1 parent 1fd388b commit f1a3989
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 113 deletions.
88 changes: 40 additions & 48 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package delta
import (
"context"
"errors"
"log"
"strconv"
"sync/atomic"

Expand Down Expand Up @@ -37,13 +38,6 @@ type Callbacks interface {

var deltaErrorResponse = &cache.RawDeltaResponse{}

// WithOrderedADS enables the internal flag to order responses strictly.
func WithOrderedADS() config.XDSOption {
return func(o *config.Opts) {
o.Ordered = true
}
}

type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
Expand Down Expand Up @@ -84,7 +78,9 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
var node = &core.Node{}

defer func() {
watches.mu.Lock()
watches.Cancel()
watches.mu.Unlock()
if s.callbacks != nil {
s.callbacks.OnDeltaStreamClosed(streamID, node)
}
Expand Down Expand Up @@ -116,30 +112,44 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
}
}

for {
select {
case <-s.ctx.Done():
return nil
case resp, more := <-watches.deltaMuxedResponses:
if !more {
break
}
done := make(chan struct{}, 1)
go func() {
for {
select {
case <-done:
return
case resp, more := <-watches.deltaMuxedResponses:
if !more {
break
}

typ := resp.GetDeltaRequest().GetTypeUrl()
if resp == deltaErrorResponse {
return status.Errorf(codes.Unavailable, typ+" watch failed")
}
typ := resp.GetDeltaRequest().GetTypeUrl()
if resp == deltaErrorResponse {
log.Printf("%s watch failed\n", typ)
break
}

nonce, err := send(resp)
if err != nil {
return err
}
nonce, err := send(resp)
if err != nil {
log.Printf("failed to send response %+v\n", err)
break
}

watch := watches.deltaWatches[typ]
watch.nonce = nonce
watches.mu.Lock()
watch := watches.deltaWatches[typ]
watch.nonce = nonce
watch.state.SetResourceVersions(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
watches.mu.Unlock()
}
}
}()

watch.state.SetResourceVersions(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
for {
select {
case <-s.ctx.Done():
done <- struct{}{}
return nil
case req, more := <-reqCh:
// input stream ended or errored out
if !more {
Expand All @@ -163,22 +173,19 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
req.Node = node
}

ordered := false
// type URL is required for ADS but is implicit for any other xDS stream
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
if s.opts.Ordered {
ordered = true
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}

typeURL := req.GetTypeUrl()

// cancel existing watch to (re-)request a newer version
watches.mu.Lock()
watch, ok := watches.deltaWatches[typeURL]
if !ok {
// Initialize the state of the stream.
Expand All @@ -195,25 +202,10 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

if ordered {
// Use the shared channel to keep the order of responses.
watch.UseSharedResponseChan(watches.deltaMuxedResponses)
} else {
watch.MakeResponseChan()
}
watch.responses = watches.deltaMuxedResponses
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watches.deltaWatches[typeURL] = watch

// just handle normal non-ordered responses here
// all ordered responses are sent to the muxedResponses channel directly
if !watch.useSharedChan {
go func() {
resp, more := <-watch.responses
if more {
watches.deltaMuxedResponses <- resp
}
}()
}
watches.mu.Unlock()
}
}
}
Expand Down
27 changes: 8 additions & 19 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package delta

import (
"sync"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
Expand All @@ -12,6 +14,8 @@ type watches struct {

// Opaque resources share a muxed channel
deltaMuxedResponses chan cache.DeltaResponse

mu sync.RWMutex
}

// newWatches creates and initializes watches.
Expand All @@ -28,36 +32,21 @@ func (w *watches) Cancel() {
for _, watch := range w.deltaWatches {
watch.Cancel()
}
close(w.deltaMuxedResponses)
}

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
responses chan cache.DeltaResponse
useSharedChan bool // is this watch using a shared channel
cancel func()
nonce string
responses chan cache.DeltaResponse
cancel func()
nonce string

state stream.StreamState
}

func (w *watch) MakeResponseChan() {
w.responses = make(chan cache.DeltaResponse, 1)
w.useSharedChan = false
}

func (w *watch) UseSharedResponseChan(sharedChan chan cache.DeltaResponse) {
w.responses = sharedChan
w.useSharedChan = true
}

// Cancel calls terminate and cancel
func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
if w.responses != nil && !w.useSharedChan {
// w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here
// This is needed to release resources taken by goroutines watching this channel
close(w.responses)
}
}
42 changes: 0 additions & 42 deletions pkg/server/delta/v3/watches_test.go

This file was deleted.

5 changes: 1 addition & 4 deletions pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/delta/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
Expand Down Expand Up @@ -346,9 +345,7 @@ func TestDeltaAggregatedHandlers(t *testing.T) {
resp.recv <- r
}

// We create the server with the optional ordered ADS flag so we guarantee resource
// ordering over the stream.
s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, delta.WithOrderedADS())
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
err := s.DeltaAggregatedResources(resp)
assert.NoError(t, err)
Expand Down

0 comments on commit f1a3989

Please sign in to comment.