diff --git a/internal/xds/bootstrap.go b/internal/xds/bootstrap.go index a3f80d8f2496..1d74ab46a114 100644 --- a/internal/xds/bootstrap.go +++ b/internal/xds/bootstrap.go @@ -65,11 +65,32 @@ type BootstrapOptions struct { // completed successfully. It is the responsibility of the caller to invoke the // cleanup function at the end of the test. func SetupBootstrapFile(opts BootstrapOptions) (func(), error) { + bootstrapContents, err := BootstrapContents(opts) + if err != nil { + return nil, err + } f, err := ioutil.TempFile("", "test_xds_bootstrap_*") if err != nil { return nil, fmt.Errorf("failed to created bootstrap file: %v", err) } + if err := ioutil.WriteFile(f.Name(), bootstrapContents, 0644); err != nil { + return nil, fmt.Errorf("failed to created bootstrap file: %v", err) + } + logger.Infof("Created bootstrap file at %q with contents: %s\n", f.Name(), bootstrapContents) + + origBootstrapFileName := env.BootstrapFileName + env.BootstrapFileName = f.Name() + return func() { + os.Remove(f.Name()) + env.BootstrapFileName = origBootstrapFileName + }, nil +} + +// BootstrapContents returns the contents to go into a bootstrap file, +// environment, or configuration passed to +// xds.NewXDSResolverWithConfigForTesting. +func BootstrapContents(opts BootstrapOptions) ([]byte, error) { cfg := &bootstrapConfig{ XdsServers: []server{ { @@ -100,17 +121,7 @@ func SetupBootstrapFile(opts BootstrapOptions) (func(), error) { if err != nil { return nil, fmt.Errorf("failed to created bootstrap file: %v", err) } - if err := ioutil.WriteFile(f.Name(), bootstrapContents, 0644); err != nil { - return nil, fmt.Errorf("failed to created bootstrap file: %v", err) - } - logger.Infof("Created bootstrap file at %q with contents: %s\n", f.Name(), bootstrapContents) - - origBootstrapFileName := env.BootstrapFileName - env.BootstrapFileName = f.Name() - return func() { - os.Remove(f.Name()) - env.BootstrapFileName = origBootstrapFileName - }, nil + return bootstrapContents, nil } type bootstrapConfig struct { diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 0e8b83481ac9..0b5f9bc08003 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -59,7 +59,7 @@ var ( // not deal with subConns. return builder.Build(cc, opts), nil } - newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() } + newXDSClient func() (xdsClientInterface, error) buildProvider = buildProviderFunc ) @@ -86,12 +86,15 @@ func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer. b.logger = prefixLogger((b)) b.logger.Infof("Created") - client, err := newXDSClient() - if err != nil { - b.logger.Errorf("failed to create xds-client: %v", err) - return nil + if newXDSClient != nil { + // For tests + client, err := newXDSClient() + if err != nil { + b.logger.Errorf("failed to create xds-client: %v", err) + return nil + } + b.xdsClient = client } - b.xdsClient = client var creds credentials.TransportCredentials switch { @@ -359,7 +362,12 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { lbCfg.LrsLoadReportingServerName = new(string) } + resolverState := resolver.State{} + if c, ok := b.xdsClient.(*xdsclient.Client); ok { + resolverState = xdsclient.SetClient(resolverState, c) + } ccState := balancer.ClientConnState{ + ResolverState: resolverState, BalancerConfig: lbCfg, } if err := b.edsLB.UpdateClientConnState(ccState); err != nil { @@ -397,7 +405,9 @@ func (b *cdsBalancer) run() { b.edsLB.Close() b.edsLB = nil } - b.xdsClient.Close() + if newXDSClient != nil { + b.xdsClient.Close() + } if b.cachedRoot != nil { b.cachedRoot.Close() } @@ -468,6 +478,14 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro return errBalancerClosed } + if b.xdsClient == nil { + c := xdsclient.FromResolverState(state.ResolverState) + if c == nil { + return balancer.ErrBadResolverState + } + b.xdsClient = c + } + b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(state.BalancerConfig)) // The errors checked here should ideally never happen because the // ServiceConfig in this case is prepared by the xdsResolver and is not diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 4bd29901d760..4fca0f387e47 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -52,7 +52,7 @@ func init() { balancer.Register(clusterImplBB{}) } -var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() } +var newXDSClient func() (xdsClientInterface, error) type clusterImplBB struct{} @@ -67,12 +67,15 @@ func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) } b.logger = prefixLogger(b) - client, err := newXDSClient() - if err != nil { - b.logger.Errorf("failed to create xds-client: %v", err) - return nil + if newXDSClient != nil { + // For tests + client, err := newXDSClient() + if err != nil { + b.logger.Errorf("failed to create xds-client: %v", err) + return nil + } + b.xdsC = client } - b.xdsC = client go b.run() b.logger.Infof("Created") @@ -204,6 +207,14 @@ func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name) } + if cib.xdsC == nil { + c := xdsclient.FromResolverState(s.ResolverState) + if c == nil { + return balancer.ErrBadResolverState + } + cib.xdsC = c + } + // Update load reporting config. This needs to be done before updating the // child policy because we need the loadStore from the updated client to be // passed to the ccWrapper, so that the next picker from the child policy @@ -315,7 +326,9 @@ func (cib *clusterImplBalancer) Close() { cib.childLB.Close() cib.childLB = nil } - cib.xdsC.Close() + if newXDSClient != nil { + cib.xdsC.Close() + } cib.logger.Infof("Shutdown") } diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 98b1dbaedd46..21e3f43be74e 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -53,7 +53,7 @@ var ( newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface { return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger) } - newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() } + newXDSClient func() (xdsClientInterface, error) ) func init() { @@ -76,13 +76,16 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp } x.logger = prefixLogger(x) - client, err := newXDSClient() - if err != nil { - x.logger.Errorf("xds: failed to create xds-client: %v", err) - return nil + if newXDSClient != nil { + // For tests + client, err := newXDSClient() + if err != nil { + x.logger.Errorf("xds: failed to create xds-client: %v", err) + return nil + } + x.xdsClient = client } - x.xdsClient = client x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.loadWrapper, x.logger) x.logger.Infof("Created") go x.run() @@ -172,7 +175,9 @@ func (x *edsBalancer) run() { x.edsImpl.updateState(u.priority, u.s) case <-x.closed.Done(): x.cancelWatch() - x.xdsClient.Close() + if newXDSClient != nil { + x.xdsClient.Close() + } x.edsImpl.close() x.logger.Infof("Shutdown") x.done.Fire() @@ -380,6 +385,14 @@ func (x *edsBalancer) ResolverError(err error) { } func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + if x.xdsClient == nil { + c := xdsclient.FromResolverState(s.ResolverState) + if c == nil { + return balancer.ErrBadResolverState + } + x.xdsClient = c + } + select { case x.grpcUpdate <- &s: case <-x.closed.Done(): diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go index 0642c54ed111..5b044f480345 100644 --- a/xds/internal/balancer/lrs/balancer.go +++ b/xds/internal/balancer/lrs/balancer.go @@ -36,7 +36,7 @@ func init() { balancer.Register(&lrsBB{}) } -var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() } +var newXDSClient func() (xdsClientInterface, error) // Name is the name of the LRS balancer. const Name = "lrs_experimental" @@ -51,12 +51,15 @@ func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balanc b.logger = prefixLogger(b) b.logger.Infof("Created") - client, err := newXDSClient() - if err != nil { - b.logger.Errorf("failed to create xds-client: %v", err) - return nil + if newXDSClient != nil { + // For tests + client, err := newXDSClient() + if err != nil { + b.logger.Errorf("failed to create xds-client: %v", err) + return nil + } + b.client = newXDSClientWrapper(client) } - b.client = newXDSClientWrapper(client) return b } @@ -87,6 +90,14 @@ func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) } + if b.client == nil { + c := xdsclient.FromResolverState(s.ResolverState) + if c == nil { + return balancer.ErrBadResolverState + } + b.client = newXDSClientWrapper(c) + } + // Update load reporting config or xds client. This needs to be done before // updating the child policy because we need the loadStore from the updated // client to be passed to the ccWrapper. @@ -245,5 +256,7 @@ func (w *xdsClientWrapper) close() { w.cancelLoadReport() w.cancelLoadReport = nil } - w.c.Close() + if newXDSClient != nil { + w.c.Close() + } } diff --git a/xds/internal/client/attributes.go b/xds/internal/client/attributes.go new file mode 100644 index 000000000000..50b988245291 --- /dev/null +++ b/xds/internal/client/attributes.go @@ -0,0 +1,36 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package client + +import "google.golang.org/grpc/resolver" + +type clientKeyType string + +const clientKey = clientKeyType("grpc.xds.internal.client.Client") + +// FromResolverState returns the Client from state, or nil if not present. +func FromResolverState(state resolver.State) *Client { + cs, _ := state.Attributes.Value(clientKey).(*Client) + return cs +} + +// SetClient sets c in state and returns the new state. +func SetClient(state resolver.State, c *Client) resolver.State { + state.Attributes = state.Attributes.WithValues(clientKey, c) + return state +} diff --git a/xds/internal/client/bootstrap/bootstrap.go b/xds/internal/client/bootstrap/bootstrap.go index a3fb5c0816b4..dcf030631603 100644 --- a/xds/internal/client/bootstrap/bootstrap.go +++ b/xds/internal/client/bootstrap/bootstrap.go @@ -160,13 +160,19 @@ func bootstrapConfigFromEnvVariable() ([]byte, error) { // fields left unspecified, in which case the caller should use some sane // defaults. func NewConfig() (*Config, error) { - config := &Config{} - data, err := bootstrapConfigFromEnvVariable() if err != nil { return nil, fmt.Errorf("xds: Failed to read bootstrap config: %v", err) } logger.Debugf("Bootstrap content: %s", data) + return NewConfigFromContents(data) +} + +// NewConfigFromContents returns a new Config using the specified bootstrap +// file contents instead of reading the environment variable. This is only +// suitable for testing purposes. +func NewConfigFromContents(data []byte) (*Config, error) { + config := &Config{} var jsonData map[string]json.RawMessage if err := json.Unmarshal(data, &jsonData); err != nil { diff --git a/xds/internal/client/singleton.go b/xds/internal/client/singleton.go index 99f195341acd..41dd16e26afe 100644 --- a/xds/internal/client/singleton.go +++ b/xds/internal/client/singleton.go @@ -19,6 +19,8 @@ package client import ( + "bytes" + "encoding/json" "fmt" "sync" "time" @@ -92,8 +94,8 @@ func New() (*Client, error) { // singleton. The following calls will return the singleton xds client without // checking or using the config. // -// This function is internal only, for c2p resolver to use. DO NOT use this -// elsewhere. Use New() instead. +// This function is internal only, for c2p resolver and testing to use. DO NOT +// use this elsewhere. Use New() instead. func NewWithConfig(config *bootstrap.Config) (*Client, error) { singletonClient.mu.Lock() defer singletonClient.mu.Unlock() @@ -141,3 +143,49 @@ func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout time.D } return &Client{clientImpl: cl, refCount: 1}, nil } + +// NewClientWithBootstrapContents returns an xds client for this config, +// separate from the global singleton. This should be used for testing +// purposes only. +func NewClientWithBootstrapContents(contents []byte) (*Client, error) { + // Normalize the contents + buf := bytes.Buffer{} + err := json.Indent(&buf, contents, "", "") + if err != nil { + return nil, fmt.Errorf("xds: error normalizing JSON: %v", err) + } + contents = bytes.TrimSpace(buf.Bytes()) + + clientsMu.Lock() + defer clientsMu.Unlock() + if c := clients[string(contents)]; c != nil { + c.mu.Lock() + // Since we don't remove the *Client from the map when it is closed, we + // need to recreate the impl if the ref count dropped to zero. + if c.refCount > 0 { + c.refCount++ + c.mu.Unlock() + return c, nil + } + c.mu.Unlock() + } + + bcfg, err := bootstrap.NewConfigFromContents(contents) + if err != nil { + return nil, fmt.Errorf("xds: error with bootstrap config: %v", err) + } + + cImpl, err := newWithConfig(bcfg, defaultWatchExpiryTimeout) + if err != nil { + return nil, err + } + + c := &Client{clientImpl: cImpl, refCount: 1} + clients[string(contents)] = c + return c, nil +} + +var ( + clients = map[string]*Client{} + clientsMu sync.Mutex +) diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index e995fa9fa8fe..41e3899c285e 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -36,6 +36,17 @@ import ( const xdsScheme = "xds" +// NewBuilder creates a new xds resolver builder using a specific xds bootstrap +// config, so tests can use multiple xds clients in different ClientConns at +// the same time. +func NewBuilder(config []byte) (resolver.Builder, error) { + return &xdsResolverBuilder{ + newXDSClient: func() (xdsClientInterface, error) { + return xdsclient.NewClientWithBootstrapContents(config) + }, + }, nil +} + // For overriding in unittests. var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() } @@ -43,7 +54,9 @@ func init() { resolver.Register(&xdsResolverBuilder{}) } -type xdsResolverBuilder struct{} +type xdsResolverBuilder struct { + newXDSClient func() (xdsClientInterface, error) +} // Build helps implement the resolver.Builder interface. // @@ -60,6 +73,11 @@ func (b *xdsResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, op r.logger = prefixLogger((r)) r.logger.Infof("Creating resolver for target: %+v", t) + newXDSClient := newXDSClient + if b.newXDSClient != nil { + newXDSClient = b.newXDSClient + } + client, err := newXDSClient() if err != nil { return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) @@ -178,6 +196,13 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool { state := iresolver.SetConfigSelector(resolver.State{ ServiceConfig: r.cc.ParseServiceConfig(string(sc)), }, cs) + + // Include the xds client for the LB policies to use. For unit tests, + // r.client may not be a full *xdsclient.Client, but it will always be in + // production. + if c, ok := r.client.(*xdsclient.Client); ok { + state = xdsclient.SetClient(state, c) + } r.cc.UpdateState(state) return true } diff --git a/xds/internal/test/xds_client_integration_test.go b/xds/internal/test/xds_client_integration_test.go index 713331b325e0..e94b70c9fb64 100644 --- a/xds/internal/test/xds_client_integration_test.go +++ b/xds/internal/test/xds_client_integration_test.go @@ -68,7 +68,7 @@ func (s) TestClientSideXDS(t *testing.T) { port, cleanup := clientSetup(t) defer cleanup() - serviceName := xdsServiceName + "-client-side-xds" + const serviceName = "my-service-client-side-xds" resources := e2e.DefaultClientResources(e2e.ResourceParams{ DialTarget: serviceName, NodeID: xdsClientNodeID, @@ -81,7 +81,7 @@ func (s) TestClientSideXDS(t *testing.T) { } // Create a ClientConn and make a successful RPC. - cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolverBuilder)) if err != nil { t.Fatalf("failed to dial local test server: %v", err) } diff --git a/xds/internal/test/xds_integration_test.go b/xds/internal/test/xds_integration_test.go index a41fec929762..b66cdd59cafe 100644 --- a/xds/internal/test/xds_integration_test.go +++ b/xds/internal/test/xds_integration_test.go @@ -40,7 +40,9 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/leakcheck" "google.golang.org/grpc/internal/xds/env" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/testdata" + "google.golang.org/grpc/xds" "google.golang.org/grpc/xds/internal/testutils/e2e" xdsinternal "google.golang.org/grpc/internal/xds" @@ -71,8 +73,10 @@ func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, er var ( // Globals corresponding to the single instance of the xDS management server // which is spawned for all the tests in this package. - managementServer *e2e.ManagementServer - xdsClientNodeID string + managementServer *e2e.ManagementServer + xdsClientNodeID string + bootstrapContents []byte + xdsResolverBuilder resolver.Builder ) // TestMain sets up an xDS management server, runs all tests, and stops the @@ -158,30 +162,33 @@ func createClientTLSCredentials(t *testing.T) credentials.TransportCredentials { // - sets up the global variables which refer to this management server and the // nodeID to be used when talking to this management server. // -// Returns a function to be invoked by the caller to stop the management server. -func setupManagementServer() (func(), error) { +// Returns a function to be invoked by the caller to stop the management +// server. +func setupManagementServer() (cleanup func(), err error) { // Turn on the env var protection for client-side security. origClientSideSecurityEnvVar := env.ClientSideSecuritySupport env.ClientSideSecuritySupport = true // Spin up an xDS management server on a local port. - var err error managementServer, err = e2e.StartManagementServer() if err != nil { return nil, err } + defer func() { + if err != nil { + managementServer.Stop() + } + }() // Create a directory to hold certs and key files used on the server side. serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem") if err != nil { - managementServer.Stop() return nil, err } // Create a directory to hold certs and key files used on the client side. clientDir, err := createTmpDirWithFiles("testClientSideXDS*", "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem") if err != nil { - managementServer.Stop() return nil, err } @@ -194,7 +201,7 @@ func setupManagementServer() (func(), error) { // Create a bootstrap file in a temporary directory. xdsClientNodeID = uuid.New().String() - bootstrapCleanup, err := xdsinternal.SetupBootstrapFile(xdsinternal.BootstrapOptions{ + bootstrapContents, err = xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{ Version: xdsinternal.TransportV3, NodeID: xdsClientNodeID, ServerURI: managementServer.Address, @@ -202,13 +209,15 @@ func setupManagementServer() (func(), error) { ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate, }) if err != nil { - managementServer.Stop() + return nil, err + } + xdsResolverBuilder, err = xds.NewXDSResolverWithConfigForTesting(bootstrapContents) + if err != nil { return nil, err } return func() { managementServer.Stop() - bootstrapCleanup() env.ClientSideSecuritySupport = origClientSideSecurityEnvVar }, nil } diff --git a/xds/internal/test/xds_server_integration_test.go b/xds/internal/test/xds_server_integration_test.go index 6511a6134cf8..4bf13e9305be 100644 --- a/xds/internal/test/xds_server_integration_test.go +++ b/xds/internal/test/xds_server_integration_test.go @@ -46,8 +46,6 @@ const ( certFile = "cert.pem" keyFile = "key.pem" rootFile = "ca.pem" - - xdsServiceName = "my-service" ) // setupGRPCServer performs the following: @@ -70,7 +68,7 @@ func setupGRPCServer(t *testing.T) (net.Listener, func()) { } // Initialize an xDS-enabled gRPC server and register the stubServer on it. - server := xds.NewGRPCServer(grpc.Creds(creds)) + server := xds.NewGRPCServer(grpc.Creds(creds), xds.BootstrapContentsForTesting(bootstrapContents)) testpb.RegisterTestServiceServer(server, &testService{}) // Create a local listener and pass it to Serve(). @@ -123,7 +121,7 @@ func (s) TestServerSideXDS_Fallback(t *testing.T) { if err != nil { t.Fatalf("failed to retrieve host and port of server: %v", err) } - serviceName := xdsServiceName + "-fallback" + const serviceName = "my-service-fallback" resources := e2e.DefaultClientResources(e2e.ResourceParams{ DialTarget: serviceName, NodeID: xdsClientNodeID, @@ -154,7 +152,7 @@ func (s) TestServerSideXDS_Fallback(t *testing.T) { // Create a ClientConn with the xds scheme and make a successful RPC. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds)) + cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(xdsResolverBuilder)) if err != nil { t.Fatalf("failed to dial local test server: %v", err) } @@ -205,7 +203,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) { // Create xDS resources to be consumed on the client side. This // includes the listener, route configuration, cluster (with // security configuration) and endpoint resources. - serviceName := xdsServiceName + "-file-watcher-certs-" + test.name + serviceName := "my-service-file-watcher-certs-" + test.name resources := e2e.DefaultClientResources(e2e.ResourceParams{ DialTarget: serviceName, NodeID: xdsClientNodeID, @@ -236,7 +234,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) { // Create a ClientConn with the xds scheme and make an RPC. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds)) + cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(xdsResolverBuilder)) if err != nil { t.Fatalf("failed to dial local test server: %v", err) } @@ -270,7 +268,7 @@ func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) { if err != nil { t.Fatalf("failed to retrieve host and port of server: %v", err) } - serviceName := xdsServiceName + "-security-config-change" + const serviceName = "my-service-security-config-change" resources := e2e.DefaultClientResources(e2e.ResourceParams{ DialTarget: serviceName, NodeID: xdsClientNodeID, @@ -301,7 +299,7 @@ func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) { // Create a ClientConn with the xds scheme and make a successful RPC. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - xdsCC, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(xdsCreds)) + xdsCC, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(xdsCreds), grpc.WithResolvers(xdsResolverBuilder)) if err != nil { t.Fatalf("failed to dial local test server: %v", err) } diff --git a/xds/internal/test/xds_server_serving_mode_test.go b/xds/internal/test/xds_server_serving_mode_test.go index 0b70f7b06aed..8fb346298abf 100644 --- a/xds/internal/test/xds_server_serving_mode_test.go +++ b/xds/internal/test/xds_server_serving_mode_test.go @@ -105,7 +105,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { }) // Initialize an xDS-enabled gRPC server and register the stubServer on it. - server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt) + server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) defer server.Stop() testpb.RegisterTestServiceServer(server, &testService{}) diff --git a/xds/server.go b/xds/server.go index 3a2b629ae986..b83a073ac1c5 100644 --- a/xds/server.go +++ b/xds/server.go @@ -131,8 +131,8 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer { func handleServerOptions(opts []grpc.ServerOption) *serverOptions { so := &serverOptions{} for _, opt := range opts { - if o, ok := opt.(serverOption); ok { - o.applyServerOption(so) + if o, ok := opt.(*serverOption); ok { + o.apply(so) } } return so @@ -154,6 +154,12 @@ func (s *GRPCServer) initXDSClient() error { return nil } + newXDSClient := newXDSClient + if s.opts.bootstrapContents != nil { + newXDSClient = func() (xdsClientInterface, error) { + return xdsclient.NewClientWithBootstrapContents(s.opts.bootstrapContents) + } + } client, err := newXDSClient() if err != nil { return fmt.Errorf("xds: failed to create xds-client: %v", err) @@ -181,7 +187,6 @@ func (s *GRPCServer) Serve(lis net.Listener) error { if err := s.initXDSClient(); err != nil { return err } - cfg := s.xdsC.BootstrapConfig() if cfg == nil { return errors.New("bootstrap configuration is empty") diff --git a/xds/server_options.go b/xds/server_options.go index 44b7b374fd00..0918c097a3e5 100644 --- a/xds/server_options.go +++ b/xds/server_options.go @@ -25,31 +25,20 @@ import ( iserver "google.golang.org/grpc/xds/internal/server" ) -// ServingModeCallback returns a grpc.ServerOption which allows users to -// register a callback to get notified about serving mode changes. -func ServingModeCallback(cb ServingModeCallbackFunc) grpc.ServerOption { - return &smcOption{cb: cb} -} - -type serverOption interface { - applyServerOption(*serverOptions) +type serverOptions struct { + modeCallback ServingModeCallbackFunc + bootstrapContents []byte } -// smcOption is a server option containing a callback to be invoked when the -// serving mode changes. -type smcOption struct { - // Embedding the empty server option makes it safe to pass it to - // grpc.NewServer(). +type serverOption struct { grpc.EmptyServerOption - cb ServingModeCallbackFunc -} - -func (s *smcOption) applyServerOption(o *serverOptions) { - o.modeCallback = s.cb + apply func(*serverOptions) } -type serverOptions struct { - modeCallback ServingModeCallbackFunc +// ServingModeCallback returns a grpc.ServerOption which allows users to +// register a callback to get notified about serving mode changes. +func ServingModeCallback(cb ServingModeCallbackFunc) grpc.ServerOption { + return &serverOption{apply: func(o *serverOptions) { o.modeCallback = cb }} } // ServingMode indicates the current mode of operation of the server. @@ -82,3 +71,15 @@ type ServingModeChangeArgs struct { // not-serving mode. Err error } + +// BootstrapContentsForTesting returns a grpc.ServerOption which allows users +// to inject a bootstrap configuration used by only this server, instead of the +// global configuration from the environment variables. +// +// Testing Only +// +// This function should ONLY be used for testing and may not work with some +// other features, including the CSDS service. +func BootstrapContentsForTesting(contents []byte) grpc.ServerOption { + return &serverOption{apply: func(o *serverOptions) { o.bootstrapContents = contents }} +} diff --git a/xds/xds.go b/xds/xds.go index 23c88903f40b..bbd3fe543212 100644 --- a/xds/xds.go +++ b/xds/xds.go @@ -38,6 +38,7 @@ import ( v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" "google.golang.org/grpc" internaladmin "google.golang.org/grpc/internal/admin" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/csds" _ "google.golang.org/grpc/credentials/tls/certprovider/pemfile" // Register the file watcher certificate provider plugin. @@ -45,7 +46,7 @@ import ( _ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client. _ "google.golang.org/grpc/xds/internal/client/v3" // Register the v3 xDS API client. _ "google.golang.org/grpc/xds/internal/httpfilter/fault" // Register the fault injection filter. - _ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver. + xdsresolver "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver. ) func init() { @@ -76,3 +77,16 @@ func init() { return csdss.Close, nil }) } + +// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using +// the provided xds bootstrap config instead of the global configuration from +// the supported environment variables. The resolver.Builder is meant to be +// used in conjunction with the grpc.WithResolvers DialOption. +// +// Testing Only +// +// This function should ONLY be used for testing and may not work with some +// other features, including the CSDS service. +func NewXDSResolverWithConfigForTesting(bootstrapConfig []byte) (resolver.Builder, error) { + return xdsresolver.NewBuilder(bootstrapConfig) +}