diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index ff2a5d43398a..9d46483e3d23 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -526,10 +526,23 @@ func ClusterResourceWithOptions(opts ClusterOptions) *v3clusterpb.Cluster { // LocalityOptions contains options to configure a Locality. type LocalityOptions struct { - // Ports is a set of ports on "localhost" belonging to this locality. - Ports []uint32 + // Name is the unique locality name. + Name string // Weight is the weight of the locality, used for load balancing. Weight uint32 + // Backends is a set of backends belonging to this locality. + Backends []BackendOptions +} + +// BackendOptions contains options to configure individual backends in a +// locality. +type BackendOptions struct { + // Port number on which the backend is accepting connections. All backends + // are expected to run on localhost, hence host name is not stored here. + Port uint32 + // Health status of the backend. Default is UNKNOWN which is treated the + // same as HEALTHY. + HealthStatus v3corepb.HealthStatus } // EndpointOptions contains options to configure an Endpoint (or @@ -550,13 +563,17 @@ type EndpointOptions struct { // DefaultEndpoint returns a basic xds Endpoint resource. func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment { + var bOpts []BackendOptions + for _, p := range ports { + bOpts = append(bOpts, BackendOptions{Port: p}) + } return EndpointResourceWithOptions(EndpointOptions{ ClusterName: clusterName, Host: host, Localities: []LocalityOptions{ { - Ports: ports, - Weight: 1, + Backends: bOpts, + Weight: 1, }, }, }) @@ -568,16 +585,18 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad var endpoints []*v3endpointpb.LocalityLbEndpoints for i, locality := range opts.Localities { var lbEndpoints []*v3endpointpb.LbEndpoint - for _, port := range locality.Ports { + for _, b := range locality.Backends { lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{ HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{ Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ SocketAddress: &v3corepb.SocketAddress{ Protocol: v3corepb.SocketAddress_TCP, Address: opts.Host, - PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}}, + PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Port}, + }, }}, }}, + HealthStatus: b.HealthStatus, LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1}, }) } diff --git a/test/xds/xds_client_custom_lb_test.go b/test/xds/xds_client_custom_lb_test.go index 749eb7f9aa64..4624c252b0bb 100644 --- a/test/xds/xds_client_custom_lb_test.go +++ b/test/xds/xds_client_custom_lb_test.go @@ -213,12 +213,12 @@ func (s) TestWrrLocality(t *testing.T) { Host: "localhost", Localities: []e2e.LocalityOptions{ { - Ports: []uint32{port1, port2}, - Weight: 1, + Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}}, + Weight: 1, }, { - Ports: []uint32{port3, port4, port5}, - Weight: 2, + Backends: []e2e.BackendOptions{{Port: port3}, {Port: port4}, {Port: port5}}, + Weight: 2, }, }, })}, diff --git a/xds/internal/balancer/clusterimpl/tests/balancer_test.go b/xds/internal/balancer/clusterimpl/tests/balancer_test.go index d335ecd7e844..1cb8492949a0 100644 --- a/xds/internal/balancer/clusterimpl/tests/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/tests/balancer_test.go @@ -116,8 +116,8 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) { Host: "localhost", Localities: []e2e.LocalityOptions{ { - Ports: []uint32{testutils.ParsePort(t, server.Address)}, - Weight: 1, + Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}}, + Weight: 1, }, }, DropPercents: map[string]int{"test-drop-everything": 100}, diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index b45084edae0f..f49528499356 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -101,57 +101,16 @@ func startTestServiceBackends(t *testing.T, numBackends int) ([]*stubserver.Stub } } -// endpointResource returns an EDS resource for the given cluster name and -// localities. Backends within a locality are all assumed to be on the same -// machine (localhost). -func endpointResource(clusterName string, localities []localityInfo) *v3endpointpb.ClusterLoadAssignment { - var localityEndpoints []*v3endpointpb.LocalityLbEndpoints - for _, locality := range localities { - var endpoints []*v3endpointpb.LbEndpoint - for i, port := range locality.ports { - endpoint := &v3endpointpb.LbEndpoint{ - HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{ - Endpoint: &v3endpointpb.Endpoint{ - Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ - SocketAddress: &v3corepb.SocketAddress{ - Protocol: v3corepb.SocketAddress_TCP, - Address: "localhost", - PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}}, - }, - }, - }, - }, - } - if i < len(locality.healthStatus) { - endpoint.HealthStatus = locality.healthStatus[i] - } - endpoints = append(endpoints, endpoint) - } - localityEndpoints = append(localityEndpoints, &v3endpointpb.LocalityLbEndpoints{ - Locality: &v3corepb.Locality{SubZone: locality.name}, - LbEndpoints: endpoints, - LoadBalancingWeight: wrapperspb.UInt32(locality.weight), - }) - } - return &v3endpointpb.ClusterLoadAssignment{ - ClusterName: clusterName, - Endpoints: localityEndpoints, - } -} - -type localityInfo struct { - name string - weight uint32 - ports []uint32 - healthStatus []v3corepb.HealthStatus -} - // clientEndpointsResource returns an EDS resource for the specified nodeID, // service name and localities. -func clientEndpointsResource(nodeID, edsServiceName string, localities []localityInfo) e2e.UpdateOptions { +func clientEndpointsResource(nodeID, edsServiceName string, localities []e2e.LocalityOptions) e2e.UpdateOptions { return e2e.UpdateOptions{ - NodeID: nodeID, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpointResource(edsServiceName, localities)}, + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: edsServiceName, + Host: "localhost", + Localities: localities, + })}, SkipValidation: true, } } @@ -175,7 +134,11 @@ func (s) TestEDS_OneLocality(t *testing.T) { // Create xDS resources for consumption by the test. We start off with a // single backend in a single EDS locality. - resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}}) + resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[0]}}, + }}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := managementServer.Update(ctx, resources); err != nil { @@ -223,7 +186,11 @@ func (s) TestEDS_OneLocality(t *testing.T) { // Add a backend to the same locality, and ensure RPCs are sent in a // roundrobin fashion across the two backends. - resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:2]}}) + resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[0]}, {Port: ports[1]}}, + }}) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -233,7 +200,11 @@ func (s) TestEDS_OneLocality(t *testing.T) { // Remove the first backend, and ensure all RPCs are sent to the second // backend. - resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[1:2]}}) + resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[1]}}, + }}) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -242,7 +213,11 @@ func (s) TestEDS_OneLocality(t *testing.T) { } // Replace the backend, and ensure all RPCs are sent to the new backend. - resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[2:3]}}) + resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[2]}}, + }}) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -281,9 +256,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { // Create xDS resources for consumption by the test. We start off with two // localities, and single backend in each of them. - resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ - {name: localityName1, weight: 1, ports: ports[:1]}, - {name: localityName2, weight: 1, ports: ports[1:2]}, + resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{ + { + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[0]}}, + }, + { + Name: localityName2, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[1]}}, + }, }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -332,10 +315,22 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { // Add another locality with a single backend, and ensure RPCs are being // weighted roundrobined across the three backends. - resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ - {name: localityName1, weight: 1, ports: ports[:1]}, - {name: localityName2, weight: 1, ports: ports[1:2]}, - {name: localityName3, weight: 1, ports: ports[2:3]}, + resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{ + { + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[0]}}, + }, + { + Name: localityName2, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[1]}}, + }, + { + Name: localityName3, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[2]}}, + }, }) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -346,9 +341,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { // Remove the first locality, and ensure RPCs are being weighted // roundrobined across the remaining two backends. - resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ - {name: localityName2, weight: 1, ports: ports[1:2]}, - {name: localityName3, weight: 1, ports: ports[2:3]}, + resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{ + { + Name: localityName2, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[1]}}, + }, + { + Name: localityName3, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[2]}}, + }, }) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -360,9 +363,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { // Add a backend to one locality, and ensure weighted roundrobin. Since RPCs // are roundrobined across localities, locality2's backend will receive // twice the traffic. - resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ - {name: localityName2, weight: 1, ports: ports[1:2]}, - {name: localityName3, weight: 1, ports: ports[2:4]}, + resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{ + { + Name: localityName2, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[1]}}, + }, + { + Name: localityName3, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[2]}, {Port: ports[3]}}, + }, }) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -389,23 +400,31 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) { // Create xDS resources for consumption by the test. Two localities with // six backends each, with two of the six backends being healthy. Both // UNKNOWN and HEALTHY are considered by gRPC for load balancing. - resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ - {name: localityName1, weight: 1, ports: ports[:6], healthStatus: []v3corepb.HealthStatus{ - v3corepb.HealthStatus_UNKNOWN, - v3corepb.HealthStatus_HEALTHY, - v3corepb.HealthStatus_UNHEALTHY, - v3corepb.HealthStatus_DRAINING, - v3corepb.HealthStatus_TIMEOUT, - v3corepb.HealthStatus_DEGRADED, - }}, - {name: localityName2, weight: 1, ports: ports[6:12], healthStatus: []v3corepb.HealthStatus{ - v3corepb.HealthStatus_UNKNOWN, - v3corepb.HealthStatus_HEALTHY, - v3corepb.HealthStatus_UNHEALTHY, - v3corepb.HealthStatus_DRAINING, - v3corepb.HealthStatus_TIMEOUT, - v3corepb.HealthStatus_DEGRADED, - }}, + resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{ + { + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{ + {Port: ports[0], HealthStatus: v3corepb.HealthStatus_UNKNOWN}, + {Port: ports[1], HealthStatus: v3corepb.HealthStatus_HEALTHY}, + {Port: ports[2], HealthStatus: v3corepb.HealthStatus_UNHEALTHY}, + {Port: ports[3], HealthStatus: v3corepb.HealthStatus_DRAINING}, + {Port: ports[4], HealthStatus: v3corepb.HealthStatus_TIMEOUT}, + {Port: ports[5], HealthStatus: v3corepb.HealthStatus_DEGRADED}, + }, + }, + { + Name: localityName2, + Weight: 1, + Backends: []e2e.BackendOptions{ + {Port: ports[6], HealthStatus: v3corepb.HealthStatus_UNKNOWN}, + {Port: ports[7], HealthStatus: v3corepb.HealthStatus_HEALTHY}, + {Port: ports[8], HealthStatus: v3corepb.HealthStatus_UNHEALTHY}, + {Port: ports[9], HealthStatus: v3corepb.HealthStatus_DRAINING}, + {Port: ports[10], HealthStatus: v3corepb.HealthStatus_TIMEOUT}, + {Port: ports[11], HealthStatus: v3corepb.HealthStatus_DEGRADED}, + }, + }, }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -520,7 +539,11 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { } // Add a locality with one backend and ensure RPCs are successful. - resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}}) + resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: ports[0]}}, + }}) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) }