Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option for the delta server to override the channel capacity and avoid deadlocks #3

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/do_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set -x
# Needed to avoid issues with go version stamping in CI build
git config --global --add safe.directory /go-control-plane

go install golang.org/x/tools/cmd/goimports@latest
go install golang.org/x/tools/cmd/goimports@v0.24.0

cd /go-control-plane

Expand Down
8 changes: 6 additions & 2 deletions pkg/server/config/config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package config

import "github.com/envoyproxy/go-control-plane/pkg/cache/types"

// Opts for individual xDS implementations that can be
// utilized through the functional opts pattern.
type Opts struct {
// If true respond to ADS requests with a guaranteed resource ordering
Ordered bool
Ordered bool
DistinctResourceTypes int
}

func NewOpts() Opts {
return Opts{
Ordered: false,
Ordered: false,
DistinctResourceTypes: int(types.UnknownType),
}
}

Expand Down
15 changes: 12 additions & 3 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type Callbacks interface {

var deltaErrorResponse = &cache.RawDeltaResponse{}

// WithDistinctResourceTypes overrides the default number of resource types, so that the server can serve Configs with
// more distinct resource types without getting into a deadlock.
func WithDistinctResourceTypes(n int) config.XDSOption {
return func(o *config.Opts) {
o.DistinctResourceTypes = n
}
}

type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
Expand All @@ -50,11 +58,12 @@ type server struct {
}

// NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks.
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
func NewServer(ctx context.Context, configWatcher cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
s := &server{
cache: config,
cache: configWatcher,
callbacks: callbacks,
ctx: ctx,
opts: config.NewOpts(),
}

// Parse through our options
Expand All @@ -72,7 +81,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
var streamNonce int64

// a collection of stack allocated watches per request type
watches := newWatches()
watches := newWatches(s.opts.DistinctResourceTypes)

node := &core.Node{}

Expand Down
7 changes: 3 additions & 4 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package delta

import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
Expand All @@ -15,15 +14,15 @@ type watches struct {
}

// newWatches creates and initializes watches.
func newWatches() watches {
func newWatches(distinctResourceTypes int) watches {
// deltaMuxedResponses needs a buffer to release go-routines populating it
//
// because deltaMuxedResponses can be populated by an update from the cache
// and a request from the client, we need to create the channel with a buffer
// size of 2x the number of types to avoid deadlocks.
return watches{
deltaWatches: make(map[string]watch, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2),
deltaWatches: make(map[string]watch, distinctResourceTypes),
deltaMuxedResponses: make(chan cache.DeltaResponse, distinctResourceTypes*2),
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/server/delta/v3/watches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
)

func TestDeltaWatches(t *testing.T) {
t.Run("watches response channels are properly closed when the watches are canceled", func(t *testing.T) {
watches := newWatches()
watches := newWatches(int(types.UnknownType))

cancelCount := 0
// create a few watches, and ensure that the cancel function are called and the channels are closed
Expand Down