diff --git a/internal/testutils/blocking_context_dialer.go b/internal/testutils/blocking_context_dialer.go
index ea7a8519376b..65792ca7c327 100644
--- a/internal/testutils/blocking_context_dialer.go
+++ b/internal/testutils/blocking_context_dialer.go
@@ -21,36 +21,105 @@ package testutils
 
 import (
 	"context"
 	"net"
+	"sync"
+
+	"google.golang.org/grpc/grpclog"
 )
 
+var logger = grpclog.Component("testutils")
+
 // BlockingDialer is a dialer that waits for Resume() to be called before
 // dialing.
 type BlockingDialer struct {
-	dialer  *net.Dialer
-	blockCh chan struct{}
+	mu    sync.Mutex // protects holds
+	holds map[string][]*Hold
+
+	dialer *net.Dialer
 }
 
 // NewBlockingDialer returns a dialer that waits for Resume() to be called
 // before dialing.
 func NewBlockingDialer() *BlockingDialer {
 	return &BlockingDialer{
-		dialer:  &net.Dialer{},
-		blockCh: make(chan struct{}),
+		dialer: &net.Dialer{},
+		holds:  make(map[string][]*Hold),
 	}
 }
 
 // DialContext implements a context dialer for use with grpc.WithContextDialer
 // dial option for a BlockingDialer.
 func (d *BlockingDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
+	d.mu.Lock()
+	holds := d.holds[addr]
+	if len(holds) > 0 {
+		logger.Infof("Intercepted connection attempt to addr %s", addr)
+		hold := holds[0]
+		d.holds[addr] = holds[1:]
+		d.mu.Unlock()
+
+		close(hold.waitCh)
+		select {
+		case <-hold.blockCh:
+			if hold.err != nil {
+				return nil, hold.err
+			}
+			return d.dialer.DialContext(ctx, "tcp", addr)
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+	}
+	// No hold for this addr.
+	d.mu.Unlock()
+	return d.dialer.DialContext(ctx, "tcp", addr)
+}
+
+// Hold is a connection hold that blocks the dialer when a connection attempt is
+// made to the given addr.
+type Hold struct {
+	dialer  *BlockingDialer
+	blockCh chan error
+	waitCh  chan struct{}
+	err     error
+	addr    string
+}
+
+// Hold blocks the dialer when a connection attempt is made to the given addr.
+// A hold is valid for exactly one connection attempt. Multiple holds can be
+// added for an addr, and they apply in the order in which connections are
+// attempted.
+func (d *BlockingDialer) Hold(addr string) *Hold {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	h := Hold{dialer: d, blockCh: make(chan error), waitCh: make(chan struct{}), addr: addr}
+	d.holds[addr] = append(d.holds[addr], &h)
+	return &h
+}
+
+// Wait blocks until there is a connection attempt on this Hold, or until the
+// context expires. It returns false if the context expired before a connection
+// attempt was made, true otherwise.
+func (h *Hold) Wait(ctx context.Context) bool {
+	logger.Infof("Waiting for a connection attempt to addr %s", h.addr)
 	select {
-	case <-d.blockCh:
 	case <-ctx.Done():
-		return nil, ctx.Err()
+		return false
+	case <-h.waitCh:
 	}
-	return d.dialer.DialContext(ctx, "tcp", addr)
+	logger.Infof("Connection attempt started to addr %s", h.addr)
+	return true
+}
+
+// Resume unblocks the dialer for the given addr. If called multiple times on
+// the same hold, Resume panics.
+func (h *Hold) Resume() {
+	logger.Infof("Resuming connection attempt to addr %s", h.addr)
+	close(h.blockCh)
 }
 
-// Resume unblocks the dialer. It panics if called more than once.
-func (d *BlockingDialer) Resume() {
-	close(d.blockCh)
+// Fail fails the connection attempt. If called multiple times on the same hold,
+// Fail panics.
+func (h *Hold) Fail(err error) { + logger.Infof("Failing connection attempt to addr %s", h.addr) + h.err = err + close(h.blockCh) } diff --git a/internal/testutils/blocking_context_dialer_test.go b/internal/testutils/blocking_context_dialer_test.go new file mode 100644 index 000000000000..745b6674accb --- /dev/null +++ b/internal/testutils/blocking_context_dialer_test.go @@ -0,0 +1,194 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package testutils + +import ( + "context" + "errors" + "testing" + "time" +) + +const testTimeout = 5 * time.Second + +func (s) TestBlockingDialer_NoHold(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + + // This should not block. + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + conn, err := d.DialContext(ctx, lis.Addr().String()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + conn.Close() +} + +func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + h := d.Hold(lis.Addr().String()) + + done := make(chan struct{}) + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + go func() { + conn, err := d.DialContext(ctx, lis.Addr().String()) + if err != nil { + t.Errorf("BlockingDialer.DialContext() got error: %v, want success", err) + } + conn.Close() + done <- struct{}{} + }() + + // This should block until the goroutine above is scheduled. + if !h.Wait(ctx) { + t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr) + } + select { + case <-done: + t.Errorf("Expected dialer to be blocked.") + default: + } + + h.Resume() // Unblock the above goroutine. 
+ + select { + case <-done: + case <-ctx.Done(): + t.Errorf("Timeout waiting for connection attempt to resume.") + } +} + +func (s) TestBlockingDialer_HoldWaitFail(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + h := d.Hold(lis.Addr().String()) + + wantErr := errors.New("test error") + + done := make(chan struct{}) + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + go func() { + _, err := d.DialContext(ctx, lis.Addr().String()) + if !errors.Is(err, wantErr) { + t.Errorf("BlockingDialer.DialContext() after Fail(): got error %v, want %v", err, wantErr) + } + done <- struct{}{} + }() + + if !h.Wait(ctx) { + t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr) + } + select { + case <-done: + t.Errorf("Expected dialer to still be blocked after Wait()") + default: + } + + h.Fail(wantErr) + + select { + case <-done: + case <-ctx.Done(): + t.Errorf("Timeout waiting for connection attempt to fail.") + } +} + +func (s) TestBlockingDialer_ContextCanceled(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + h := d.Hold(lis.Addr().String()) + + done := make(chan struct{}) + testCtx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctx, cancel := context.WithCancel(testCtx) + defer cancel() + go func() { + _, err := d.DialContext(ctx, lis.Addr().String()) + if !errors.Is(err, context.Canceled) { + t.Errorf("BlockingDialer.DialContext() after context cancel: got error %v, want %v", err, context.Canceled) + } + done <- struct{}{} + }() + if !h.Wait(ctx) { + t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr) + } + cancel() + + select { + case <-done: + case <-testCtx.Done(): + t.Errorf("Timeout while waiting for Wait to return.") + } +} + +func (s) TestBlockingDialer_CancelWait(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + h := d.Hold(lis.Addr().String()) + + testCtx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctx, cancel := context.WithTimeout(testCtx, 0) + defer cancel() + done := make(chan struct{}) + go func() { + if h.Wait(ctx) { + t.Errorf("Expected cancel to return false when context expires") + } + done <- struct{}{} + }() + + select { + case <-done: + case <-testCtx.Done(): + t.Errorf("Timeout while waiting for Wait to return.") + } +} diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go index f2835baa50f5..524bc6ef0a2f 100644 --- a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go +++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go @@ -22,7 +22,9 @@ import ( "context" "fmt" "math" + "math/rand" "net" + "slices" "testing" "time" @@ -30,6 +32,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/envconfig" @@ -41,6 +44,7 @@ import ( "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/status" v3clusterpb 
"github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -144,33 +148,27 @@ func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) { } } -// startTestServiceBackends starts num stub servers. -func startTestServiceBackends(t *testing.T, num int) ([]*stubserver.StubServer, func()) { +// startTestServiceBackends starts num stub servers. It returns their addresses. +// Servers are closed when the test is stopped. +func startTestServiceBackends(t *testing.T, num int) []string { t.Helper() - - var servers []*stubserver.StubServer + addrs := make([]string, 0, num) for i := 0; i < num; i++ { - servers = append(servers, stubserver.StartTestService(t, nil)) - } - - return servers, func() { - for _, server := range servers { - server.Stop() - } + server := stubserver.StartTestService(t, nil) + t.Cleanup(server.Stop) + addrs = append(addrs, server.Address) } + return addrs } -// backendOptions returns a slice of e2e.BackendOptions for the given stub -// servers. -func backendOptions(t *testing.T, servers []*stubserver.StubServer) []e2e.BackendOptions { +// backendOptions returns a slice of e2e.BackendOptions for the given server +// addresses. +func backendOptions(t *testing.T, serverAddrs []string) []e2e.BackendOptions { t.Helper() var backendOpts []e2e.BackendOptions - for _, server := range servers { - backendOpts = append(backendOpts, e2e.BackendOptions{ - Port: testutils.ParsePort(t, server.Address), - Weight: 1, - }) + for _, addr := range serverAddrs { + backendOpts = append(backendOpts, e2e.BackendOptions{Port: testutils.ParsePort(t, addr)}) } return backendOpts } @@ -208,10 +206,10 @@ func checkRPCSendOK(t *testing.T, ctx context.Context, client testgrpc.TestServi return backendCount } -// makeNonExistentBackends returns a slice of e2e.BackendOptions with num -// listeners, each of which is closed immediately. Useful to simulate servers -// that are unreachable. -func makeNonExistentBackends(t *testing.T, num int) []e2e.BackendOptions { +// makeNonExistentBackends returns a slice of strings with num listeners, each +// of which is closed immediately. Useful to simulate servers that are +// unreachable. +func makeNonExistentBackends(t *testing.T, num int) []string { closedListeners := make([]net.Listener, 0, num) for i := 0; i < num; i++ { lis, err := testutils.LocalTCPListener() @@ -222,16 +220,14 @@ func makeNonExistentBackends(t *testing.T, num int) []e2e.BackendOptions { } // Stop the servers that we want to be unreachable and collect their - // addresses. - backendOptions := make([]e2e.BackendOptions, 0, num) + // addresses. We don't close them in the loop above to make sure ports are + // not reused across them. 
+ addrs := make([]string, 0, num) for _, lis := range closedListeners { - backendOptions = append(backendOptions, e2e.BackendOptions{ - Port: testutils.ParsePort(t, lis.Addr().String()), - Weight: 1, - }) + addrs = append(addrs, lis.Addr().String()) lis.Close() } - return backendOptions + return addrs } // Tests that when an aggregate cluster is configured with ring hash policy, and @@ -241,8 +237,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer stop() - servers, stop := startTestServiceBackends(t, 2) - defer stop() + addrs := startTestServiceBackends(t, 2) const primaryClusterName = "new_cluster_1" const primaryServiceName = "new_eds_service_1" @@ -255,7 +250,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: makeNonExistentBackends(t, 2), + Backends: backendOptions(t, makeNonExistentBackends(t, 2)), }}, }) ep2 := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ @@ -263,7 +258,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: backendOptions(t, servers), + Backends: backendOptions(t, addrs), }}, }) primaryCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ @@ -318,15 +313,8 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T var got int for backend, got = range gotPerBackend { } - found := false - for _, server := range servers { - if backend == server.Address { - found = true - break - } - } - if !found { - t.Errorf("Got RPCs routed to an unexpected backend: %v, want one of %v", backend, servers) + if !slices.Contains(addrs, backend) { + t.Errorf("Got RPCs routed to an unexpected backend: %v, want one of %v", backend, addrs) } if got != 100 { t.Errorf("Got %v RPCs routed to a backend, want %v", got, 100) @@ -355,15 +343,14 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup( const logicalDNSClusterName = "logical_dns_cluster" const clusterName = "aggregate_cluster" - backends, stop := startTestServiceBackends(t, 1) - defer stop() + backends := startTestServiceBackends(t, 1) endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ ClusterName: edsClusterName, Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: makeNonExistentBackends(t, 1), + Backends: backendOptions(t, makeNonExistentBackends(t, 1)), Priority: 0, }}, }) @@ -406,7 +393,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup( } dnsR := replaceDNSResolver(t) - dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}}) + dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0]}}}) conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -419,7 +406,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup( var got string for got = range gotPerBackend { } - if want := backends[0].Address; got != want { + if want := backends[0]; got != want { t.Errorf("Got RPCs routed to an unexpected got: %v, want %v", got, want) } } @@ -436,15 +423,14 @@ func (s) 
TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN const logicalDNSClusterName = "logical_dns_cluster" const clusterName = "aggregate_cluster" - backends, stop := startTestServiceBackends(t, 1) - defer stop() + backends := startTestServiceBackends(t, 1) endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ ClusterName: edsClusterName, Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: makeNonExistentBackends(t, 1), + Backends: backendOptions(t, makeNonExistentBackends(t, 1)), Priority: 0, }}, }) @@ -487,7 +473,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN } dnsR := replaceDNSResolver(t) - dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}}) + dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0]}}}) dialer := testutils.NewBlockingDialer() cp := grpc.ConnectParams{ @@ -510,6 +496,8 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN defer conn.Close() client := testgrpc.NewTestServiceClient(conn) + hold := dialer.Hold(backends[0]) + errCh := make(chan error, 2) go func() { if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { @@ -539,14 +527,17 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN }() // Allow the connection attempts to complete. - dialer.Resume() + if !hold.Wait(ctx) { + t.Fatalf("Timeout while waiting for a connection attempt to %s", backends[0]) + } + hold.Resume() // RPCs should complete successfully. for range []int{0, 1} { select { case err := <-errCh: if err != nil { - t.Errorf("Expected 2 rpc to succeed, but failed: %v", err) + t.Errorf("Expected 2 rpc to succeed, but at least one failed: %v", err) } case <-ctx.Done(): t.Fatalf("Timed out waiting for RPCs to complete") @@ -557,11 +548,13 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN // Tests that ring hash policy that hashes using channel id ensures all RPCs to // go 1 particular backend. 
func (s) TestRingHash_ChannelIdHashing(t *testing.T) { - backends, stop := startTestServiceBackends(t, 4) - defer stop() - + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer stop() + + backends := startTestServiceBackends(t, 4) + const clusterName = "cluster" endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ ClusterName: clusterName, @@ -578,9 +571,6 @@ func (s) TestRingHash_ChannelIdHashing(t *testing.T) { route := channelIDHashRoute("new_route", virtualHostName, clusterName) listener := e2e.DefaultClientListener(virtualHostName, route.Name) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - err := xdsServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID, Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, @@ -599,15 +589,16 @@ func (s) TestRingHash_ChannelIdHashing(t *testing.T) { defer conn.Close() client := testgrpc.NewTestServiceClient(conn) - received := checkRPCSendOK(t, ctx, client, 100) + numRPCs := 100 + received := checkRPCSendOK(t, ctx, client, numRPCs) if len(received) != 1 { t.Errorf("Got RPCs routed to %v backends, want %v", len(received), 1) } - var count int - for _, count = range received { + var got int + for _, got = range received { } - if count != 100 { - t.Errorf("Got %v RPCs routed to a backend, want %v", count, 100) + if got != numRPCs { + t.Errorf("Got %v RPCs routed to a backend, want %v", got, numRPCs) } } @@ -630,12 +621,11 @@ func headerHashRoute(routeName, virtualHostName, clusterName, header string) *v3 // Tests that ring hash policy that hashes using a header value can spread RPCs // across all the backends. func (s) TestRingHash_HeaderHashing(t *testing.T) { - backends, stop := startTestServiceBackends(t, 4) - defer stop() + backends := startTestServiceBackends(t, 4) // We must set the host name socket address in EDS, as the ring hash policy // uses it to construct the ring. - host, _, err := net.SplitHostPort(backends[0].Address) + host, _, err := net.SplitHostPort(backends[0]) if err != nil { t.Fatalf("Failed to split host and port from stubserver: %v", err) } @@ -681,41 +671,46 @@ func (s) TestRingHash_HeaderHashing(t *testing.T) { client := testgrpc.NewTestServiceClient(conn) // Note each type of RPC contains a header value that will always be hashed - // to a specific backend as the header value matches the value used to + // to a specific addr as the header value matches the value used to // create the entry in the ring. - for _, backend := range backends { - ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", backend.Address+"_0")) + for _, addr := range backends { + ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", addr+"_0")) reqPerBackend := checkRPCSendOK(t, ctx, client, 1) - if reqPerBackend[backend.Address] != 1 { - t.Errorf("Got RPC routed to backend %v, want %v", reqPerBackend, backend.Address) + if reqPerBackend[addr] != 1 { + t.Errorf("Got RPC routed to addr %v, want %v", reqPerBackend, addr) } } } -// Tests that ring hash policy that hashes using a header value and regex -// rewrite to aggregate RPCs to 1 backend. 
-func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) { - backends, stop := startTestServiceBackends(t, 4) - defer stop() - +// endpointResource creates a ClusterLoadAssignment containing a single locality with +// the given addresses. +func endpointResource(t *testing.T, clusterName string, addrs []string) *v3endpointpb.ClusterLoadAssignment { // We must set the host name socket address in EDS, as the ring hash policy // uses it to construct the ring. - host, _, err := net.SplitHostPort(backends[0].Address) + host, _, err := net.SplitHostPort(addrs[0]) if err != nil { - t.Fatalf("Failed to split host and port from stubserver: %v", err) + t.Fatalf("failed to split host and port from stubserver: %v", err) } - xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) - defer stop() - clusterName := "cluster" - endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + return e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ ClusterName: clusterName, Host: host, Localities: []e2e.LocalityOptions{{ - Backends: backendOptions(t, backends), + Backends: backendOptions(t, addrs), Weight: 1, }}, }) +} + +// Tests that ring hash policy that hashes using a header value and regex +// rewrite to aggregate RPCs to 1 backend. +func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) { + backends := startTestServiceBackends(t, 4) + + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer stop() + const clusterName = "cluster" + endpoints := endpointResource(t, clusterName, backends) cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, ServiceName: clusterName, @@ -735,7 +730,7 @@ func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - err = xdsServer.Update(ctx, e2e.UpdateOptions{ + err := xdsServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID, Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, Clusters: []*v3clusterpb.Cluster{cluster}, @@ -760,7 +755,7 @@ func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) { // hashing to the same value. gotPerBackend := make(map[string]int) for _, backend := range backends { - ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", backend.Address+"_0")) + ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", backend+"_0")) res := checkRPCSendOK(t, ctx, client, 100) for addr, count := range res { gotPerBackend[addr] += count @@ -818,20 +813,13 @@ func setRingHashLBPolicyWithHighMinRingSize(t *testing.T, cluster *v3clusterpb.C // Tests that ring hash policy that hashes using a random value. 
func (s) TestRingHash_NoHashPolicy(t *testing.T) { - backends, stop := startTestServiceBackends(t, 2) - defer stop() + backends := startTestServiceBackends(t, 2) numRPCs := computeIdealNumberOfRPCs(t, .5, errorTolerance) xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer stop() const clusterName = "cluster" - endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ - ClusterName: clusterName, - Localities: []e2e.LocalityOptions{{ - Backends: backendOptions(t, backends), - Weight: 1, - }}, - }) + endpoints := endpointResource(t, clusterName, backends) cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, ServiceName: clusterName, @@ -864,25 +852,24 @@ func (s) TestRingHash_NoHashPolicy(t *testing.T) { // Send a large number of RPCs and check that they are distributed randomly. gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs) for _, backend := range backends { - got := float64(gotPerBackend[backend.Address]) / float64(numRPCs) + got := float64(gotPerBackend[backend]) / float64(numRPCs) want := .5 if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2].Address, got, want, errorTolerance) + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backend, got, want, errorTolerance) } } } // Tests that we observe endpoint weights. func (s) TestRingHash_EndpointWeights(t *testing.T) { - backends, stop := startTestServiceBackends(t, 3) - defer stop() + backends := startTestServiceBackends(t, 3) xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer stop() const clusterName = "cluster" backendOpts := []e2e.BackendOptions{ - {Port: testutils.ParsePort(t, backends[0].Address)}, - {Port: testutils.ParsePort(t, backends[1].Address)}, - {Port: testutils.ParsePort(t, backends[2].Address), Weight: 2}, + {Port: testutils.ParsePort(t, backends[0])}, + {Port: testutils.ParsePort(t, backends[1])}, + {Port: testutils.ParsePort(t, backends[2]), Weight: 2}, } endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ @@ -929,18 +916,870 @@ func (s) TestRingHash_EndpointWeights(t *testing.T) { numRPCs := computeIdealNumberOfRPCs(t, .25, errorTolerance) gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs) - got := float64(gotPerBackend[backends[0].Address]) / float64(numRPCs) + got := float64(gotPerBackend[backends[0]]) / float64(numRPCs) want := .25 if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[0].Address, got, want, errorTolerance) + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[0], got, want, errorTolerance) } - got = float64(gotPerBackend[backends[1].Address]) / float64(numRPCs) + got = float64(gotPerBackend[backends[1]]) / float64(numRPCs) if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[1].Address, got, want, errorTolerance) + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[1], got, want, errorTolerance) } - got = float64(gotPerBackend[backends[2].Address]) / float64(numRPCs) + got = float64(gotPerBackend[backends[2]]) / float64(numRPCs) want = .50 if !cmp.Equal(got, want, 
cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2].Address, got, want, errorTolerance) + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2], got, want, errorTolerance) + } +} + +// Tests that ring hash policy evaluation will continue past the terminal policy +// if no results are produced yet. +func (s) TestRingHash_ContinuesPastTerminalPolicyThatDoesNotProduceResult(t *testing.T) { + backends := startTestServiceBackends(t, 2) + + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + AllowResourceSubset: true, + }) + defer stop() + + const clusterName = "cluster" + endpoints := endpointResource(t, clusterName, backends) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + + // Even though this policy is terminal, since it produces no result, we + // continue past it to find a policy that produces results. + hashPolicy := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: "header_not_present", + }, + }, + } + hashPolicy.Terminal = true + hashPolicy2 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: "address_hash", + }, + }, + } + action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route) + action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&hashPolicy, &hashPolicy2} + + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + err := xdsServer.Update(context.Background(), e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Note that the first policy does not match, and in the second hash policy, + // each type of RPC contains a header value that always hashes to a specific + // backend, as the header value matches the value used to create the entry + // in the ring. + wantBackend := backends[0] + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", wantBackend+"_0")) + numRPCs := 100 + gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs) + if got := gotPerBackend[wantBackend]; got != numRPCs { + t.Errorf("Got %v RPCs routed to backend %v, want %v", got, wantBackend, numRPCs) + } +} + +// Tests that a random hash is used when header hashing specified a header field +// that the RPC did not have. 
+func (s) TestRingHash_HashOnHeaderThatIsNotPresent(t *testing.T) { + backends := startTestServiceBackends(t, 2) + wantFractionPerBackend := .5 + numRPCs := computeIdealNumberOfRPCs(t, wantFractionPerBackend, errorTolerance) + + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + AllowResourceSubset: true, + }) + defer stop() + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, backends), + Weight: 1, + }}, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + }) + setRingHashLBPolicyWithHighMinRingSize(t, cluster) + route := headerHashRoute("new_route", virtualHostName, clusterName, "header_not_present") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + err := xdsServer.Update(context.Background(), e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Send a large number of RPCs and check that they are distributed randomly. + gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs) + for _, backend := range backends { + got := float64(gotPerBackend[backend]) / float64(numRPCs) + if !cmp.Equal(got, wantFractionPerBackend, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backend, got, wantFractionPerBackend, errorTolerance) + } + } +} + +// Tests that a random hash is used when only unsupported hash policies are +// configured. 
+func (s) TestRingHash_UnsupportedHashPolicyDefaultToRandomHashing(t *testing.T) { + backends := startTestServiceBackends(t, 2) + wantFractionPerBackend := .5 + numRPCs := computeIdealNumberOfRPCs(t, wantFractionPerBackend, errorTolerance) + + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + AllowResourceSubset: true, + }) + defer stop() + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, backends), + Weight: 1, + }}, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + }) + setRingHashLBPolicyWithHighMinRingSize(t, cluster) + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + unsupportedHashPolicy1 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Cookie_{ + Cookie: &v3routepb.RouteAction_HashPolicy_Cookie{Name: "cookie"}, + }, + } + unsupportedHashPolicy2 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_ConnectionProperties_{ + ConnectionProperties: &v3routepb.RouteAction_HashPolicy_ConnectionProperties{SourceIp: true}, + }, + } + unsupportedHashPolicy3 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_QueryParameter_{ + QueryParameter: &v3routepb.RouteAction_HashPolicy_QueryParameter{Name: "query_parameter"}, + }, + } + action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route) + action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&unsupportedHashPolicy1, &unsupportedHashPolicy2, &unsupportedHashPolicy3} + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + err := xdsServer.Update(context.Background(), e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Send a large number of RPCs and check that they are distributed randomly. + gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs) + for _, backend := range backends { + got := float64(gotPerBackend[backend]) / float64(numRPCs) + if !cmp.Equal(got, wantFractionPerBackend, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backend, got, wantFractionPerBackend, errorTolerance) + } + } +} + +// Tests that unsupported hash policy types are all ignored before a supported +// policy. 
+func (s) TestRingHash_UnsupportedHashPolicyUntilChannelIdHashing(t *testing.T) { + backends := startTestServiceBackends(t, 2) + + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + AllowResourceSubset: true, + }) + defer stop() + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, backends), + Weight: 1, + }}, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + }) + setRingHashLBPolicyWithHighMinRingSize(t, cluster) + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + unsupportedHashPolicy1 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Cookie_{ + Cookie: &v3routepb.RouteAction_HashPolicy_Cookie{Name: "cookie"}, + }, + } + unsupportedHashPolicy2 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_ConnectionProperties_{ + ConnectionProperties: &v3routepb.RouteAction_HashPolicy_ConnectionProperties{SourceIp: true}, + }, + } + unsupportedHashPolicy3 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_QueryParameter_{ + QueryParameter: &v3routepb.RouteAction_HashPolicy_QueryParameter{Name: "query_parameter"}, + }, + } + channelIDhashPolicy := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{ + FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{ + Key: "io.grpc.channel_id", + }, + }, + } + action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route) + action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&unsupportedHashPolicy1, &unsupportedHashPolicy2, &unsupportedHashPolicy3, &channelIDhashPolicy} + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + err := xdsServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + // Since this is using the channel ID hashing policy, all requests should + // be routed to the same backend. + numRPCs := 100 + gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs) + if len(gotPerBackend) != 1 { + t.Errorf("Got RPCs routed to %v backends, want 1", len(gotPerBackend)) + } + var got int + for _, got = range gotPerBackend { + } + if got != numRPCs { + t.Errorf("Got RPCs routed to %v, want %v", got, numRPCs) + } +} + +// Tests that ring hash policy that hashes using a random value can spread RPCs +// across all the backends according to locality weight. 
+func (s) TestRingHash_RandomHashingDistributionAccordingToLocalityAndEndpointWeight(t *testing.T) { + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + AllowResourceSubset: true, + }) + defer stop() + + backends := startTestServiceBackends(t, 2) + + const clusterName = "cluster" + const locality1Weight = uint32(1) + const endpoint1Weight = uint32(1) + const locality2Weight = uint32(2) + const endpoint2Weight = uint32(2) + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{ + { + Backends: []e2e.BackendOptions{{ + Port: testutils.ParsePort(t, backends[0]), + Weight: endpoint1Weight, + }}, + Weight: locality1Weight, + }, + { + Backends: []e2e.BackendOptions{{ + Port: testutils.ParsePort(t, backends[1]), + Weight: endpoint2Weight, + }}, + Weight: locality2Weight, + }, + }, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + }) + setRingHashLBPolicyWithHighMinRingSize(t, cluster) + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + err := xdsServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + const weight1 = endpoint1Weight * locality1Weight + const weight2 = endpoint2Weight * locality2Weight + const wantRPCs1 = float64(weight1) / float64(weight1+weight2) + const wantRPCs2 = float64(weight2) / float64(weight1+weight2) + numRPCs := computeIdealNumberOfRPCs(t, math.Min(wantRPCs1, wantRPCs2), errorTolerance) + + // Send a large number of RPCs and check that they are distributed randomly. + gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs) + got := float64(gotPerBackend[backends[0]]) / float64(numRPCs) + if !cmp.Equal(got, wantRPCs1, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2], got, wantRPCs1, errorTolerance) + } + got = float64(gotPerBackend[backends[1]]) / float64(numRPCs) + if !cmp.Equal(got, wantRPCs2, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2], got, wantRPCs2, errorTolerance) + } +} + +// TestRingHash_FixedHashingTerminalPolicy tests that ring hash policy that +// hashes using a fixed string ensures all RPCs to go 1 particular backend; and +// that subsequent hashing policies are ignored due to the setting of terminal. 
+func (s) TestRingHash_FixedHashingTerminalPolicy(t *testing.T) { + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + AllowResourceSubset: true, + }) + defer stop() + + backends := startTestServiceBackends(t, 2) + const clusterName = "cluster" + endpoints := endpointResource(t, clusterName, backends) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + + // Even though this policy is terminal, since it produces no result, we + // continue past it to find a policy that produces results. + hashPolicy := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: "fixed_string", + }, + }, + Terminal: true, + } + hashPolicy2 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: "random_string", + }, + }, + } + action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route) + action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&hashPolicy, &hashPolicy2} + + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + err := xdsServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + // Check that despite the matching random string header, since the fixed + // string hash policy is terminal, requests all get routed to the same + // host. + gotPerBackend := make(map[string]int) + numRPCs := 100 + for i := 0; i < numRPCs; i++ { + ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs( + "fixed_string", backends[0]+"_0", + "random_string", fmt.Sprintf("%d", rand.Int())), + ) + var remote peer.Peer + _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&remote)) + if err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + gotPerBackend[remote.Addr.String()]++ + } + + if len(gotPerBackend) != 1 { + t.Error("Got RPCs routed to multiple backends, want a single backend") + } + if got := gotPerBackend[backends[0]]; got != numRPCs { + t.Errorf("Got %v RPCs routed to a backend, want %v", got, numRPCs) + } +} + +// TestRingHash_IdleToReady tests that the channel will go from idle to ready +// via connecting; (though it is not possible to catch the connecting state +// before moving to ready). 
+func (s) TestRingHash_IdleToReady(t *testing.T) { + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + AllowResourceSubset: true, + }) + defer stop() + + backends := startTestServiceBackends(t, 1) + + const clusterName = "cluster" + endpoints := endpointResource(t, clusterName, backends) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := channelIDHashRoute("new_route", virtualHostName, clusterName) + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + err := xdsServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + if got, want := conn.GetState(), connectivity.Idle; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } + + checkRPCSendOK(t, ctx, client, 1) + + if got, want := conn.GetState(), connectivity.Ready; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } +} + +// Test that the channel will transition to READY once it starts +// connecting even if there are no RPCs being sent to the picker. +func (s) TestRingHash_ContinuesConnectingWithoutPicks(t *testing.T) { + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ + AllowResourceSubset: true, + }) + defer stop() + + backend := stubserver.StartTestService(t, &stubserver.StubServer{ + // We expect the server EmptyCall to not be call here because the + // aggregated channel state is never READY when the call is pending. + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + t.Errorf("EmptyCall() should not have been called") + return &testpb.Empty{}, nil + }, + }) + defer backend.Stop() + + nonExistantServerAddr := makeNonExistentBackends(t, 1)[0] + + const clusterName = "cluster" + + // We must set the host name socket address in EDS, as the ring hash policy + // uses it to construct the ring. 
+ host, _, err2 := net.SplitHostPort(backend.Address) + if err2 != nil { + t.Fatalf("Failed to split host and port from stubserver: %v", err2) + } + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Host: host, + Localities: []e2e.LocalityOptions{{ + Backends: []e2e.BackendOptions{ + {Port: testutils.ParsePort(t, backend.Address)}, + {Port: testutils.ParsePort(t, nonExistantServerAddr)}, + }, + Weight: 1, + }}, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + err := xdsServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + dialer := testutils.NewBlockingDialer() + dialOpts := []grpc.DialOption{ + grpc.WithResolvers(xdsResolver), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(dialer.DialContext), + } + conn, err := grpc.NewClient("xds:///test.server", dialOpts...) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + rpcCtx, rpcCancel := context.WithCancel(ctx) + go func() { + rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", nonExistantServerAddr+"_0")) + _, err := client.EmptyCall(rpcCtx, &testpb.Empty{}) + if status.Code(err) != codes.Canceled { + t.Errorf("Expected RPC to be canceled, got error: %v", err) + } + }() + + // Wait for the connection attempt to the real backend. + hold := dialer.Hold(backend.Address) + if !hold.Wait(ctx) { + t.Fatalf("Timeout waiting for connection attempt to backend %v.", backend.Address) + } + // Now cancel the RPC while we are still connecting. + rpcCancel() + + // This allows the connection attempts to continue. The RPC was cancelled, + // and one of the backends is up. The conn becomes Ready due to the + // connection attempt to the existing backend succeeding. + hold.Resume() + + for state := conn.GetState(); state != connectivity.Ready; state = conn.GetState() { + changed := conn.WaitForStateChange(ctx, state) + if !changed { + t.Errorf("Timeout waiting for the conn to become ready.") + break + } + } +} + +// Tests that when the first pick is down leading to a transient failure, we +// will move on to the next ring hash entry. +func (s) TestRingHash_TransientFailureCheckNextOne(t *testing.T) { + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer stop() + backends := startTestServiceBackends(t, 1) + defer stop() + + // We must set the host name socket address in EDS, as the ring hash policy + // uses it to construct the ring. 
+ host, _, err := net.SplitHostPort(backends[0]) + if err != nil { + t.Fatalf("Failed to split host and port from stubserver: %v", err) + } + nonExistentBackends := makeNonExistentBackends(t, 1) + + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Host: host, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, append(nonExistentBackends, backends...)), + Weight: 1, + }}, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + err = xdsServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + // Note each type of RPC contains a header value that will always be hashed + // the value that was used to place the non existent endpoint on the ring. + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", nonExistentBackends[0]+"_0")) + reqPerBackend := checkRPCSendOK(t, ctx, client, 1) + var got string + for got = range reqPerBackend { + } + if want := backends[0]; got != want { + t.Errorf("Got RPC routed to addr %v, want %v", got, want) + } +} + +// Tests for a bug seen in the wild in c-core, where ring_hash started with no +// endpoints and reported TRANSIENT_FAILURE, then got an update with endpoints +// and reported IDLE, but the picker update was squelched, so it failed to ever +// get reconnected. +func (s) TestRingHash_ReattemptWhenGoingFromTransientFailureToIdle(t *testing.T) { + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer stop() + + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{{}}, // note the empty locality (no endpoint). 
+ }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := e2e.DefaultRouteConfig("new_route", virtualHostName, clusterName) + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + err := xdsServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + if got, want := conn.GetState(), connectivity.Idle; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } + + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil || status.Code(err) != codes.Unavailable { + t.Errorf("rpc EmptyCall() succeeded, want error") + } + + if got, want := conn.GetState(), connectivity.TransientFailure; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } + + backends := startTestServiceBackends(t, 1) + defer stop() + + // Update EDS with the new backend endpoint. + endpoints = e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, backends), + Weight: 1, + }}, + }) + err = xdsServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + // A wait_for_ready RPC should succeed, and the channel should report READY. + _, err = client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)) + if err != nil { + t.Errorf("rpc EmptyCall() failed: %v", err) + } + if got, want := conn.GetState(), connectivity.Ready; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } +} + +// Tests that when all backends are down, we keep reattempting. 
+func (s) TestRingHash_ReattemptWhenAllEndpointsUnreachable(t *testing.T) { + xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer stop() + + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + restartableListener := testutils.NewRestartableListener(lis) + restartableServer := stubserver.StartTestService(t, &stubserver.StubServer{ + Listener: restartableListener, + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + }) + defer restartableServer.Stop() + + host, _, err := net.SplitHostPort(restartableServer.Address) + if err != nil { + t.Fatalf("Failed to split host and port from stubserver: %v", err) + } + + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Host: host, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, []string{restartableServer.Address}), + Weight: 1, + }}}) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + err = xdsServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + }) + if err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + if got, want := conn.GetState(), connectivity.Idle; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } + + checkRPCSendOK(t, ctx, client, 1) + + t.Log("Stopping the backend server") + restartableListener.Stop() + + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil || status.Code(err) != codes.Unavailable { + t.Errorf("rpc EmptyCall() succeeded, want Unavailable error") + } + + t.Log("Restarting the backend server") + restartableListener.Restart() + + // Wait for channel to become connected without any pending RPC. + testutils.AwaitState(ctx, t, conn, connectivity.Ready) }
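
Note for reviewers: below is a minimal usage sketch of the Hold API added in internal/testutils/blocking_context_dialer.go, to show the intended Hold/Wait/Resume flow in one place. It is illustrative only and not part of this patch; the example package name, the exampleHoldUsage helper, the passthrough target and the 5-second timeout are assumptions made for the sketch.

package testutils_test // hypothetical example package, not part of this change

import (
	"context"
	"testing"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/testutils"
)

// exampleHoldUsage shows the intended flow of the new API: register a Hold,
// trigger a connection attempt, wait for the dialer to intercept it, then
// Resume (or Fail) it exactly once.
func exampleHoldUsage(t *testing.T, addr string) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	dialer := testutils.NewBlockingDialer()
	hold := dialer.Hold(addr) // intercepts the next connection attempt to addr

	// A passthrough target makes the dialer see addr verbatim.
	conn, err := grpc.NewClient("passthrough:///"+addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(dialer.DialContext))
	if err != nil {
		t.Fatalf("grpc.NewClient() failed: %v", err)
	}
	defer conn.Close()
	conn.Connect() // start connecting without sending an RPC

	if !hold.Wait(ctx) { // blocks until DialContext is intercepted or ctx expires
		t.Fatalf("timed out waiting for a connection attempt to %s", addr)
	}
	hold.Resume() // let the dial proceed; hold.Fail(err) would fail it instead
}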