Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add test-only injection of xds config to client and server #4476

Merged
merged 2 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions internal/xds/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,32 @@ type BootstrapOptions struct {
// completed successfully. It is the responsibility of the caller to invoke the
// cleanup function at the end of the test.
func SetupBootstrapFile(opts BootstrapOptions) (func(), error) {
bootstrapContents, err := BootstrapContents(opts)
if err != nil {
return nil, err
}
f, err := ioutil.TempFile("", "test_xds_bootstrap_*")
if err != nil {
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
}

if err := ioutil.WriteFile(f.Name(), bootstrapContents, 0644); err != nil {
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
}
logger.Infof("Created bootstrap file at %q with contents: %s\n", f.Name(), bootstrapContents)

origBootstrapFileName := env.BootstrapFileName
env.BootstrapFileName = f.Name()
return func() {
os.Remove(f.Name())
env.BootstrapFileName = origBootstrapFileName
}, nil
}

// BootstrapContents returns the contents to go into a bootstrap file,
// environment, or configuration passed to
// xds.NewXDSResolverWithConfigForTesting.
func BootstrapContents(opts BootstrapOptions) ([]byte, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that the contents of this file were moved from xds/internal/testutils/e2e/bootstrap.go to here in an earlier PR implementing admin services. What is the reason for this code to node live in a package which is explicitly for testing? The change need not be done in this PR, but I'm trying to understand why this package was moved from a testutils kind of package to simply an internal package.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume it was moved because the admin service/CSDS is not a test-only thing. That seems reasonable to me. Or are you saying more was moved than needed to be?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it seems like all uses of funcs in this file are from tests. And the newly added BootstrapContents as well.

cfg := &bootstrapConfig{
XdsServers: []server{
{
Expand Down Expand Up @@ -100,17 +121,7 @@ func SetupBootstrapFile(opts BootstrapOptions) (func(), error) {
if err != nil {
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
}
if err := ioutil.WriteFile(f.Name(), bootstrapContents, 0644); err != nil {
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
}
logger.Infof("Created bootstrap file at %q with contents: %s\n", f.Name(), bootstrapContents)

origBootstrapFileName := env.BootstrapFileName
env.BootstrapFileName = f.Name()
return func() {
os.Remove(f.Name())
env.BootstrapFileName = origBootstrapFileName
}, nil
return bootstrapContents, nil
}

type bootstrapConfig struct {
Expand Down
32 changes: 25 additions & 7 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
// not deal with subConns.
return builder.Build(cc, opts), nil
}
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
newXDSClient func() (xdsClientInterface, error)
buildProvider = buildProviderFunc
)

Expand All @@ -86,12 +86,15 @@ func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
b.logger = prefixLogger((b))
b.logger.Infof("Created")

client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.xdsClient = client
}
b.xdsClient = client

var creds credentials.TransportCredentials
switch {
Expand Down Expand Up @@ -359,7 +362,12 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) {
lbCfg.LrsLoadReportingServerName = new(string)

}
resolverState := resolver.State{}
if c, ok := b.xdsClient.(*xdsclient.Client); ok {
resolverState = xdsclient.SetClient(resolverState, c)
}
ccState := balancer.ClientConnState{
ResolverState: resolverState,
BalancerConfig: lbCfg,
}
if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
Expand Down Expand Up @@ -397,7 +405,9 @@ func (b *cdsBalancer) run() {
b.edsLB.Close()
b.edsLB = nil
}
b.xdsClient.Close()
if newXDSClient != nil {
b.xdsClient.Close()
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
}
Expand Down Expand Up @@ -468,6 +478,14 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro
return errBalancerClosed
}

if b.xdsClient == nil {
c := xdsclient.FromResolverState(state.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
b.xdsClient = c
}

b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(state.BalancerConfig))
// The errors checked here should ideally never happen because the
// ServiceConfig in this case is prepared by the xdsResolver and is not
Expand Down
31 changes: 24 additions & 7 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func init() {
balancer.Register(clusterImplBB{})
}

var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
var newXDSClient func() (xdsClientInterface, error)

type clusterImplBB struct{}

Expand All @@ -61,18 +61,22 @@ func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions)
ClientConn: cc,
bOpts: bOpts,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
loadWrapper: loadstore.NewWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
}
b.logger = prefixLogger(b)

client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
if newXDSClient != nil {
// For tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this was sufficient, actually. Tests will set this; it will be nil in production.

client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.xdsC = client
}
b.xdsC = client
go b.run()

b.logger.Infof("Created")
Expand Down Expand Up @@ -107,6 +111,7 @@ type clusterImplBalancer struct {
// synchronized with Close().
mu sync.Mutex
closed *grpcsync.Event
done *grpcsync.Event

bOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
Expand Down Expand Up @@ -204,6 +209,14 @@ func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
}

if cib.xdsC == nil {
c := xdsclient.FromResolverState(s.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
cib.xdsC = c
}

// Update load reporting config. This needs to be done before updating the
// child policy because we need the loadStore from the updated client to be
// passed to the ccWrapper, so that the next picker from the child policy
Expand Down Expand Up @@ -315,7 +328,10 @@ func (cib *clusterImplBalancer) Close() {
cib.childLB.Close()
cib.childLB = nil
}
cib.xdsC.Close()
if newXDSClient != nil {
cib.xdsC.Close()
}
<-cib.done.Done()
cib.logger.Infof("Shutdown")
}

