From 23ca143088b146675ca7cc86a6a31e5a0c0a100b Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 31 Aug 2023 19:06:54 -0400 Subject: [PATCH 1/3] Cache atomic load and also add concurrent rpc test --- balancer/leastrequest/balancer_test.go | 53 ++++++++++++++++++++++++++ balancer/leastrequest/leastrequest.go | 5 ++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go index 39bf1b94abdd..0ef59c2c22a4 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/balancer_test.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "testing" "time" @@ -455,3 +456,55 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) { t.Fatalf("addr count (-got:, +want): %v", diff) } } + +// TestConcurrentRPCs tests concurrent RPCs on the least request balancer. It +// configures a channel with a least request balancer as the top level balancer, +// and makes 100 RPCs asynchronously. This makes sure no race conditions happen +// in this scenario. +func (s) TestConcurrentRPCs(t *testing.T) { + addresses := setupBackends(t) + + mr := manual.NewBuilderWithScheme("lr-e2e") + defer mr.Close() + + // Configure least request as top level balancer of channel. + lrscJSON := ` +{ + "loadBalancingConfig": [ + { + "least_request_experimental": { + "choiceCount": 2 + } + } + ] +}` + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON) + firstTwoAddresses := []resolver.Address{ + {Addr: addresses[0]}, + {Addr: addresses[1]}, + } + mr.InitialState(resolver.State{ + Addresses: firstTwoAddresses, + ServiceConfig: sc, + }) + + cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testgrpc.NewTestServiceClient(cc) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + }() + } + wg.Wait() + +} diff --git a/balancer/leastrequest/leastrequest.go b/balancer/leastrequest/leastrequest.go index 6ef86dc267ed..2f197d2d5c4a 100644 --- a/balancer/leastrequest/leastrequest.go +++ b/balancer/leastrequest/leastrequest.go @@ -155,15 +155,18 @@ type picker struct { func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) { var pickedSC *scWithRPCCount + var pickedSCNumRPCs int32 for i := 0; i < int(p.choiceCount); i++ { index := grpcranduint32() % uint32(len(p.subConns)) sc := p.subConns[index] if pickedSC == nil { pickedSC = &sc + pickedSCNumRPCs = pickedSC.numRPCs.Load() continue } - if sc.numRPCs.Load() < pickedSC.numRPCs.Load() { + if sc.numRPCs.Load() < pickedSCNumRPCs { pickedSC = &sc + pickedSCNumRPCs = pickedSC.numRPCs.Load() } } // "The counter for a subchannel should be atomically incremented by one From 761cd35d51cf34041cc6f67d39a837a49623d1d0 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 1 Sep 2023 14:52:43 -0400 Subject: [PATCH 2/3] Review comments --- balancer/leastrequest/leastrequest.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/balancer/leastrequest/leastrequest.go b/balancer/leastrequest/leastrequest.go index 2f197d2d5c4a..6f3318b47afc 100644 --- a/balancer/leastrequest/leastrequest.go +++ b/balancer/leastrequest/leastrequest.go @@ -164,9 +164,10 @@ func (p 
*picker) Pick(balancer.PickInfo) (balancer.PickResult, error) { pickedSCNumRPCs = pickedSC.numRPCs.Load() continue } - if sc.numRPCs.Load() < pickedSCNumRPCs { + scNumRPCs := sc.numRPCs.Load() + if scNumRPCs < pickedSCNumRPCs { pickedSC = &sc - pickedSCNumRPCs = pickedSC.numRPCs.Load() + pickedSCNumRPCs = scNumRPCs } } // "The counter for a subchannel should be atomically incremented by one From 6e5d9f56672d0a937527dd6b36b684dc5c80b012 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 5 Sep 2023 14:12:56 -0400 Subject: [PATCH 3/3] Responded to Doug's comments --- balancer/leastrequest/balancer_test.go | 4 +++- balancer/leastrequest/leastrequest.go | 11 +++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/balancer_test.go index 0ef59c2c22a4..44bb21c9e9ff 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/balancer_test.go @@ -502,7 +502,9 @@ func (s) TestConcurrentRPCs(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + for j := 0; j < 5; j++ { + testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + } }() } wg.Wait() diff --git a/balancer/leastrequest/leastrequest.go b/balancer/leastrequest/leastrequest.go index 6f3318b47afc..3289f2869f88 100644 --- a/balancer/leastrequest/leastrequest.go +++ b/balancer/leastrequest/leastrequest.go @@ -159,15 +159,10 @@ func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) { for i := 0; i < int(p.choiceCount); i++ { index := grpcranduint32() % uint32(len(p.subConns)) sc := p.subConns[index] - if pickedSC == nil { + n := sc.numRPCs.Load() + if pickedSC == nil || n < pickedSCNumRPCs { pickedSC = &sc - pickedSCNumRPCs = pickedSC.numRPCs.Load() - continue - } - scNumRPCs := sc.numRPCs.Load() - if scNumRPCs < pickedSCNumRPCs { - pickedSC = &sc - pickedSCNumRPCs = scNumRPCs + pickedSCNumRPCs = n } } // "The counter for a subchannel should be atomically incremented by one
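
The series above reads most clearly as its end state, so here is a minimal, self-contained sketch of the power-of-two-choices loop the final patch converges on. The `scWithRPCCount` and `picker` types below are simplified stand-ins for grpc-go's unexported types (the real `scWithRPCCount` wraps a `balancer.SubConn`), and `rand.Uint32` fills in for the package's `grpcranduint32` hook; only the selection logic mirrors the diff.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
)

// scWithRPCCount is a stand-in for grpc-go's subconn wrapper: a connection
// plus an atomic counter of RPCs currently in flight on it.
type scWithRPCCount struct {
	name    string
	numRPCs atomic.Int32
}

type picker struct {
	choiceCount uint32
	subConns    []scWithRPCCount
}

// pick samples choiceCount random subconns and returns the one with the
// fewest in-flight RPCs, matching the shape of the final version of Pick.
func (p *picker) pick() *scWithRPCCount {
	var pickedSC *scWithRPCCount
	var pickedSCNumRPCs int32
	for i := 0; i < int(p.choiceCount); i++ {
		index := rand.Uint32() % uint32(len(p.subConns))
		sc := &p.subConns[index]
		// Load the counter exactly once per candidate. Comparing and caching
		// the same snapshot keeps the decision internally consistent even if
		// other goroutines increment the counter between statements.
		n := sc.numRPCs.Load()
		if pickedSC == nil || n < pickedSCNumRPCs {
			pickedSC = sc
			pickedSCNumRPCs = n
		}
	}
	// Per the spec comment quoted in the diff, the winner's counter is
	// atomically incremented once it is picked (the real picker decrements
	// it again when the RPC completes).
	pickedSC.numRPCs.Add(1)
	return pickedSC
}

func main() {
	p := &picker{choiceCount: 2, subConns: make([]scWithRPCCount, 3)}
	for i := range p.subConns {
		p.subConns[i].name = fmt.Sprintf("subconn-%d", i)
	}
	for i := 0; i < 5; i++ {
		fmt.Println("picked:", p.pick().name)
	}
}
```

The shape of the final loop is the point of the whole series: patch 1 stopped re-loading the picked subconn's counter on every comparison, patch 2 reused the candidate's already-loaded value instead of reading it a second time after it won, and patch 3 collapsed the two branches into the single `pickedSC == nil || n < pickedSCNumRPCs` condition, so each candidate's counter is read exactly once and the cached minimum always matches the value that won the comparison.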
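
TestConcurrentRPCs itself only proves its worth under the race detector, since none of the EmptyCall results are checked; the assumed invocation is something like `go test -race -run TestConcurrentRPCs ./balancer/leastrequest/`. The same pattern can be exercised against the sketch above with an ordinary test (a hypothetical file in the same package as the sketch; only `sync` and `testing` from the standard library are used):

```go
package main

import (
	"sync"
	"testing"
)

// TestConcurrentPicks mirrors the shape of TestConcurrentRPCs from the diff:
// 100 goroutines each make 5 picks concurrently, so `go test -race` can flag
// any unsynchronized access to the shared counters.
func TestConcurrentPicks(t *testing.T) {
	p := &picker{choiceCount: 2, subConns: make([]scWithRPCCount, 3)}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 5; j++ {
				p.pick()
			}
		}()
	}
	wg.Wait()
}
```

Because pick touches shared state only through atomic loads and adds (and math/rand's global source is safe for concurrent use), this passes cleanly under -race, which is the same property the real test establishes for the balancer's picker.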