Skip to content

Commit

Permalink
xds: add test-only injection of xds config to client and server (#4476)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Jun 2, 2021
1 parent e5cad3d commit 3508452
Show file tree
Hide file tree
Showing 16 changed files with 307 additions and 90 deletions.
33 changes: 22 additions & 11 deletions internal/xds/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,32 @@ type BootstrapOptions struct {
// completed successfully. It is the responsibility of the caller to invoke the
// cleanup function at the end of the test.
func SetupBootstrapFile(opts BootstrapOptions) (func(), error) {
bootstrapContents, err := BootstrapContents(opts)
if err != nil {
return nil, err
}
f, err := ioutil.TempFile("", "test_xds_bootstrap_*")
if err != nil {
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
}

if err := ioutil.WriteFile(f.Name(), bootstrapContents, 0644); err != nil {
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
}
logger.Infof("Created bootstrap file at %q with contents: %s\n", f.Name(), bootstrapContents)

origBootstrapFileName := env.BootstrapFileName
env.BootstrapFileName = f.Name()
return func() {
os.Remove(f.Name())
env.BootstrapFileName = origBootstrapFileName
}, nil
}

// BootstrapContents returns the contents to go into a bootstrap file,
// environment, or configuration passed to
// xds.NewXDSResolverWithConfigForTesting.
func BootstrapContents(opts BootstrapOptions) ([]byte, error) {
cfg := &bootstrapConfig{
XdsServers: []server{
{
Expand Down Expand Up @@ -100,17 +121,7 @@ func SetupBootstrapFile(opts BootstrapOptions) (func(), error) {
if err != nil {
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
}
if err := ioutil.WriteFile(f.Name(), bootstrapContents, 0644); err != nil {
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
}
logger.Infof("Created bootstrap file at %q with contents: %s\n", f.Name(), bootstrapContents)

origBootstrapFileName := env.BootstrapFileName
env.BootstrapFileName = f.Name()
return func() {
os.Remove(f.Name())
env.BootstrapFileName = origBootstrapFileName
}, nil
return bootstrapContents, nil
}

type bootstrapConfig struct {
Expand Down
35 changes: 28 additions & 7 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
// not deal with subConns.
return builder.Build(cc, opts), nil
}
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
newXDSClient func() (xdsClientInterface, error)
buildProvider = buildProviderFunc
)

Expand All @@ -86,12 +86,15 @@ func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
b.logger = prefixLogger((b))
b.logger.Infof("Created")

client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.xdsClient = client
}
b.xdsClient = client

var creds credentials.TransportCredentials
switch {
Expand Down Expand Up @@ -359,7 +362,15 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) {
lbCfg.LrsLoadReportingServerName = new(string)

}
resolverState := resolver.State{}
// Include the xds client for the child LB policies to use. For unit
// tests, b.xdsClient may not be a full *xdsclient.Client, but it will
// always be in production.
if c, ok := b.xdsClient.(*xdsclient.Client); ok {
resolverState = xdsclient.SetClient(resolverState, c)
}
ccState := balancer.ClientConnState{
ResolverState: resolverState,
BalancerConfig: lbCfg,
}
if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
Expand Down Expand Up @@ -397,7 +408,9 @@ func (b *cdsBalancer) run() {
b.edsLB.Close()
b.edsLB = nil
}
b.xdsClient.Close()
if newXDSClient != nil {
b.xdsClient.Close()
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
}
Expand Down Expand Up @@ -468,6 +481,14 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro
return errBalancerClosed
}

if b.xdsClient == nil {
c := xdsclient.FromResolverState(state.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
b.xdsClient = c
}

b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(state.BalancerConfig))
// The errors checked here should ideally never happen because the
// ServiceConfig in this case is prepared by the xdsResolver and is not
Expand Down
31 changes: 24 additions & 7 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func init() {
balancer.Register(clusterImplBB{})
}

var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
var newXDSClient func() (xdsClientInterface, error)

type clusterImplBB struct{}

Expand All @@ -61,18 +61,22 @@ func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions)
ClientConn: cc,
bOpts: bOpts,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
loadWrapper: loadstore.NewWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
}
b.logger = prefixLogger(b)

client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.xdsC = client
}
b.xdsC = client
go b.run()

b.logger.Infof("Created")
Expand Down Expand Up @@ -107,6 +111,7 @@ type clusterImplBalancer struct {
// synchronized with Close().
mu sync.Mutex
closed *grpcsync.Event
done *grpcsync.Event

bOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
Expand Down Expand Up @@ -204,6 +209,14 @@ func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
}

