Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: clusterresolver e2e test cleanup #6391

Merged
merged 2 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,22 @@ func ClusterResourceWithOptions(opts ClusterOptions) *v3clusterpb.Cluster {

// LocalityOptions contains options to configure a Locality.
type LocalityOptions struct {
// Ports is a set of ports on "localhost" belonging to this locality.
Ports []uint32
// Name is the unique locality name.
Name string
// Weight is the weight of the locality, used for load balancing.
Weight uint32
// Backends is a set of backends belonging to this locality.
Backends []BackendOptions
}

// BackendOptions contains options to configure individual backends in a
// locality.
type BackendOptions struct {
// Port number on which the backend is accepting connections.
Port uint32
// Health status of the backend. Default is UNKNOWN which is treated the
// same as HEALTHY.
HealthStatus v3corepb.HealthStatus
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Still looks like on Localhost. Mention somewhere in these comments? I guess maybe we'll scale it up in future, but then you can't test locally haha.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Added.

Yes, we probably will never make it generic enough to run on non-localhost machines :)

}

// EndpointOptions contains options to configure an Endpoint (or
Expand All @@ -550,13 +562,17 @@ type EndpointOptions struct {

// DefaultEndpoint returns a basic xds Endpoint resource.
func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment {
var bOpts []BackendOptions
for _, p := range ports {
bOpts = append(bOpts, BackendOptions{Port: p})
}
return EndpointResourceWithOptions(EndpointOptions{
ClusterName: clusterName,
Host: host,
Localities: []LocalityOptions{
{
Ports: ports,
Weight: 1,
Backends: bOpts,
Weight: 1,
},
},
})
Expand All @@ -568,16 +584,18 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
var endpoints []*v3endpointpb.LocalityLbEndpoints
for i, locality := range opts.Localities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range locality.Ports {
for _, b := range locality.Backends {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Port},
},
}},
}},
HealthStatus: b.HealthStatus,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}
Expand Down
8 changes: 4 additions & 4 deletions test/xds/xds_client_custom_lb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ func (s) TestWrrLocality(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Ports: []uint32{port1, port2},
Weight: 1,
Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}},
Weight: 1,
},
{
Ports: []uint32{port3, port4, port5},
Weight: 2,
Backends: []e2e.BackendOptions{{Port: port3}, {Port: port4}, {Port: port5}},
Weight: 2,
},
},
})},
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Ports: []uint32{testutils.ParsePort(t, server.Address)},
Weight: 1,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
Weight: 1,
},
},
DropPercents: map[string]int{"test-drop-everything": 100},
Expand Down
189 changes: 106 additions & 83 deletions xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,57 +101,16 @@ func startTestServiceBackends(t *testing.T, numBackends int) ([]*stubserver.Stub
}
}

// endpointResource returns an EDS resource for the given cluster name and
// localities. Backends within a locality are all assumed to be on the same
// machine (localhost).
func endpointResource(clusterName string, localities []localityInfo) *v3endpointpb.ClusterLoadAssignment {
var localityEndpoints []*v3endpointpb.LocalityLbEndpoints
for _, locality := range localities {
var endpoints []*v3endpointpb.LbEndpoint
for i, port := range locality.ports {
endpoint := &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{
Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: "localhost",
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
},
},
},
},
}
if i < len(locality.healthStatus) {
endpoint.HealthStatus = locality.healthStatus[i]
}
endpoints = append(endpoints, endpoint)
}
localityEndpoints = append(localityEndpoints, &v3endpointpb.LocalityLbEndpoints{
Locality: &v3corepb.Locality{SubZone: locality.name},
LbEndpoints: endpoints,
LoadBalancingWeight: wrapperspb.UInt32(locality.weight),
})
}
return &v3endpointpb.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: localityEndpoints,
}
}

type localityInfo struct {
name string
weight uint32
ports []uint32
healthStatus []v3corepb.HealthStatus
}

