diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go index 9048ea802d7a..b319d4382157 100644 --- a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go +++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go @@ -208,6 +208,32 @@ func checkRPCSendOK(t *testing.T, ctx context.Context, client testpb.TestService return backendCount } +// makeNonExistentBackends returns a slice of e2e.BackendOptions with num +// listeners, each of which is closed immediately. Useful to simulate servers +// that are unreachable. +func makeNonExistentBackends(t *testing.T, num int) []e2e.BackendOptions { + closedListeners := make([]net.Listener, 0, num) + for i := 0; i < num; i++ { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + closedListeners = append(closedListeners, lis) + } + + // Stop the servers that we want to be unreachable and collect their + // addresses. + backendOptions := make([]e2e.BackendOptions, 0, num) + for _, lis := range closedListeners { + backendOptions = append(backendOptions, e2e.BackendOptions{ + Port: testutils.ParsePort(t, lis.Addr().String()), + Weight: 1, + }) + lis.Close() + } + return backendOptions +} + // Tests that when an aggregate cluster is configured with ring hash policy, and // the first cluster is in transient failure, all RPCs are sent to the second // cluster using the ring hash policy. @@ -215,13 +241,9 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer stop() - nonExistantServers, stopNonExistant := startTestServiceBackends(t, 2) servers, stop := startTestServiceBackends(t, 2) defer stop() - // Stop the servers that we want to be unreachable. - stopNonExistant() - primaryClusterName := "new_cluster_1" primaryServiceName := "new_eds_service_1" secondaryClusterName := "new_cluster_2" @@ -233,7 +255,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: backendOptions(t, nonExistantServers), + Backends: makeNonExistentBackends(t, 2), }}, }) ep2 := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ @@ -336,15 +358,12 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup( backends, stop := startTestServiceBackends(t, 1) defer stop() - nonExistingBackend, stop1 := startTestServiceBackends(t, 2) - stop1() - endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ ClusterName: edsClusterName, Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: backendOptions(t, nonExistingBackend), + Backends: makeNonExistentBackends(t, 1), Priority: 0, }}, }) @@ -420,15 +439,12 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN backends, stop := startTestServiceBackends(t, 1) defer stop() - nonExistingBackend, stop1 := startTestServiceBackends(t, 2) - stop1() - endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ ClusterName: edsClusterName, Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: backendOptions(t, nonExistingBackend), + Backends: makeNonExistentBackends(t, 1), Priority: 0, }}, }) @@ -767,9 +783,9 @@ func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) { // // See https://github.com/grpc/grpc/blob/4f6e13bdda9e8c26d6027af97db4b368ca2b3069/test/cpp/end2end/xds/xds_end2end_test_lib.h#L941 // for an explanation of the formula. -func computeIdealNumberOfRPCs(p, errorTolerance float64) int { +func computeIdealNumberOfRPCs(t *testing.T, p, errorTolerance float64) int { if p < 0 || p > 1 { - panic("p must be in (0, 1)") + t.Fatal("p must be in (0, 1)") } numRPCs := math.Ceil(p * (1 - p) * 5. * 5. / errorTolerance / errorTolerance) return int(numRPCs + 1000.) // add 1k as a buffer to avoid flakyness. @@ -804,7 +820,7 @@ func setRingHashLBPolicyWithHighMinRingSize(t *testing.T, cluster *v3clusterpb.C func (s) TestRingHash_NoHashPolicy(t *testing.T) { backends, stop := startTestServiceBackends(t, 2) defer stop() - numRPCs := computeIdealNumberOfRPCs(.5, errorTolerance) + numRPCs := computeIdealNumberOfRPCs(t, .5, errorTolerance) xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer stop() @@ -910,7 +926,7 @@ func (s) TestRingHash_EndpointWeights(t *testing.T) { client := testgrpc.NewTestServiceClient(conn) // Send a large number of RPCs and check that they are distributed randomly. - numRPCs := computeIdealNumberOfRPCs(.25, errorTolerance) + numRPCs := computeIdealNumberOfRPCs(t, .25, errorTolerance) gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs) got := float64(gotPerBackend[backends[0].Address]) / float64(numRPCs)