if cib.xdsC == nil {
c := xdsclient.FromResolverState(s.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
cib.xdsC = c
}

// Update load reporting config. This needs to be done before updating the
// child policy because we need the loadStore from the updated client to be
// passed to the ccWrapper, so that the next picker from the child policy
Expand Down Expand Up @@ -315,7 +328,10 @@ func (cib *clusterImplBalancer) Close() {
cib.childLB.Close()
cib.childLB = nil
}
cib.xdsC.Close()
if newXDSClient != nil {
cib.xdsC.Close()
}
<-cib.done.Done()
cib.logger.Infof("Shutdown")
}

Expand Down Expand Up @@ -363,6 +379,7 @@ type dropConfigs struct {
}

func (cib *clusterImplBalancer) run() {
defer cib.done.Fire()
for {
select {
case update := <-cib.pickerUpdateCh.Get():
Expand Down
27 changes: 20 additions & 7 deletions xds/internal/balancer/edsbalancer/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var (
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger)
}
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
newXDSClient func() (xdsClientInterface, error)
)

func init() {
Expand All @@ -76,13 +76,16 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
}
x.logger = prefixLogger(x)

client, err := newXDSClient()
if err != nil {
x.logger.Errorf("xds: failed to create xds-client: %v", err)
return nil
if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
x.logger.Errorf("xds: failed to create xds-client: %v", err)
return nil
}
x.xdsClient = client
}

x.xdsClient = client
x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.loadWrapper, x.logger)
x.logger.Infof("Created")
go x.run()
Expand Down Expand Up @@ -172,7 +175,9 @@ func (x *edsBalancer) run() {
x.edsImpl.updateState(u.priority, u.s)
case <-x.closed.Done():
x.cancelWatch()
x.xdsClient.Close()
if newXDSClient != nil {
x.xdsClient.Close()
}
x.edsImpl.close()
x.logger.Infof("Shutdown")
x.done.Fire()
Expand Down Expand Up @@ -380,6 +385,14 @@ func (x *edsBalancer) ResolverError(err error) {
}

func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if x.xdsClient == nil {
c := xdsclient.FromResolverState(s.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
x.xdsClient = c
}

select {
case x.grpcUpdate <- &s:
case <-x.closed.Done():
Expand Down
27 changes: 20 additions & 7 deletions xds/internal/balancer/lrs/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func init() {
balancer.Register(&lrsBB{})
}

var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
var newXDSClient func() (xdsClientInterface, error)

// Name is the name of the LRS balancer.
const Name = "lrs_experimental"
Expand All @@ -51,12 +51,15 @@ func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balanc
b.logger = prefixLogger(b)
b.logger.Infof("Created")

client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.client = newXDSClientWrapper(client)
}
b.client = newXDSClientWrapper(client)

return b
}
Expand Down Expand Up @@ -87,6 +90,14 @@ func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}

if b.client == nil {
c := xdsclient.FromResolverState(s.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
b.client = newXDSClientWrapper(c)
}

// Update load reporting config or xds client. This needs to be done before
// updating the child policy because we need the loadStore from the updated
// client to be passed to the ccWrapper.
Expand Down Expand Up @@ -245,5 +256,7 @@ func (w *xdsClientWrapper) close() {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
w.c.Close()
if newXDSClient != nil {
w.c.Close()
}
}
36 changes: 36 additions & 0 deletions xds/internal/client/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package client

import "google.golang.org/grpc/resolver"

type clientKeyType string

const clientKey = clientKeyType("grpc.xds.internal.client.Client")

// FromResolverState returns the Client from state, or nil if not present.
func FromResolverState(state resolver.State) *Client {
cs, _ := state.Attributes.Value(clientKey).(*Client)
return cs
}

// SetClient sets c in state and returns the new state.
func SetClient(state resolver.State, c *Client) resolver.State {
state.Attributes = state.Attributes.WithValues(clientKey, c)
return state
}
10 changes: 8 additions & 2 deletions xds/internal/client/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,19 @@ func bootstrapConfigFromEnvVariable() ([]byte, error) {
// fields left unspecified, in which case the caller should use some sane
// defaults.
func NewConfig() (*Config, error) {
config := &Config{}

data, err := bootstrapConfigFromEnvVariable()
if err != nil {
return nil, fmt.Errorf("xds: Failed to read bootstrap config: %v", err)
}
logger.Debugf("Bootstrap content: %s", data)
return NewConfigFromContents(data)
}

// NewConfigFromContents returns a new Config using the specified bootstrap
// file contents instead of reading the environment variable. This is only
// suitable for testing purposes.
func NewConfigFromContents(data []byte) (*Config, error) {
config := &Config{}

var jsonData map[string]json.RawMessage
if err := json.Unmarshal(data, &jsonData); err != nil {
Expand Down
Loading

0 comments on commit 3508452

Please sign in to comment.