Expand Down Expand Up @@ -363,6 +379,7 @@ type dropConfigs struct {
}

func (cib *clusterImplBalancer) run() {
defer cib.done.Fire()
for {
select {
case update := <-cib.pickerUpdateCh.Get():
Expand Down
27 changes: 20 additions & 7 deletions xds/internal/balancer/edsbalancer/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var (
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger)
}
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
newXDSClient func() (xdsClientInterface, error)
)

func init() {
Expand All @@ -76,13 +76,16 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
}
x.logger = prefixLogger(x)

client, err := newXDSClient()
if err != nil {
x.logger.Errorf("xds: failed to create xds-client: %v", err)
return nil
if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
x.logger.Errorf("xds: failed to create xds-client: %v", err)
return nil
}
x.xdsClient = client
}

x.xdsClient = client
x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.loadWrapper, x.logger)
x.logger.Infof("Created")
go x.run()
Expand Down Expand Up @@ -172,7 +175,9 @@ func (x *edsBalancer) run() {
x.edsImpl.updateState(u.priority, u.s)
case <-x.closed.Done():
x.cancelWatch()
x.xdsClient.Close()
if newXDSClient != nil {
x.xdsClient.Close()
}
x.edsImpl.close()
x.logger.Infof("Shutdown")
x.done.Fire()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I liked the defer x.done.Fire() in clusterImpl policy. Can we do the same here as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we delay this as another cleanup? It will conflict with #4479

Expand Down Expand Up @@ -380,6 +385,14 @@ func (x *edsBalancer) ResolverError(err error) {
}

func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if x.xdsClient == nil {
c := xdsclient.FromResolverState(s.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
x.xdsClient = c
}

select {
case x.grpcUpdate <- &s:
case <-x.closed.Done():
Expand Down
27 changes: 20 additions & 7 deletions xds/internal/balancer/lrs/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func init() {
balancer.Register(&lrsBB{})
}

var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
var newXDSClient func() (xdsClientInterface, error)

// Name is the name of the LRS balancer.
const Name = "lrs_experimental"
Expand All @@ -51,12 +51,15 @@ func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balanc
b.logger = prefixLogger(b)
b.logger.Infof("Created")

client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.client = newXDSClientWrapper(client)
}
b.client = newXDSClientWrapper(client)

return b
}
Expand Down Expand Up @@ -87,6 +90,14 @@ func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}

if b.client == nil {
c := xdsclient.FromResolverState(s.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
b.client = newXDSClientWrapper(c)
}

// Update load reporting config or xds client. This needs to be done before
// updating the child policy because we need the loadStore from the updated
// client to be passed to the ccWrapper.
Expand Down Expand Up @@ -245,5 +256,7 @@ func (w *xdsClientWrapper) close() {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
w.c.Close()
if newXDSClient != nil {
w.c.Close()
}
}
36 changes: 36 additions & 0 deletions xds/internal/client/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package client

import "google.golang.org/grpc/resolver"

type clientKeyType string

const clientKey = clientKeyType("grpc.xds.internal.client.Client")

// FromResolverState returns the Client from state, or nil if not present.
func FromResolverState(state resolver.State) *Client {
cs, _ := state.Attributes.Value(clientKey).(*Client)
return cs
}

// SetClient sets c in state and returns the new state.
func SetClient(state resolver.State, c *Client) resolver.State {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we ever have the need to pass the xdsClient in attributes which are not part of the resolver.State? The reason I'm asking is if we could operate on attributes.Attributes in these functions instead of resolver.State.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the convention we've been using for everything else in attributes... This makes it more convenient to use (set & get) than operating on attributes. I wouldn't be opposed to a change, but for this PR I'd rather follow the established convention.

state.Attributes = state.Attributes.WithValues(clientKey, c)
return state
}
10 changes: 8 additions & 2 deletions xds/internal/client/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,19 @@ func bootstrapConfigFromEnvVariable() ([]byte, error) {
// fields left unspecified, in which case the caller should use some sane
// defaults.
func NewConfig() (*Config, error) {
config := &Config{}

data, err := bootstrapConfigFromEnvVariable()
if err != nil {
return nil, fmt.Errorf("xds: Failed to read bootstrap config: %v", err)
}
logger.Debugf("Bootstrap content: %s", data)
return NewConfigFromContents(data)
}

// NewConfigFromContents returns a new Config using the specified bootstrap
// file contents instead of reading the environment variable. This is only
// suitable for testing purposes.
func NewConfigFromContents(data []byte) (*Config, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: NewConfigFromContentsForTesting?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem worthwhile to use this naming given:

  1. it's internal-only, and
  2. if we ever had a desire to call this from our production implementation, we'd probably just do it and remove the FromTesting at that time.

config := &Config{}

var jsonData map[string]json.RawMessage
if err := json.Unmarshal(data, &jsonData); err != nil {
Expand Down
Loading