Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add support for multiple xDS clients, for fallback #7347

Merged
merged 7 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,25 +138,43 @@ func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string)
return stream
}

// Tests CSDS functionality. The test performs the following:
// - Spins up a management server and creates two xDS clients talking to it.
// - Registers a set of watches on the xDS clients, and verifies that the CSDS
// response reports resources in REQUESTED state.
// - Configures resources on the management server corresponding to the ones
// being watched by the clients, and verifies that the CSDS response reports
// resources in ACKED state.
// - Modifies resources on the management server such that some of them are
// expected to be NACKed by the client. Verifies that the CSDS response
// contains some resources in ACKED state and some in NACKED state.
//
// For all of the above operations, the test also verifies that the client_scope
// field in the CSDS response is populated appropriately.
func (s) TestCSDS(t *testing.T) {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
// Spin up a xDS management server on a local port.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

// Create a bootstrap file in a temporary directory.
// Create a bootstrap contents pointing to the above management server.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bootstrapContents)

// Create two xDS clients, with different names. These should end up
// creating two different xDS clients.
const xdsClient1Name = "xds-csds-client-1"
xdsClient1, xdsClose1, err := xdsclient.New(xdsClient1Name)
xdsClient1, xdsClose1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
Name: xdsClient1Name,
Contents: bootstrapContents,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer xdsClose1()
const xdsClient2Name = "xds-csds-client-2"
xdsClient2, xdsClose2, err := xdsclient.New(xdsClient2Name)
xdsClient2, xdsClose2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
Name: xdsClient2Name,
Contents: bootstrapContents,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down Expand Up @@ -373,6 +391,13 @@ func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.Client
}
}

