Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: unify xDS client creation APIs meant for testing #7268

Merged
merged 4 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,9 @@ var (

ChannelzTurnOffForTesting func()

// TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found
// error for a given resource type and name. This is usually triggered when
// the associated watch timer fires. For testing purposes, having this
// function makes events more predictable than relying on timer events.
TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error

// TriggerXDSResourceNameNotFoundClient invokes the testing xDS Client
// singleton to invoke resource not found for a resource type name and
// resource name.
TriggerXDSResourceNameNotFoundClient any // func(string, string) error
// TriggerXDSResourceNotFoundForTesting causes the provided xDS Client to
// invoke resource-not-found error for the given resource type and name.
TriggerXDSResourceNotFoundForTesting any // func(xdsclient.XDSClient, xdsresource.Type, string) error

// FromOutgoingContextRaw returns the un-merged, intermediary contents of
// metadata.rawMD.
Expand Down
16 changes: 11 additions & 5 deletions internal/testutils/xds/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,17 @@ func Contents(opts Options) ([]byte, error) {
// resources with empty authority.
auths := map[string]authority{"": {}}
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
for n, auURI := range opts.Authorities {
auths[n] = authority{XdsServers: []server{{
ServerURI: auURI,
ChannelCreds: []creds{{Type: "insecure"}},
ServerFeatures: cfg.XdsServers[0].ServerFeatures,
}}}
// If the authority server URI is empty, set it to an empty authority
// config, resulting in it using the top-level xds server config.
a := authority{}
if auURI != "" {
a = authority{XdsServers: []server{{
ServerURI: auURI,
ChannelCreds: []creds{{Type: "insecure"}},
ServerFeatures: cfg.XdsServers[0].ServerFeatures,
}}}
}
auths[n] = a
}
cfg.Authorities = auths

Expand Down
12 changes: 12 additions & 0 deletions internal/xds/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ func (a *Authority) UnmarshalJSON(data []byte) error {
// Config provides the xDS client with several key bits of information that it
// requires in its interaction with the management server. The Config is
// initialized from the bootstrap file.
//
// Users must use one of the NewXxx() functions to create a Config instance, and
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
// not initialize it manually.
type Config struct {
// XDSServer is the management server to connect to.
//
Expand Down Expand Up @@ -415,6 +418,15 @@ func NewConfigFromContents(data []byte) (*Config, error) {
}

func newConfigFromContents(data []byte) (*Config, error) {
// Normalize the input configuration, as this is used as the key in the map
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
// of xDS clients created for testing.
buf := bytes.Buffer{}
err := json.Indent(&buf, data, "", "")
if err != nil {
return nil, fmt.Errorf("xds: error normalizing JSON bootstrap configuration: %v", err)
}
data = bytes.TrimSpace(buf.Bytes())

config := &Config{}

var jsonData map[string]json.RawMessage
Expand Down
199 changes: 0 additions & 199 deletions test/xds/xds_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
Expand Down Expand Up @@ -224,204 +223,6 @@ func (s) TestRDSNack(t *testing.T) {
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}

// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
// returns resource not found. Before getting the resource not found, the xDS
// Server has not received all configuration needed, so it should Accept and
// Close any new connections. After it has received the resource not found
// error, the server should move to serving, successfully Accept Connections,
// and fail at the L7 level with resource not found specified.
func (s) TestResourceNotFoundRDS(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch, and no RDS resource corresponding to
// this route name.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}

listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})

server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()

cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)

// Invoke resource not found - this should result in L7 RPC error with
// unavailable receive on serving as a result, should trigger it to go
// serving. Poll as watch might not be started yet to trigger resource not
// found.
loop:
for {
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "routeName"); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
select {
case <-serving.Done():
break loop
case <-ctx.Done():
t.Fatalf("timed out waiting for serving mode to go serving")
case <-time.After(time.Millisecond):
}
}
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}

// TestServingModeChanges tests the Server's logic as it transitions from Not
// Ready to Ready, then to Not Ready. Before it goes Ready, connections should
// be accepted and closed. After it goes ready, RPC's should proceed as normal
// according to matched route configuration. After it transitions back into not
// ready (through an explicit LDS Resource Not Found), previously running RPC's
// should be gracefully closed and still work, and new RPC's should fail.
func (s) TestServingModeChanges(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch. Due to not having received the full
// configuration, this should cause the server to be in mode Serving.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}

listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})

server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
}
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

select {
case <-ctx.Done():
t.Fatal("timeout waiting for the xDS Server to go Serving")
case <-serving.Done():
}

// A unary RPC should work once it transitions into serving. (need this same
// assertion from LDS resource not found triggering it).
waitForSuccessfulRPC(ctx, t, cc)

// Start a stream before switching the server to not serving. Due to the
// stream being created before the graceful stop of the underlying
// connection, it should be able to continue even after the server switches
// to not serving.
c := testgrpc.NewTestServiceClient(cc)
stream, err := c.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("cc.FullDuplexCall failed: %f", err)
}

// Invoke the lds resource not found - this should cause the server to
// switch to not serving. This should gracefully drain connections, and fail
// RPC's after. (how to assert accepted + closed) does this make it's way to
// application layer? (should work outside of resource not found...

// Invoke LDS Resource not found here (tests graceful close)
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}

// New RPCs on that connection should eventually start failing. Due to
// Graceful Stop any started streams continue to work.
if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
}
if err = stream.CloseSend(); err != nil {
t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
}
if _, err = stream.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
}

// New RPCs on that connection should eventually start failing.
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
}

// TestMultipleUpdatesImmediatelySwitch tests the case where you get an LDS
// specifying RDS A, B, and C (with A being matched to). The Server should be in
// not serving until it receives all 3 RDS Configurations, and then transition
Expand Down
8 changes: 1 addition & 7 deletions xds/googledirectpath/googlec2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/envconfig"
Expand Down Expand Up @@ -232,14 +231,9 @@ func TestBuildXDS(t *testing.T) {
if tt.tdURI != "" {
wantConfig.XDSServer.ServerURI = tt.tdURI
}
cmpOpts := cmp.Options{
cmpopts.IgnoreFields(bootstrap.ServerConfig{}, "Creds"),
cmp.AllowUnexported(bootstrap.ServerConfig{}),
protocmp.Transform(),
}
select {
case gotConfig := <-configCh:
if diff := cmp.Diff(wantConfig, gotConfig, cmpOpts); diff != "" {
if diff := cmp.Diff(wantConfig, gotConfig, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in bootstrap config (-want +got):\n%s", diff)
}
case <-time.After(time.Second):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscr
func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) {
t.Helper()

xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *gr
})
t.Cleanup(cleanup)

xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down Expand Up @@ -344,7 +344,7 @@ func (s) TestConfigurationUpdate_EmptyCluster(t *testing.T) {
// Setup a management server and an xDS client to talk to it.
_, _, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
t.Cleanup(cleanup)
xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,16 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/pickfirst"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/wrapperspb"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
Expand Down Expand Up @@ -1107,12 +1104,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {

// Create an xDS client talking to the above management server, configured
// with a short watch expiry timeout.
xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
NodeProto: &v3corepb.Node{Id: nodeID},
}, defaultTestWatchExpiryTimeout, time.Duration(0))
bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address)
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
t.Fatalf("Failed to create bootstrap configuration: %v", err)
}
xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout})
if err != nil {
t.Fatalf("Failed to create an xDS client: %v", err)
}
defer close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func setupAndDial(t *testing.T, bootstrapContents []byte) (*grpc.ClientConn, fun
t.Helper()

// Create an xDS client for use by the cluster_resolver LB policy.
xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down
Loading
Loading