Skip to content

Commit

Permalink
Option for the delta server to override the channel capacity and avoi…
Browse files Browse the repository at this point in the history
…d deadlocks (#3)

* add WithDistinctResourceTypes server option to override the channel capacity when Config consists of the greater number of resource types

Signed-off-by: Ilya Lobkov <ilya.lobkov@konghq.com>
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored and lukidzi committed Nov 8, 2024
1 parent 03ea0b2 commit b4bceef
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build/do_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set -x
# Needed to avoid issues with go version stamping in CI build
git config --global --add safe.directory /go-control-plane

go install golang.org/x/tools/cmd/goimports@latest
go install golang.org/x/tools/cmd/goimports@v0.24.0

cd /go-control-plane

Expand Down
8 changes: 6 additions & 2 deletions pkg/server/config/config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package config

import "github.com/envoyproxy/go-control-plane/pkg/cache/types"

// Opts for individual xDS implementations that can be
// utilized through the functional opts pattern.
type Opts struct {
// If true respond to ADS requests with a guaranteed resource ordering
Ordered bool
Ordered bool
DistinctResourceTypes int
}

func NewOpts() Opts {
return Opts{
Ordered: false,
Ordered: false,
DistinctResourceTypes: int(types.UnknownType),
}
}

Expand Down
15 changes: 12 additions & 3 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type Callbacks interface {

var deltaErrorResponse = &cache.RawDeltaResponse{}

// WithDistinctResourceTypes overrides the default number of resource types, so that the server can serve Configs with
// more distinct resource types without getting into a deadlock.
func WithDistinctResourceTypes(n int) config.XDSOption {
return func(o *config.Opts) {
o.DistinctResourceTypes = n
}
}

type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
Expand All @@ -50,11 +58,12 @@ type server struct {
}

// NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks.
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
func NewServer(ctx context.Context, configWatcher cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
s := &server{
cache: config,
cache: configWatcher,
callbacks: callbacks,
ctx: ctx,
opts: config.NewOpts(),
}

// Parse through our options
Expand All @@ -72,7 +81,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
var streamNonce int64

// a collection of stack allocated watches per request type
watches := newWatches()
watches := newWatches(s.opts.DistinctResourceTypes)

node := &core.Node{}

Expand Down
7 changes: 3 additions & 4 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package delta

import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
Expand All @@ -15,15 +14,15 @@ type watches struct {
}

// newWatches creates and initializes watches.
func newWatches() watches {
func newWatches(distinctResourceTypes int) watches {
// deltaMuxedResponses needs a buffer to release go-routines populating it
//
// because deltaMuxedResponses can be populated by an update from the cache
// and a request from the client, we need to create the channel with a buffer
// size of 2x the number of types to avoid deadlocks.
return watches{
deltaWatches: make(map[string]watch, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2),
deltaWatches: make(map[string]watch, distinctResourceTypes),
deltaMuxedResponses: make(chan cache.DeltaResponse, distinctResourceTypes*2),
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/server/delta/v3/watches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
)

func TestDeltaWatches(t *testing.T) {
t.Run("watches response channels are properly closed when the watches are canceled", func(t *testing.T) {
watches := newWatches()
watches := newWatches(int(types.UnknownType))

cancelCount := 0
// create a few watches, and ensure that the cancel function are called and the channels are closed
Expand Down

0 comments on commit b4bceef

Please sign in to comment.