Skip to content

Commit

Permalink
add WithDistinctResourceTypes server option to override the channel c…
Browse files Browse the repository at this point in the history
…apacity when Config consists of the greater number of resource types

Signed-off-by: Ilya Lobkov <ilya.lobkov@konghq.com>
  • Loading branch information
lobkovilya committed Feb 8, 2024
1 parent bac4579 commit 9b86535
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 8 deletions.
8 changes: 6 additions & 2 deletions pkg/server/config/config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package config

import "github.com/envoyproxy/go-control-plane/pkg/cache/types"

// Opts for individual xDS implementations that can be
// utilized through the functional opts pattern.
type Opts struct {
// If true respond to ADS requests with a guaranteed resource ordering
Ordered bool
Ordered bool
DistinctResourceTypes int
}

func NewOpts() Opts {
return Opts{
Ordered: false,
Ordered: false,
DistinctResourceTypes: int(types.UnknownType),
}
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type Callbacks interface {

var deltaErrorResponse = &cache.RawDeltaResponse{}

// WithDistinctResourceTypes overrides the default number of resource types, so that the server can serve Configs with
// more distinct resource types without getting into a deadlock.
func WithDistinctResourceTypes(n int) config.XDSOption {
return func(o *config.Opts) {
o.DistinctResourceTypes = n
}
}

type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
Expand Down Expand Up @@ -72,7 +80,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
var streamNonce int64

// a collection of stack allocated watches per request type
watches := newWatches()
watches := newWatches(s.opts.DistinctResourceTypes)

node := &core.Node{}

Expand Down
7 changes: 3 additions & 4 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package delta

import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
Expand All @@ -15,15 +14,15 @@ type watches struct {
}

// newWatches creates and initializes watches.
func newWatches() watches {
func newWatches(distinctResourceTypes int) watches {
// deltaMuxedResponses needs a buffer to release go-routines populating it
//
// because deltaMuxedResponses can be populated by an update from the cache
// and a request from the client, we need to create the channel with a buffer
// size of 2x the number of types to avoid deadlocks.
return watches{
deltaWatches: make(map[string]watch, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2),
deltaWatches: make(map[string]watch, distinctResourceTypes),
deltaMuxedResponses: make(chan cache.DeltaResponse, distinctResourceTypes*2),
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/server/delta/v3/watches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
)

func TestDeltaWatches(t *testing.T) {
t.Run("watches response channels are properly closed when the watches are canceled", func(t *testing.T) {
watches := newWatches()
watches := newWatches(int(types.UnknownType))

cancelCount := 0
// create a few watches, and ensure that the cancel function are called and the channels are closed
Expand Down

0 comments on commit 9b86535

Please sign in to comment.