Skip to content

Commit

Permalink
fix flake, avoid panic
Browse files Browse the repository at this point in the history
  • Loading branch information
atollena committed Jun 10, 2024
1 parent 7aae039 commit 0216104
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,42 @@ func checkRPCSendOK(t *testing.T, ctx context.Context, client testpb.TestService
return backendCount
}

// makeNonExistentBackends returns a slice of e2e.BackendOptions with num
// listeners, each of which is closed immediately. Useful to simulate servers
// that are unreachable.
func makeNonExistentBackends(t *testing.T, num int) []e2e.BackendOptions {
closedListeners := make([]net.Listener, 0, num)
for i := 0; i < num; i++ {
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
closedListeners = append(closedListeners, lis)
}

// Stop the servers that we want to be unreachable and collect their
// addresses.
backendOptions := make([]e2e.BackendOptions, 0, num)
for _, lis := range closedListeners {
backendOptions = append(backendOptions, e2e.BackendOptions{
Port: testutils.ParsePort(t, lis.Addr().String()),
Weight: 1,
})
lis.Close()
}
return backendOptions
}

// Tests that when an aggregate cluster is configured with ring hash policy, and
// the first cluster is in transient failure, all RPCs are sent to the second
// cluster using the ring hash policy.
func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T) {
xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer stop()

nonExistantServers, stopNonExistant := startTestServiceBackends(t, 2)
servers, stop := startTestServiceBackends(t, 2)
defer stop()

// Stop the servers that we want to be unreachable.
stopNonExistant()

primaryClusterName := "new_cluster_1"
primaryServiceName := "new_eds_service_1"
secondaryClusterName := "new_cluster_2"
Expand All @@ -233,7 +255,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T
Localities: []e2e.LocalityOptions{{
Name: "locality0",
Weight: 1,
Backends: backendOptions(t, nonExistantServers),
Backends: makeNonExistentBackends(t, 2),
}},
})
ep2 := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
Expand Down Expand Up @@ -336,15 +358,12 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup(
backends, stop := startTestServiceBackends(t, 1)
defer stop()

nonExistingBackend, stop1 := startTestServiceBackends(t, 2)
stop1()

endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: edsClusterName,
Localities: []e2e.LocalityOptions{{
Name: "locality0",
Weight: 1,
Backends: backendOptions(t, nonExistingBackend),
Backends: makeNonExistentBackends(t, 1),
Priority: 0,
}},
})
Expand Down Expand Up @@ -420,15 +439,12 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN
backends, stop := startTestServiceBackends(t, 1)
defer stop()

nonExistingBackend, stop1 := startTestServiceBackends(t, 2)
stop1()

endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: edsClusterName,
Localities: []e2e.LocalityOptions{{
Name: "locality0",
Weight: 1,
Backends: backendOptions(t, nonExistingBackend),
Backends: makeNonExistentBackends(t, 1),
Priority: 0,
}},
})
Expand Down Expand Up @@ -767,9 +783,9 @@ func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) {
//
// See https://github.com/grpc/grpc/blob/4f6e13bdda9e8c26d6027af97db4b368ca2b3069/test/cpp/end2end/xds/xds_end2end_test_lib.h#L941
// for an explanation of the formula.
func computeIdealNumberOfRPCs(p, errorTolerance float64) int {
func computeIdealNumberOfRPCs(t *testing.T, p, errorTolerance float64) int {
if p < 0 || p > 1 {
panic("p must be in (0, 1)")
t.Fatal("p must be in (0, 1)")
}
numRPCs := math.Ceil(p * (1 - p) * 5. * 5. / errorTolerance / errorTolerance)
return int(numRPCs + 1000.) // add 1k as a buffer to avoid flakyness.
Expand Down Expand Up @@ -804,7 +820,7 @@ func setRingHashLBPolicyWithHighMinRingSize(t *testing.T, cluster *v3clusterpb.C
func (s) TestRingHash_NoHashPolicy(t *testing.T) {
backends, stop := startTestServiceBackends(t, 2)
defer stop()
numRPCs := computeIdealNumberOfRPCs(.5, errorTolerance)
numRPCs := computeIdealNumberOfRPCs(t, .5, errorTolerance)

xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer stop()
Expand Down Expand Up @@ -910,7 +926,7 @@ func (s) TestRingHash_EndpointWeights(t *testing.T) {
client := testgrpc.NewTestServiceClient(conn)

// Send a large number of RPCs and check that they are distributed randomly.
numRPCs := computeIdealNumberOfRPCs(.25, errorTolerance)
numRPCs := computeIdealNumberOfRPCs(t, .25, errorTolerance)
gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs)

got := float64(gotPerBackend[backends[0].Address]) / float64(numRPCs)
Expand Down

0 comments on commit 0216104

Please sign in to comment.