From d49b21417ed43ab4d98d3f3095a56e449fc09b16 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 7 Aug 2023 21:57:22 -0400 Subject: [PATCH 1/9] Add least request balancer --- balancer/leastrequest/leastrequest.go | 160 ++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 balancer/leastrequest/leastrequest.go diff --git a/balancer/leastrequest/leastrequest.go b/balancer/leastrequest/leastrequest.go new file mode 100644 index 000000000000..ea8d11237f80 --- /dev/null +++ b/balancer/leastrequest/leastrequest.go @@ -0,0 +1,160 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package leastrequest + +import ( + "encoding/json" + "fmt" + "sync/atomic" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/grpcrand" + "google.golang.org/grpc/serviceconfig" +) + +// Name is the name of the least request balancer. +const Name = "least_request_experimental" + +var logger = grpclog.Component("least request") + +type bb struct{} + +func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + lbConfig := &leastRequestConfig{ + ChoiceCount: 2, + } + if err := json.Unmarshal(s, lbConfig); err != nil { + return nil, fmt.Errorf("least-request: unable to unmarshal LBConfig: %v", err) + } + // "If a LeastRequestLoadBalancingConfig with a choice_count > 10 is + // received, the least_request_experimental policy will set choice_count = + // 10." + if lbConfig.ChoiceCount > 10 { + lbConfig.ChoiceCount = 10 + } + // I asked about this in chat but what happens if choiceCount < 2 (0 or 1)? + // Doing this for now. + if lbConfig.ChoiceCount < 2 { + lbConfig.ChoiceCount = 2 + } + return lbConfig, nil +} + +func (bb) Name() string { + return Name +} + +func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + b := &leastRequestBalancer{} + baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true}) + baseBalancer := baseBuilder.Build(cc, bOpts) + b.Balancer = baseBalancer + return b +} + +type leastRequestBalancer struct { + // Embeds balancer.Balancer because needs to intercept UpdateClientConnState + // to learn about choiceCount. + balancer.Balancer + + choiceCount uint32 +} + +type leastRequestConfig struct { + serviceconfig.LoadBalancingConfig `json:"-"` + + // ChoiceCount is the number of random SubConns to sample to try and find + // the one with the Least Request. If unset, defaults to 2. If set to < 2, + // will become 2, and if set to > 10, will become 10. 
+ ChoiceCount uint32 `json:"choiceCount,omitempty"` +} + +func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + lrCfg, ok := s.BalancerConfig.(*leastRequestConfig) + if !ok { + logger.Errorf("least-request: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) + return balancer.ErrBadResolverState + } + + lrb.choiceCount = lrCfg.ChoiceCount + return lrb.Balancer.UpdateClientConnState(s) +} + +type scWithRPCCount struct { + sc balancer.SubConn + numRPCs int32 +} + +func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker { + logger.Infof("least-request: Build called with info: %v", info) + if len(info.ReadySCs) == 0 { + return base.NewErrPicker(balancer.ErrNoSubConnAvailable) + } + scs := make([]scWithRPCCount, 0, len(info.ReadySCs)) + for sc := range info.ReadySCs { + scs = append(scs, scWithRPCCount{ + sc: sc, + }) + } + + return &picker{ + choiceCount: lrb.choiceCount, + subConns: scs, + } +} + +type picker struct { + // choiceCount is the number of random SubConns to find the one with + // the least request. + choiceCount uint32 + // Built out when receives list of ready RPCs. + subConns []scWithRPCCount +} + +func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + var pickedSC *scWithRPCCount + for i := 0; i < int(p.choiceCount); i++ { + index := grpcrand.Uint32() % uint32(len(p.subConns)) + sc := p.subConns[index] + if pickedSC == nil { + pickedSC = &sc + continue + } + if sc.numRPCs < pickedSC.numRPCs { + pickedSC = &sc + } + } + // "The counter for a subchannel should be atomically incremented by one + // after it has been successfully picked by the picker." - A48 + atomic.AddInt32(&pickedSC.numRPCs, 1) + + // "the picker should add a callback for atomically decrementing the + // subchannel counter once the RPC finishes (regardless of Status code)." - + // A48. + done := func(balancer.DoneInfo) { + atomic.AddInt32(&pickedSC.numRPCs, -1) + } + + return balancer.PickResult{ + SubConn: pickedSC.sc, + Done: done, + }, nil +} From 476a9c766940d35564d3649863fdaa4ca95b10e9 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 8 Aug 2023 19:28:39 -0400 Subject: [PATCH 2/9] Fixed ref counts and also added e2e test --- balancer/leastrequest/balancer_test.go | 340 +++++++++++++++++++++++++ balancer/leastrequest/leastrequest.go | 76 ++++-- 2 files changed, 389 insertions(+), 27 deletions(-) create mode 100644 balancer/leastrequest/balancer_test.go diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go new file mode 100644 index 000000000000..f79cf8bbef91 --- /dev/null +++ b/balancer/leastrequest/balancer_test.go @@ -0,0 +1,340 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package leastrequest + +import ( + "context" + "encoding/json" + "fmt" + "io" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" +) + +const ( + defaultTestTimeout = 5 * time.Second +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func (s) TestParseConfig(t *testing.T) { + parser := bb{} + tests := []struct { + name string + input string + wantCfg serviceconfig.LoadBalancingConfig + wantErr string + }{ + { + name: "happy-case-default", + input: `{}`, + wantCfg: &LBConfig{ + ChoiceCount: 2, + }, + }, + { + name: "happy-case-choice-count-set", + input: `{"choiceCount": 3}`, + wantCfg: &LBConfig{ + ChoiceCount: 3, + }, + }, + { + name: "happy-case-choice-count-greater-than-ten", + input: `{"choiceCount": 11}`, + wantCfg: &LBConfig{ + ChoiceCount: 10, + }, + }, + { + name: "choice-count-less-than-2", + input: `{"choiceCount": 1}`, + wantErr: "must be >= 2", + }, + { + name: "invalid-json", + input: "{{invalidjson{{", + wantErr: "invalid character", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input)) + // Substring match makes this very tightly coupled to the + // internalserviceconfig.BalancerConfig error strings. However, it + // is important to distinguish the different types of error messages + // possible as the parser has a few defined buckets of ways it can + // error out. + if (gotErr != nil) != (test.wantErr != "") { + t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr) + } + if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) { + t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr) + } + if test.wantErr != "" { + return + } + if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" { + t.Fatalf("ParseConfig(%v) got unexpected output, diff (-got +want): %v", test.input, diff) + } + }) + } +} + +// setupBackends spins up four test backends, each listening on a port on +// localhost. The four backends always reply with an empty response with no +// error, and for streaming receive until hitting an EOF error. +func setupBackends(t *testing.T) ([]string, func()) { + t.Helper() + + backends := make([]*stubserver.StubServer, 4) + addresses := make([]string, 4) + // Construct and start 4 working backends. 
+ for i := 0; i < 4; i++ { + backend := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + } + }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + t.Logf("Started good TestService backend at: %q", backend.Address) + backends[i] = backend + addresses[i] = backend.Address + } + cancel := func() { + for _, backend := range backends { + backend.Stop() + } + } + return addresses, cancel +} + +// checkRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn, +// connected to a server exposing the test.grpc_testing.TestService, are +// roundrobined across the given backend addresses. +// +// Returns a non-nil error if context deadline expires before RPCs start to get +// roundrobined across the given backends. +func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error { + wantAddrCount := make(map[string]int) + for _, addr := range addrs { + wantAddrCount[addr.Addr]++ + } + gotAddrCount := make(map[string]int) + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + gotAddrCount = make(map[string]int) + // Perform 3 iterations. + var iterations [][]string + for i := 0; i < 3; i++ { + iteration := make([]string, len(addrs)) + for c := 0; c < len(addrs); c++ { + var peer peer.Peer + client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)) + if peer.Addr != nil { + iteration[c] = peer.Addr.String() + } + } + iterations = append(iterations, iteration) + } + // Ensure the the first iteration contains all addresses in addrs. + for _, addr := range iterations[0] { + gotAddrCount[addr]++ + } + if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" { + continue + } + // Ensure all three iterations contain the same addresses. + if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) { + continue + } + return nil + } + return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v; got: %v", addrs, gotAddrCount) +} + +// TestLeastRequestE2E tests the Least Request LB policy in an e2e style. The +// Least Request balancer is configured as the top level balancer of the +// channel, and is passed three addresses. Eventually, the test creates three +// streams, which should be on certain backends according to the least request +// algorithm. The randomness in the picker is injected in the test to be +// deterministic, allowing the test to make assertions on the distribution. +func (s) TestLeastRequestE2E(t *testing.T) { + defer func(u func() uint32) { + grpcranduint32 = u + }(grpcranduint32) + var index int + indexes := []uint32{ + 0, 0, 1, 1, 2, 2, // Triggers a round robin distribution. + } + grpcranduint32 = func() uint32 { + ret := indexes[index%len(indexes)] + index++ + return ret + } + addresses, cancel := setupBackends(t) + defer cancel() + + mr := manual.NewBuilderWithScheme("lr-e2e") + defer mr.Close() + + // Configure least request as top level balancer of channel. 
+ lrscJSON := ` +{ + "loadBalancingConfig": [ + { + "least_request_experimental": { + "choiceCount": 2 + } + } + ] +}` + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON) + firstThreeAddresses := []resolver.Address{ + {Addr: addresses[0]}, + {Addr: addresses[1]}, + {Addr: addresses[2]}, + } + mr.InitialState(resolver.State{ + Addresses: firstThreeAddresses, + ServiceConfig: sc, + }) + + cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testgrpc.NewTestServiceClient(cc) + + // Wait for all 3 backends to round robin across. The happens because a + // SubConn transitioning into READY causes a new picker update. Once the + // picker update with all 3 backends is present, this test can start to make + // assertions based on those backends. + if err := checkRoundRobinRPCs(ctx, testServiceClient, firstThreeAddresses); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } + + // Map ordering of READY SubConns is non deterministic. Thus, perform 3 RPCs + // mocked from the random to each index to learn the addresses of SubConns + // at each index. + index = 0 + peerAtIndex := make([]string, 3) + var peer0 peer.Peer + if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil { + t.Fatalf("testServiceClient.EmptyCall failed: %v", err) + } + peerAtIndex[0] = peer0.Addr.String() + if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil { + t.Fatalf("testServiceClient.EmptyCall failed: %v", err) + } + peerAtIndex[1] = peer0.Addr.String() + if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil { + t.Fatalf("testServiceClient.EmptyCall failed: %v", err) + } + peerAtIndex[2] = peer0.Addr.String() + + // Start streaming RPCs, but do not finish them. Each subsequent stream + // should be started according to the least request algorithm, and chosen + // between the indexes provided. + index = 0 + indexes = []uint32{ + 0, 0, // Causes first stream to be on first address. + 0, 1, // Compares first address (which already has a RPC) to second, so choose second. + 1, 2, // Compares second address (which already has a RPC) to third, so choose third. + } + // Start a streaming call on first, but don't finish the stream. + var peer1 peer.Peer + stream1, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer1)) + if err != nil { + t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) + } + + // Start a second streaming call. From the indexes injected into random + // number generator, this should compare Address 1, which already has a RPC + // to Address 2, so thus should start the stream on Address 2. + var peer2 peer.Peer + stream2, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer2)) + if err != nil { + t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) + } + + // Start a third streaming call. From the indexes injected into random + // number generator, this should compare Address 2, which already has a RPC + // to Address 3, so thus should start the stream on Address 3. 
+ var peer3 peer.Peer + stream3, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer3)) + if err != nil { + t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) + } + + // Finish the streams to collect peer information. + stream1.CloseSend() + if _, err = stream1.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + stream2.CloseSend() + if _, err = stream2.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + stream3.CloseSend() + if _, err = stream3.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + if peer1.Addr.String() != peerAtIndex[0] { + t.Fatalf("got: %v, want: %v", peer1.Addr.String(), peerAtIndex[0]) + } + if peer2.Addr.String() != peerAtIndex[1] { + t.Fatalf("got: %v, want: %v", peer2.Addr.String(), peerAtIndex[1]) + } + if peer3.Addr.String() != peerAtIndex[2] { + t.Fatalf("got: %v, want: %v", peer3.Addr.String(), peerAtIndex[2]) + } +} diff --git a/balancer/leastrequest/leastrequest.go b/balancer/leastrequest/leastrequest.go index ea8d11237f80..73a7c5ad56e8 100644 --- a/balancer/leastrequest/leastrequest.go +++ b/balancer/leastrequest/leastrequest.go @@ -1,6 +1,6 @@ /* * - * Copyright 2022 gRPC authors. + * Copyright 2023 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,31 +30,47 @@ import ( "google.golang.org/grpc/serviceconfig" ) +// Global to stub out in tests. +var grpcranduint32 = grpcrand.Uint32 + // Name is the name of the least request balancer. const Name = "least_request_experimental" var logger = grpclog.Component("least request") +func init() { + balancer.Register(bb{}) +} + +// LBConfig is the balancer config for least_request_experimental balancer. +type LBConfig struct { + serviceconfig.LoadBalancingConfig `json:"-"` + + // ChoiceCount is the number of random SubConns to sample to try and find + // the one with the Least Request. If unset, defaults to 2. If set to < 2, + // will become 2, and if set to > 10, will become 10. + ChoiceCount uint32 `json:"choiceCount,omitempty"` +} + type bb struct{} func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { - lbConfig := &leastRequestConfig{ + lbConfig := &LBConfig{ ChoiceCount: 2, } if err := json.Unmarshal(s, lbConfig); err != nil { return nil, fmt.Errorf("least-request: unable to unmarshal LBConfig: %v", err) } + // "If `choice_count < 2`, the config will be rejected." - A48 + if lbConfig.ChoiceCount < 2 { // sweet + return nil, fmt.Errorf("least-request: lbConfig.choiceCount: %v, must be >= 2", lbConfig.ChoiceCount) + } // "If a LeastRequestLoadBalancingConfig with a choice_count > 10 is // received, the least_request_experimental policy will set choice_count = - // 10." + // 10." - A48 if lbConfig.ChoiceCount > 10 { lbConfig.ChoiceCount = 10 } - // I asked about this in chat but what happens if choiceCount < 2 (0 or 1)? - // Doing this for now. 
- if lbConfig.ChoiceCount < 2 { - lbConfig.ChoiceCount = 2 - } return lbConfig, nil } @@ -63,7 +79,7 @@ func (bb) Name() string { } func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { - b := &leastRequestBalancer{} + b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*int32)} baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true}) baseBalancer := baseBuilder.Build(cc, bOpts) b.Balancer = baseBalancer @@ -76,19 +92,11 @@ type leastRequestBalancer struct { balancer.Balancer choiceCount uint32 -} - -type leastRequestConfig struct { - serviceconfig.LoadBalancingConfig `json:"-"` - - // ChoiceCount is the number of random SubConns to sample to try and find - // the one with the Least Request. If unset, defaults to 2. If set to < 2, - // will become 2, and if set to > 10, will become 10. - ChoiceCount uint32 `json:"choiceCount,omitempty"` + scRPCCounts map[balancer.SubConn]*int32 // Hold onto RPC counts to keep track for subsequent picker updates. } func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error { - lrCfg, ok := s.BalancerConfig.(*leastRequestConfig) + lrCfg, ok := s.BalancerConfig.(*LBConfig) if !ok { logger.Errorf("least-request: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) return balancer.ErrBadResolverState @@ -100,7 +108,7 @@ func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnStat type scWithRPCCount struct { sc balancer.SubConn - numRPCs int32 + numRPCs *int32 } func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker { @@ -108,10 +116,26 @@ func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picke if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } + + for sc := range lrb.scRPCCounts { + if _, ok := info.ReadySCs[sc]; !ok { // If no longer ready, no more need for the ref to count active RPCs. + delete(lrb.scRPCCounts, sc) + } + } + + // Create new refs if needed. + for sc := range info.ReadySCs { + if _, ok := lrb.scRPCCounts[sc]; !ok { + lrb.scRPCCounts[sc] = new(int32) + } + } + + // Copy refs to counters into picker. scs := make([]scWithRPCCount, 0, len(info.ReadySCs)) for sc := range info.ReadySCs { scs = append(scs, scWithRPCCount{ - sc: sc, + sc: sc, + numRPCs: lrb.scRPCCounts[sc], // guaranteed to be present due to algorithm }) } @@ -132,27 +156,25 @@ type picker struct { func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) { var pickedSC *scWithRPCCount for i := 0; i < int(p.choiceCount); i++ { - index := grpcrand.Uint32() % uint32(len(p.subConns)) + index := grpcranduint32() % uint32(len(p.subConns)) sc := p.subConns[index] if pickedSC == nil { pickedSC = &sc continue } - if sc.numRPCs < pickedSC.numRPCs { + if *sc.numRPCs < *pickedSC.numRPCs { pickedSC = &sc } } // "The counter for a subchannel should be atomically incremented by one // after it has been successfully picked by the picker." - A48 - atomic.AddInt32(&pickedSC.numRPCs, 1) - + atomic.AddInt32(pickedSC.numRPCs, 1) // "the picker should add a callback for atomically decrementing the // subchannel counter once the RPC finishes (regardless of Status code)." - // A48. 
done := func(balancer.DoneInfo) { - atomic.AddInt32(&pickedSC.numRPCs, -1) + atomic.AddInt32(pickedSC.numRPCs, -1) } - return balancer.PickResult{ SubConn: pickedSC.sc, Done: done, From 665e157713dc1a8debb82a3f28f12eb90ee171dc Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 10 Aug 2023 14:26:42 -0400 Subject: [PATCH 3/9] Add persisting e2e test --- balancer/leastrequest/balancer_test.go | 124 ++++++++++++++++++++++++- balancer/leastrequest/leastrequest.go | 1 + 2 files changed, 123 insertions(+), 2 deletions(-) diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go index f79cf8bbef91..f4c8fa9700a4 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/balancer_test.go @@ -117,8 +117,8 @@ func (s) TestParseConfig(t *testing.T) { } } -// setupBackends spins up four test backends, each listening on a port on -// localhost. The four backends always reply with an empty response with no +// setupBackends spins up three test backends, each listening on a port on +// localhost. The three backends always reply with an empty response with no // error, and for streaming receive until hitting an EOF error. func setupBackends(t *testing.T) ([]string, func()) { t.Helper() @@ -338,3 +338,123 @@ func (s) TestLeastRequestE2E(t *testing.T) { t.Fatalf("got: %v, want: %v", peer3.Addr.String(), peerAtIndex[2]) } } + +// TestLeastRequestPersistsCounts tests that the Least Request Balancer persists +// counts once it gets a new picker update. It first updates the Least Request +// Balancer with two backends, and creates a bunch of streams on them. Then, it +// updates the Least Request Balancer with three backends, including the two +// previous. Any created streams should then be started on the new backend. +func (s) TestLeastRequestPersistsCounts(t *testing.T) { + defer func(u func() uint32) { + grpcranduint32 = u + }(grpcranduint32) + var index int + indexes := []uint32{ + 0, 1, 2, 3, 4, 5, // Triggers a round robin distribution of indexes for two addresses or three addresses. + } + grpcranduint32 = func() uint32 { + ret := indexes[index%len(indexes)] + index++ + return ret + } + addresses, cancel := setupBackends(t) + defer cancel() + + mr := manual.NewBuilderWithScheme("lr-e2e") + defer mr.Close() + + // Configure least request as top level balancer of channel. + lrscJSON := ` +{ + "loadBalancingConfig": [ + { + "least_request_experimental": { + "choiceCount": 2 + } + } + ] +}` + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON) + firstTwoAddresses := []resolver.Address{ + {Addr: addresses[0]}, + {Addr: addresses[1]}, + } + mr.InitialState(resolver.State{ + Addresses: firstTwoAddresses, + ServiceConfig: sc, + }) + + cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testgrpc.NewTestServiceClient(cc) + + // Start 50 streaming RPCs, and leave them unfinished for the duration of + // the test. This will populate the first two addresses with many active + // RPCs. 
+ for i := 0; i < 50; i++ { + stream, err := testServiceClient.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) + } + defer func() { + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + }() + } + + // Update the least request balancer to choice count 3. Also update the + // address list adding a third address. Alongside the injected randomness, + // this should trigger the least request balancer to search all created + // SubConns. Thus, since address 3 is the new address and the first two + // addresses are populated with RPCs, once the picker update of all 3 READY + // SubConns takes effect, all new streams should be started on address 3. + lrscJSON = ` +{ + "loadBalancingConfig": [ + { + "least_request_experimental": { + "choiceCount": 3 + } + } + ] +}` + sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON) + fullAddresses := []resolver.Address{ + {Addr: addresses[0]}, + {Addr: addresses[1]}, + {Addr: addresses[2]}, + } + mr.UpdateState(resolver.State{ + Addresses: fullAddresses, + ServiceConfig: sc, + }) + newAddress := fullAddresses[2] + // Poll for only address 3 to show up. This requires a polling loop because + // picker update with all three SubConns doesn't take into effect + // immediately, needs the third SubConn to become READY. + if err := checkRoundRobinRPCs(ctx, testServiceClient, []resolver.Address{newAddress}); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } + + for i := 0; i < 50; i++ { + var peer peer.Peer + stream, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer)) + if err != nil { + t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) + } + stream.CloseSend() // Finish stream to populate peer. + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + if peer.Addr.String() != addresses[2] { + t.Fatalf("got: %v, want: %v", peer.Addr.String(), addresses[2]) + } + } +} diff --git a/balancer/leastrequest/leastrequest.go b/balancer/leastrequest/leastrequest.go index 73a7c5ad56e8..7f89be090158 100644 --- a/balancer/leastrequest/leastrequest.go +++ b/balancer/leastrequest/leastrequest.go @@ -16,6 +16,7 @@ * */ +// Package leastrequest implements a least request load balancer. package leastrequest import ( From 7aa9a1711bcf10c90b1c69aebb53b799b4567b88 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 10 Aug 2023 14:57:15 -0400 Subject: [PATCH 4/9] Deflake e2e test --- balancer/leastrequest/balancer_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go index f4c8fa9700a4..f3757590df86 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/balancer_test.go @@ -393,6 +393,14 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { defer cancel() testServiceClient := testgrpc.NewTestServiceClient(cc) + // Wait for the two backends to round robin across. The happens because a + // SubConn transitioning into READY causes a new picker update. Once the + // picker update with the two backends is present, this test can start to + // populate those backends with streams. + if err := checkRoundRobinRPCs(ctx, testServiceClient, firstTwoAddresses); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } + // Start 50 streaming RPCs, and leave them unfinished for the duration of // the test. 
This will populate the first two addresses with many active // RPCs. From 0ce88c93f8f1c74c14059b5f4a293ba07e227b8b Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 10 Aug 2023 15:34:14 -0400 Subject: [PATCH 5/9] Review comments and deflake e2e test again --- balancer/leastrequest/balancer_test.go | 5 ++++- balancer/leastrequest/leastrequest.go | 11 +++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go index f3757590df86..8c72c81f7877 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/balancer_test.go @@ -350,7 +350,7 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { }(grpcranduint32) var index int indexes := []uint32{ - 0, 1, 2, 3, 4, 5, // Triggers a round robin distribution of indexes for two addresses or three addresses. + 0, 0, 1, 1, } grpcranduint32 = func() uint32 { ret := indexes[index%len(indexes)] @@ -423,6 +423,9 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { // SubConns. Thus, since address 3 is the new address and the first two // addresses are populated with RPCs, once the picker update of all 3 READY // SubConns takes effect, all new streams should be started on address 3. + indexes = []uint32{ + 0, 1, 2, 3, 4, 5, + } lrscJSON = ` { "loadBalancingConfig": [ diff --git a/balancer/leastrequest/leastrequest.go b/balancer/leastrequest/leastrequest.go index 7f89be090158..87a74c7ead72 100644 --- a/balancer/leastrequest/leastrequest.go +++ b/balancer/leastrequest/leastrequest.go @@ -37,7 +37,7 @@ var grpcranduint32 = grpcrand.Uint32 // Name is the name of the least request balancer. const Name = "least_request_experimental" -var logger = grpclog.Component("least request") +var logger = grpclog.Component("least-request") func init() { balancer.Register(bb{}) @@ -47,9 +47,9 @@ func init() { type LBConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` - // ChoiceCount is the number of random SubConns to sample to try and find - // the one with the Least Request. If unset, defaults to 2. If set to < 2, - // will become 2, and if set to > 10, will become 10. + // ChoiceCount is the number of random SubConns to sample to find the one + // with the fewest outstanding requests. If unset, defaults to 2. If set to + // < 2, the config will be rejected, and if set to > 10, will become 10. ChoiceCount uint32 `json:"choiceCount,omitempty"` } @@ -82,8 +82,7 @@ func (bb) Name() string { func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*int32)} baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true}) - baseBalancer := baseBuilder.Build(cc, bOpts) - b.Balancer = baseBalancer + b.Balancer = baseBuilder.Build(cc, bOpts) return b } From 6038266cf30d5408180522b0a7576c57b01d46b3 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 11 Aug 2023 15:51:42 -0400 Subject: [PATCH 6/9] Responded to Doug's comments --- balancer/leastrequest/balancer_test.go | 146 ++++++++++++------------- 1 file changed, 73 insertions(+), 73 deletions(-) diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go index 8c72c81f7877..7e8ea47d6d46 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/balancer_test.go @@ -120,13 +120,12 @@ func (s) TestParseConfig(t *testing.T) { // setupBackends spins up three test backends, each listening on a port on // localhost. 
The three backends always reply with an empty response with no // error, and for streaming receive until hitting an EOF error. -func setupBackends(t *testing.T) ([]string, func()) { +func setupBackends(t *testing.T) []string { t.Helper() - - backends := make([]*stubserver.StubServer, 4) - addresses := make([]string, 4) - // Construct and start 4 working backends. - for i := 0; i < 4; i++ { + const numBackends = 3 + addresses := make([]string, numBackends) + // Construct and start three working backends. + for i := 0; i < numBackends; i++ { backend := &stubserver.StubServer{ EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil @@ -144,15 +143,10 @@ func setupBackends(t *testing.T) ([]string, func()) { t.Fatalf("Failed to start backend: %v", err) } t.Logf("Started good TestService backend at: %q", backend.Address) - backends[i] = backend + t.Cleanup(func() { backend.Stop() }) addresses[i] = backend.Address } - cancel := func() { - for _, backend := range backends { - backend.Stop() - } - } - return addresses, cancel + return addresses } // checkRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn, @@ -217,8 +211,7 @@ func (s) TestLeastRequestE2E(t *testing.T) { index++ return ret } - addresses, cancel := setupBackends(t) - defer cancel() + addresses := setupBackends(t) mr := manual.NewBuilderWithScheme("lr-e2e") defer mr.Close() @@ -287,55 +280,38 @@ func (s) TestLeastRequestE2E(t *testing.T) { index = 0 indexes = []uint32{ 0, 0, // Causes first stream to be on first address. - 0, 1, // Compares first address (which already has a RPC) to second, so choose second. - 1, 2, // Compares second address (which already has a RPC) to third, so choose third. - } - // Start a streaming call on first, but don't finish the stream. - var peer1 peer.Peer - stream1, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer1)) - if err != nil { - t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) - } - - // Start a second streaming call. From the indexes injected into random - // number generator, this should compare Address 1, which already has a RPC - // to Address 2, so thus should start the stream on Address 2. - var peer2 peer.Peer - stream2, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer2)) - if err != nil { - t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) - } + 0, 1, // Compares first address (one RPC) to second (no RPCs), so choose second. + 1, 2, // Compares second address (one RPC) to third (no RPCs), so choose third. + 0, 3, // Causes another stream on first address. + 1, 0, // Compares second address (one RPC) to first (two RPCs), so choose second. + 2, 0, // Compares third address (one RPC) to first (two RPCs), so choose third. + 0, 0, // Causes another stream on first address. + 2, 2, // Causes a stream on third address. + 2, 1, // Compares third address (three RPCs) to second (two RPCs), so choose third. + } + wantIndex := []uint32{0, 1, 2, 0, 1, 2, 0, 2, 1} - // Start a third streaming call. From the indexes injected into random - // number generator, this should compare Address 2, which already has a RPC - // to Address 3, so thus should start the stream on Address 3. - var peer3 peer.Peer - stream3, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer3)) - if err != nil { - t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) - } - - // Finish the streams to collect peer information. 
- stream1.CloseSend() - if _, err = stream1.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) - } - stream2.CloseSend() - if _, err = stream2.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) - } - stream3.CloseSend() - if _, err = stream3.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) - } - if peer1.Addr.String() != peerAtIndex[0] { - t.Fatalf("got: %v, want: %v", peer1.Addr.String(), peerAtIndex[0]) - } - if peer2.Addr.String() != peerAtIndex[1] { - t.Fatalf("got: %v, want: %v", peer2.Addr.String(), peerAtIndex[1]) - } - if peer3.Addr.String() != peerAtIndex[2] { - t.Fatalf("got: %v, want: %v", peer3.Addr.String(), peerAtIndex[2]) + // Start streaming RPC's, but do not finish them. Each created stream should + // be started based on the least request algorithm and injected randomness + // (see indexes slice above for exact expectations). + for _, wantIndex := range wantIndex { + stream, err := testServiceClient.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) + } + defer func() { + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + }() + p, ok := peer.FromContext(stream.Context()) + if !ok { + t.Fatalf("testServiceClient.FullDuplexCall has no Peer") + } + if p.Addr.String() != peerAtIndex[wantIndex] { + t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), peerAtIndex[wantIndex]) + } } } @@ -357,8 +333,7 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { index++ return ret } - addresses, cancel := setupBackends(t) - defer cancel() + addresses := setupBackends(t) mr := manual.NewBuilderWithScheme("lr-e2e") defer mr.Close() @@ -423,6 +398,7 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { // SubConns. Thus, since address 3 is the new address and the first two // addresses are populated with RPCs, once the picker update of all 3 READY // SubConns takes effect, all new streams should be started on address 3. + index = 0 indexes = []uint32{ 0, 1, 2, 3, 4, 5, } @@ -454,18 +430,42 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { t.Fatalf("error in expected round robin: %v", err) } - for i := 0; i < 50; i++ { - var peer peer.Peer - stream, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer)) + // Start 25 rpcs, but don't finish them. They should all start on address 3, + // since the first two addresses both have 25 RPCs (and randomness + // injection/choiceCount causes all 3 to be compared every iteration). + for i := 0; i < 25; i++ { + stream, err := testServiceClient.FullDuplexCall(ctx) if err != nil { t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) } - stream.CloseSend() // Finish stream to populate peer. 
- if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) + defer func() { + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + }() + p, ok := peer.FromContext(stream.Context()) + if !ok { + t.Fatalf("testServiceClient.FullDuplexCall has no Peer") } - if peer.Addr.String() != addresses[2] { - t.Fatalf("got: %v, want: %v", peer.Addr.String(), addresses[2]) + if p.Addr.String() != addresses[2] { + t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), addresses[2]) } } + + // The next two unary RPC should be created with the first index in the + // slice (created from non deterministic map iterations), since in the case + // of equal numbers of RPC's, the least request picker picks the first one. + // Thus, make sure the backend is equal. + var p1 peer.Peer + if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p1)); err != nil { + t.Fatalf("testServiceClient.EmptyCall failed: %v", err) + } + var p2 peer.Peer + if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p2)); err != nil { + t.Fatalf("testServiceClient.EmptyCall failed: %v", err) + } + if p1.Addr.String() != p2.Addr.String() { + t.Fatalf("Peer1: %v != Peer2: %v", p1.Addr.String(), p2.Addr.String()) + } } From 7dc96af4b439b5e4c4e03a6e9b3997b5c1419d84 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 11 Aug 2023 16:10:57 -0400 Subject: [PATCH 7/9] Add round robin assertion at the end of an e2e test --- balancer/leastrequest/balancer_test.go | 41 ++++++++++++++++++-------- 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go index 7e8ea47d6d46..8c75621ee74e 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/balancer_test.go @@ -453,19 +453,36 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { } } - // The next two unary RPC should be created with the first index in the - // slice (created from non deterministic map iterations), since in the case - // of equal numbers of RPC's, the least request picker picks the first one. - // Thus, make sure the backend is equal. - var p1 peer.Peer - if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p1)); err != nil { - t.Fatalf("testServiceClient.EmptyCall failed: %v", err) + // Now 25 RPC's are active on each address, the next three RPC's should + // round robin, since choiceCount is three and the injected random indexes + // cause it to search all three addresses for fewest outstanding requests on + // each iteration. 
+ wantAddrCount := map[string]int{ + addresses[0]: 1, + addresses[1]: 1, + addresses[2]: 1, } - var p2 peer.Peer - if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p2)); err != nil { - t.Fatalf("testServiceClient.EmptyCall failed: %v", err) + gotAddrCount := make(map[string]int) + for i := 0; i < len(addresses); i++ { + stream, err := testServiceClient.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) + } + defer func() { + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + }() + p, ok := peer.FromContext(stream.Context()) + if !ok { + t.Fatalf("testServiceClient.FullDuplexCall has no Peer") + } + if p.Addr != nil { + gotAddrCount[p.Addr.String()]++ + } } - if p1.Addr.String() != p2.Addr.String() { - t.Fatalf("Peer1: %v != Peer2: %v", p1.Addr.String(), p2.Addr.String()) + if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" { + t.Fatalf("addr count got: %v, want (round robin): %v", gotAddrCount, wantAddrCount) } } From 377dbac2116c7fa638e5703b283f87bb3fdc8f77 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 11 Aug 2023 17:36:51 -0400 Subject: [PATCH 8/9] Responded to Doug's comments --- balancer/leastrequest/balancer_test.go | 35 +++----------------------- 1 file changed, 3 insertions(+), 32 deletions(-) diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go index 8c75621ee74e..c1cfb8490fd5 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/balancer_test.go @@ -21,7 +21,6 @@ import ( "context" "encoding/json" "fmt" - "io" "strings" "testing" "time" @@ -131,12 +130,8 @@ func setupBackends(t *testing.T) []string { return &testpb.Empty{}, nil }, FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { - for { - _, err := stream.Recv() - if err == io.EOF { - return nil - } - } + <-stream.Context().Done() + return nil }, } if err := backend.StartServer(); err != nil { @@ -299,12 +294,6 @@ func (s) TestLeastRequestE2E(t *testing.T) { if err != nil { t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) } - defer func() { - stream.CloseSend() - if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) - } - }() p, ok := peer.FromContext(stream.Context()) if !ok { t.Fatalf("testServiceClient.FullDuplexCall has no Peer") @@ -380,16 +369,10 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { // the test. This will populate the first two addresses with many active // RPCs. for i := 0; i < 50; i++ { - stream, err := testServiceClient.FullDuplexCall(ctx) + _, err := testServiceClient.FullDuplexCall(ctx) if err != nil { t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) } - defer func() { - stream.CloseSend() - if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) - } - }() } // Update the least request balancer to choice count 3. 
Also update the @@ -438,12 +421,6 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { if err != nil { t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) } - defer func() { - stream.CloseSend() - if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) - } - }() p, ok := peer.FromContext(stream.Context()) if !ok { t.Fatalf("testServiceClient.FullDuplexCall has no Peer") @@ -468,12 +445,6 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { if err != nil { t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err) } - defer func() { - stream.CloseSend() - if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) - } - }() p, ok := peer.FromContext(stream.Context()) if !ok { t.Fatalf("testServiceClient.FullDuplexCall has no Peer") From abe34875e3f653af44975d42e0af5b6ab8a634d7 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 11 Aug 2023 18:09:48 -0400 Subject: [PATCH 9/9] Responded to Doug's comments --- balancer/leastrequest/balancer_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go index c1cfb8490fd5..39bf1b94abdd 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/balancer_test.go @@ -165,9 +165,7 @@ func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, for c := 0; c < len(addrs); c++ { var peer peer.Peer client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)) - if peer.Addr != nil { - iteration[c] = peer.Addr.String() - } + iteration[c] = peer.Addr.String() } iterations = append(iterations, iteration) } @@ -175,7 +173,7 @@ func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, for _, addr := range iterations[0] { gotAddrCount[addr]++ } - if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" { + if !cmp.Equal(gotAddrCount, wantAddrCount) { continue } // Ensure all three iterations contain the same addresses. @@ -454,6 +452,6 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { } } if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" { - t.Fatalf("addr count got: %v, want (round robin): %v", gotAddrCount, wantAddrCount) + t.Fatalf("addr count (-got:, +want): %v", diff) } }
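
Taken together, the series registers the new policy under the name least_request_experimental via the package's init function, so an application opts in by importing the package for its side effect and naming the policy in service config. Below is a minimal sketch of that wiring, not part of the series itself; the target URI and backend are placeholders, and choiceCount 2 simply mirrors the default.

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	// Imported for its init side effect: registering the
	// least_request_experimental balancer with gRPC's balancer registry.
	_ "google.golang.org/grpc/balancer/leastrequest"
)

func main() {
	// Select the least request policy through the channel's default service
	// config; a service config returned by the resolver would take precedence.
	const svcCfg = `{
  "loadBalancingConfig": [
    {"least_request_experimental": {"choiceCount": 2}}
  ]
}`

	// Placeholder target; any resolver that returns multiple addresses is
	// balanced the same way.
	cc, err := grpc.Dial("dns:///backend.example.com:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(svcCfg),
	)
	if err != nil {
		log.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer cc.Close()

	// Issue RPCs on cc as usual; each pick samples choiceCount random READY
	// SubConns and routes to the one with the fewest in-flight RPCs.
}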