// Repeatedly sends CSDS requests and receives CSDS responses on the provided
// stream and verifies that the response matches `want`. Returns an error if
// sending or receiving on the stream fails, or if the context expires before a
// response matching `want` is received.
//
// Expects client configs in `want` to be sorted on `client_scope` and the
// resource dump to be sorted on type_url and resource name.
func checkClientStatusResponse(ctx context.Context, stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, want *v3statuspb.ClientStatusResponse) error {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
var cmpOpts = cmp.Options{
protocmp.Transform(),
Expand Down
15 changes: 8 additions & 7 deletions xds/internal/xdsclient/client_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// NameForServer represents the value to be passed as name when creating an xDS
// client from xDS-enabled gRPC servers. This is a well-known dedicated key
// value, and is defined in gRFC A71.
const NameForServer = "#server"

// New returns a new xDS client configured by the bootstrap file specified in env
// variable GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG.
//
// gRPC client implementations are expected to pass the channel's target URI for
// the name field, while server implementations are expected to pass a dedicated
// well-known value. The returned client is a reference counted xDS client
// implementation shared across callers using the same name and bootstrap
// configuration.
// well-known value "#server", as specified in gRFC A71. The returned client is
// a reference counted implementation shared among callers using the same name.
//
// The second return value represents a close function which releases the
// caller's reference on the returned client. The caller is expected to invoke
Expand Down Expand Up @@ -82,10 +86,7 @@
// OptionsForTesting contains options to configure xDS client creation for
// testing purposes only.
type OptionsForTesting struct {
// Name is a unique name for this xDS client. For client channels, this
// should be the user's dial target, and for servers, this should be a
// dedicated well-known value. TODO(easwars): Point to where this is
// defined.
// Name is a unique name for this xDS client.
Name string
// Contents contain a JSON representation of the bootstrap configuration to
// be used when creating the xDS client.
Expand All @@ -111,7 +112,7 @@
// This function should ONLY be used for testing purposes.
func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
if opts.Name == "" {
return nil, nil, fmt.Errorf("opts.Name field must be non-empty")

Check warning on line 115 in xds/internal/xdsclient/client_new.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/client_new.go#L115

Added line #L115 was not covered by tests
}
if opts.WatchExpiryTimeout == 0 {
opts.WatchExpiryTimeout = defaultWatchExpiryTimeout
Expand Down Expand Up @@ -142,7 +143,7 @@

c, ok := clients[name]
if !ok {
return nil, nil, fmt.Errorf("xDS client with name %q not found", name)

Check warning on line 146 in xds/internal/xdsclient/client_new.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/client_new.go#L146

Added line #L146 was not covered by tests
}
c.incrRef()
return c, grpcsync.OnceFunc(func() { clientRefCountedClose(name) }), nil
Expand Down
13 changes: 8 additions & 5 deletions xds/internal/xdsclient/client_refcounted.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

client, ok := clients[name]
if !ok {
logger.Errorf("Attempt to close a non-existent xDS client with name %s", name)

Check warning on line 49 in xds/internal/xdsclient/client_refcounted.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/client_refcounted.go#L49

Added line #L49 was not covered by tests
return
}
if client.decrRef() != 0 {
Expand All @@ -58,10 +58,18 @@

}

// newRefCountedWithConfig creates a new reference counted xDS client
// implementation for name, if one does not exist already. The passed in
// fallback config is used when bootstrap environment variables are not defined.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: "gets a reference to the existing xDS Client, or creates one if one does not exist already" or something alongside those lines for first context. To explicitly call out what happens if the xDS Client already exists for this name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func newRefCountedWithConfig(name string, fallbackConfig *bootstrap.Config, watchExpiryTimeout, idleAuthorityTimeout time.Duration) (XDSClient, func(), error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Top level comment of get or create if not present? Or change name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as for the Close, is the reason to keep these names this happens implicitly with objects that are ref counted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explained the reasoning in the comment for close.

clientsMu.Lock()
defer clientsMu.Unlock()

if c := clients[name]; c != nil {
c.incrRef()
return c, grpcsync.OnceFunc(func() { clientRefCountedClose(name) }), nil
}

// Use fallbackConfig only if bootstrap env vars are unspecified.
var config *bootstrap.Config
if envconfig.XDSBootstrapFileName == "" && envconfig.XDSBootstrapFileContent == "" {
Expand All @@ -77,11 +85,6 @@
}
}

if c := clients[name]; c != nil {
c.incrRef()
return c, grpcsync.OnceFunc(func() { clientRefCountedClose(name) }), nil
}

// Create the new client implementation.
c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout)
if err != nil {
Expand Down
177 changes: 155 additions & 22 deletions xds/internal/xdsclient/client_refcounted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,70 +20,72 @@ package xdsclient

import (
"context"
"sync"
"testing"

"github.com/google/uuid"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
)

// Test that multiple New() returns the same Client. And only when the last
// client is closed, the underlying client is closed.
func (s) TestClientNewSingleton(t *testing.T) {
// Tests that multiple calls to New() with the same name returns the same
// client. Also verifies that only when all references to the newly created
// client are released, the underlying client is closed.
func (s) TestClientNew_Single(t *testing.T) {
// Create a bootstrap configuration, place it in a file in the temp
// directory, and set the bootstrap env vars to point to it.
nodeID := uuid.New().String()
contents := e2e.DefaultBootstrapContents(t, nodeID, "non-existent-server-address")
testutils.CreateBootstrapFileForTesting(t, contents)

// Override the singleton creation hook to get notified.
// Override the client creation hook to get notified.
origClientImplCreateHook := xdsClientImplCreateHook
clientImplCreateCh := testutils.NewChannel()
xdsClientImplCreateHook = func(name string) {
clientImplCreateCh.Replace(name)
}
defer func() { xdsClientImplCreateHook = origClientImplCreateHook }()

// Override the singleton close hook to get notified.
// Override the client close hook to get notified.
origClientImplCloseHook := xdsClientImplCloseHook
clientImplCloseCh := testutils.NewChannel()
xdsClientImplCloseHook = func(name string) {
clientImplCloseCh.Replace(name)
}
defer func() { xdsClientImplCloseHook = origClientImplCloseHook }()

// The first call to New() should create a new singleton client.
_, closeFunc, err := New("singleton") // TODO(easwars): fix this
// The first call to New() should create a new client.
_, closeFunc, err := New(t.Name())
if err != nil {
t.Fatalf("failed to create xDS client: %v", err)
t.Fatalf("Failed to create xDS client: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := clientImplCreateCh.Receive(ctx); err != nil {
t.Fatalf("Timeout when waiting for singleton xDS client to be created: %v", err)
t.Fatalf("Timeout when waiting for xDS client to be created: %v", err)
}

// Calling New() again should not create new singleton client implementations.
// Calling New() again should not create new client implementations.
const count = 9
closeFuncs := make([]func(), 9)
closeFuncs := make([]func(), count)
for i := 0; i < count; i++ {
func() {
_, closeFuncs[i], err = New("singleton") // TODO(easwars): fix this
_, closeFuncs[i], err = New(t.Name())
if err != nil {
t.Fatalf("%d-th call to New() failed with error: %v", i, err)
}

sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := clientImplCreateCh.Receive(sCtx); err == nil {
t.Fatalf("%d-th call to New() created a new singleton client", i)
t.Fatalf("%d-th call to New() created a new client", i)
}
}()
}

// Call Close() multiple times on each of the clients created in the above for
// loop. Close() calls are idempotent, and the underlying client
// Call Close() multiple times on each of the clients created in the above
// for loop. Close() calls are idempotent, and the underlying client
// implementation will not be closed until we release the first reference we
// acquired above, via the first call to New().
for i := 0; i < count; i++ {
Expand All @@ -94,25 +96,156 @@ func (s) TestClientNewSingleton(t *testing.T) {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := clientImplCloseCh.Receive(sCtx); err == nil {
t.Fatal("singleton client implementation closed before all references are released")
t.Fatal("Client implementation closed before all references are released")
}
}()
}

// Call the last Close(). The underlying implementation should be closed.
closeFunc()
if _, err := clientImplCloseCh.Receive(ctx); err != nil {
t.Fatalf("Timeout waiting for singleton client implementation to be closed: %v", err)
t.Fatalf("Timeout waiting for client implementation to be closed: %v", err)
}

// Calling New() again, after the previous Client was actually closed, should
// create a new one.
_, closeFunc, err = New("singleton") // TODO(easwars): fix this
// Calling New() again, after the previous Client was actually closed,
// should create a new one.
_, closeFunc, err = New(t.Name())
if err != nil {
t.Fatalf("failed to create client: %v", err)
t.Fatalf("Failed to create xDS client: %v", err)
}
defer closeFunc()
if _, err := clientImplCreateCh.Receive(ctx); err != nil {
t.Fatalf("Timeout when waiting for singleton xDS client to be created: %v", err)
t.Fatalf("Timeout when waiting for xDS client to be created: %v", err)
}
}

// Tests the scenario where there are multiple calls to New() with different
// names. Verifies that reference counts are tracked correctly and that only
// when all references are released, the client is closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: switch "are released, the client is closed" to something that emphasizes the plurality of the clients maybe "are released for a client, only that specific client is closed" or something along those lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func (s) TestClientNew_Multiple(t *testing.T) {
// Create a bootstrap configuration, place it in a file in the temp
// directory, and set the bootstrap env vars to point to it.
nodeID := uuid.New().String()
contents := e2e.DefaultBootstrapContents(t, nodeID, "non-existent-server-address")
testutils.CreateBootstrapFileForTesting(t, contents)

// Override the client creation hook to get notified.
origClientImplCreateHook := xdsClientImplCreateHook
clientImplCreateCh := testutils.NewChannel()
xdsClientImplCreateHook = func(name string) {
clientImplCreateCh.Replace(name)
}
defer func() { xdsClientImplCreateHook = origClientImplCreateHook }()

// Override the client close hook to get notified.
origClientImplCloseHook := xdsClientImplCloseHook
clientImplCloseCh := testutils.NewChannel()
xdsClientImplCloseHook = func(name string) {
clientImplCloseCh.Replace(name)
}
defer func() { xdsClientImplCloseHook = origClientImplCloseHook }()

// Create two xDS clients.
client1Name := t.Name() + "-1"
_, closeFunc1, err := New(client1Name)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
name, err := clientImplCreateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout when waiting for xDS client to be created: %v", err)
}
if name.(string) != client1Name {
t.Fatalf("xDS client created for name %q, want %q", name.(string), client1Name)
}

client2Name := t.Name() + "-2"
_, closeFunc2, err := New(client2Name)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
name, err = clientImplCreateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout when waiting for xDS client to be created: %v", err)
}
if name.(string) != client2Name {
t.Fatalf("xDS client created for name %q, want %q", name.(string), client1Name)
}

// Create N more references to each of these clients.
const count = 9
closeFuncs1 := make([]func(), count)
closeFuncs2 := make([]func(), count)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < count; i++ {
var err error
_, closeFuncs1[i], err = New(client1Name)
if err != nil {
t.Errorf("%d-th call to New() failed with error: %v", i, err)
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < count; i++ {
var err error
_, closeFuncs2[i], err = New(client2Name)
if err != nil {
t.Errorf("%d-th call to New() failed with error: %v", i, err)
}
}
}()
wg.Wait()
if t.Failed() {
t.FailNow()
}

// Ensure that none of the create hooks are called.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := clientImplCreateCh.Receive(sCtx); err == nil {
t.Fatalf("New xDS client created when expected to reuse an existing one")
}

// Close the first client completely.
closeFunc1()
for i := 0; i < count; i++ {
closeFuncs1[i]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: here and the close for 2: the test above tests the idempotency implemented through the once func. Maybe call some of these twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Done.

}

// Ensure that the close hook is called for the first client.
name, err = clientImplCloseCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for xDS client to be closed completely")
}
if name.(string) != client1Name {
t.Fatalf("xDS client closed for name %q, want %q", name.(string), client1Name)
}

// Ensure that the close hook is not called for the second client.
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := clientImplCloseCh.Receive(sCtx); err == nil {
t.Fatal("Client implementation closed before all references are released")
}

// Close the second client completely.
closeFunc2()
for i := 0; i < count; i++ {
closeFuncs2[i]()
}

// Ensure that the close hook is called for the second client.
name, err = clientImplCloseCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for xDS client to be closed completely")
}
if name.(string) != client2Name {
t.Fatalf("xDS client closed for name %q, want %q", name.(string), client2Name)
}
}
zasweq marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 1 addition & 2 deletions xds/internal/xdsclient/clientimpl_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func (c *clientImpl) dumpResources() *v3statuspb.ClientConfig {
}
}

// DumpResources returns the status and contents of all xDS resources, in a
// ClientStatusResponse message.
// DumpResources returns the status and contents of all xDS resources.
func DumpResources() *v3statuspb.ClientStatusResponse {
clientsMu.Lock()
defer clientsMu.Unlock()
Expand Down
Loading
Loading