// clientEndpointsResource returns an EDS resource for the specified nodeID,
// service name and localities.
func clientEndpointsResource(nodeID, edsServiceName string, localities []localityInfo) e2e.UpdateOptions {
func clientEndpointsResource(nodeID, edsServiceName string, localities []e2e.LocalityOptions) e2e.UpdateOptions {
return e2e.UpdateOptions{
NodeID: nodeID,
Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpointResource(edsServiceName, localities)},
NodeID: nodeID,
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: edsServiceName,
Host: "localhost",
Localities: localities,
})},
SkipValidation: true,
}
}
Expand All @@ -175,7 +134,11 @@ func (s) TestEDS_OneLocality(t *testing.T) {

// Create xDS resources for consumption by the test. We start off with a
// single backend in a single EDS locality.
resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}})
resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
Expand Down Expand Up @@ -223,7 +186,11 @@ func (s) TestEDS_OneLocality(t *testing.T) {

// Add a backend to the same locality, and ensure RPCs are sent in a
// roundrobin fashion across the two backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:2]}})
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}, {Port: ports[1]}},
}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
Expand All @@ -233,7 +200,11 @@ func (s) TestEDS_OneLocality(t *testing.T) {

// Remove the first backend, and ensure all RPCs are sent to the second
// backend.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[1:2]}})
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
Expand All @@ -242,7 +213,11 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}

// Replace the backend, and ensure all RPCs are sent to the new backend.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[2:3]}})
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[2]}},
}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -281,9 +256,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {

// Create xDS resources for consumption by the test. We start off with two
// localities, and single backend in each of them.
resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName1, weight: 1, ports: ports[:1]},
{name: localityName2, weight: 1, ports: ports[1:2]},
resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
},
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -332,10 +315,22 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {

// Add another locality with a single backend, and ensure RPCs are being
// weighted roundrobined across the three backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName1, weight: 1, ports: ports[:1]},
{name: localityName2, weight: 1, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:3]},
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
},
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
},
{
Name: localityName3,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[2]}},
},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
Expand All @@ -346,9 +341,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {

// Remove the first locality, and ensure RPCs are being weighted
// roundrobined across the remaining two backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName2, weight: 1, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:3]},
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
},
{
Name: localityName3,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[2]}},
},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
Expand All @@ -360,9 +363,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {
// Add a backend to one locality, and ensure weighted roundrobin. Since RPCs
// are roundrobined across localities, locality2's backend will receive
// twice the traffic.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName2, weight: 1, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:4]},
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
},
{
Name: localityName3,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[2]}, {Port: ports[3]}},
},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
Expand All @@ -389,23 +400,31 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
// Create xDS resources for consumption by the test. Two localities with
// six backends each, with two of the six backends being healthy. Both
// UNKNOWN and HEALTHY are considered by gRPC for load balancing.
resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName1, weight: 1, ports: ports[:6], healthStatus: []v3corepb.HealthStatus{
v3corepb.HealthStatus_UNKNOWN,
v3corepb.HealthStatus_HEALTHY,
v3corepb.HealthStatus_UNHEALTHY,
v3corepb.HealthStatus_DRAINING,
v3corepb.HealthStatus_TIMEOUT,
v3corepb.HealthStatus_DEGRADED,
}},
{name: localityName2, weight: 1, ports: ports[6:12], healthStatus: []v3corepb.HealthStatus{
v3corepb.HealthStatus_UNKNOWN,
v3corepb.HealthStatus_HEALTHY,
v3corepb.HealthStatus_UNHEALTHY,
v3corepb.HealthStatus_DRAINING,
v3corepb.HealthStatus_TIMEOUT,
v3corepb.HealthStatus_DEGRADED,
}},
resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{
{Port: ports[0], HealthStatus: v3corepb.HealthStatus_UNKNOWN},
{Port: ports[1], HealthStatus: v3corepb.HealthStatus_HEALTHY},
{Port: ports[2], HealthStatus: v3corepb.HealthStatus_UNHEALTHY},
{Port: ports[3], HealthStatus: v3corepb.HealthStatus_DRAINING},
{Port: ports[4], HealthStatus: v3corepb.HealthStatus_TIMEOUT},
{Port: ports[5], HealthStatus: v3corepb.HealthStatus_DEGRADED},
},
},
{
Name: localityName2,
Weight: 1,
Backends: []e2e.BackendOptions{
{Port: ports[6], HealthStatus: v3corepb.HealthStatus_UNKNOWN},
{Port: ports[7], HealthStatus: v3corepb.HealthStatus_HEALTHY},
{Port: ports[8], HealthStatus: v3corepb.HealthStatus_UNHEALTHY},
{Port: ports[9], HealthStatus: v3corepb.HealthStatus_DRAINING},
{Port: ports[10], HealthStatus: v3corepb.HealthStatus_TIMEOUT},
{Port: ports[11], HealthStatus: v3corepb.HealthStatus_DEGRADED},
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -520,7 +539,11 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
}

// Add a locality with one backend and ensure RPCs are successful.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}})
resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
Expand Down