Skip to content

Commit

Permalink
balancer/weightedroundrobin: Add recording point for endpoint weight …
Browse files Browse the repository at this point in the history
…not yet usable and add metrics tests (#7466)
  • Loading branch information
zasweq authored Aug 10, 2024
1 parent 7b9e012 commit 54b48f7
Show file tree
Hide file tree
Showing 25 changed files with 691 additions and 80 deletions.
26 changes: 18 additions & 8 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (p *picker) inc() uint32 {
}

func (p *picker) regenerateScheduler() {
s := p.newScheduler()
s := p.newScheduler(true)
atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s))
}

Expand Down Expand Up @@ -558,14 +558,17 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
w.SubConn.Connect()
case connectivity.Ready:
// If we transition back to READY state, reset nonEmptySince so that we
// apply the blackout period after we start receiving load data. Note
// that we cannot guarantee that we will never receive lingering
// callbacks for backend metric reports from the previous connection
// after the new connection has been established, but they should be
// masked by new backend metric reports from the new connection by the
// time the blackout period ends.
// apply the blackout period after we start receiving load data. Also
// reset lastUpdated to trigger endpoint weight not yet usable in the
// case endpoint gets asked what weight it is before receiving a new
// load report. Note that we cannot guarantee that we will never receive
// lingering callbacks for backend metric reports from the previous
// connection after the new connection has been established, but they
// should be masked by new backend metric reports from the new
// connection by the time the blackout period ends.
w.mu.Lock()
w.nonEmptySince = time.Time{}
w.lastUpdated = time.Time{}
w.mu.Unlock()
case connectivity.Shutdown:
if w.stopORCAListener != nil {
Expand All @@ -592,7 +595,7 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
// account the parameters. Returns 0 for blacked out or expired data, which
// will cause the backend weight to be treated as the mean of the weights of the
// other backends. If forScheduler is set to true, this function will emit
// metrics through the mtrics registry.
// metrics through the metrics registry.
func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration, recordMetrics bool) (weight float64) {
w.mu.Lock()
defer w.mu.Unlock()
Expand All @@ -603,6 +606,13 @@ func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackout
}()
}

// The SubConn has not received a load report (i.e. just turned READY with
// no load report).
if w.lastUpdated == (time.Time{}) {
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
return 0
}

// If the most recent update was longer ago than the expiration period,
// reset nonEmptySince so that we apply the blackout period again if we
// start getting data again in the future, and return 0.
Expand Down
46 changes: 46 additions & 0 deletions balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -81,6 +82,14 @@ var (
WeightUpdatePeriod: stringp(".050s"),
ErrorUtilizationPenalty: float64p(0),
}
testMetricsConfig = iwrr.LBConfig{
EnableOOBLoadReport: boolp(false),
OOBReportingPeriod: stringp("0.005s"),
BlackoutPeriod: stringp("0s"),
WeightExpirationPeriod: stringp("60s"),
WeightUpdatePeriod: stringp(".050s"),
ErrorUtilizationPenalty: float64p(0),
}
)

type testServer struct {
Expand Down Expand Up @@ -196,6 +205,43 @@ func (s) TestBalancer_OneAddress(t *testing.T) {
}
}

// TestWRRMetricsBasic tests metrics emitted from the WRR balancer. It
// configures a weighted round robin balancer as the top level balancer of a
// ClientConn, and configures a fake stats handler on the ClientConn to receive
// metrics. It verifies stats emitted from the Weighted Round Robin Balancer on
// balancer startup case which triggers the first picker and scheduler update
// before any load reports are received.
//
// Note that this test and others, metrics emission asssertions are a snapshot
// of the most recently emitted metrics. This is due to the nondeterminism of
// scheduler updates with respect to test bodies, so the assertions made are
// from the most recently synced state of the system (picker/scheduler) from the
// test body.
func (s) TestWRRMetricsBasic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

srv := startServer(t, reportCall)
sc := svcConfig(t, testMetricsConfig)

mr := stats.NewTestMetricsRecorder(t)
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
srv.callMetrics.SetQPS(float64(1))

if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("Error from EmptyCall: %v", err)
}

mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) // Falls back because only one SubConn.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted).
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
// Unusable, so no endpoint weight. Due to only one SubConn, this will never
// update the weight. Thus, this will stay 0.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
}

// Tests two addresses with ORCA reporting disabled (should fall back to pure
// RR).
func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) {
Expand Down
163 changes: 163 additions & 0 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package weightedroundrobin

import (
"testing"
"time"

"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils/stats"
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestWRR_Metrics_SubConnWeight tests different scenarios for the weight call
// on a weighted SubConn, and expects certain metrics for each of these
// scenarios.
func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
tests := []struct {
name string
weightExpirationPeriod time.Duration
blackoutPeriod time.Duration
lastUpdated time.Time
nonEmpty time.Time
nowTime time.Time
endpointWeightStaleWant float64
endpointWeightNotYetUsableWant float64
endpointWeightWant float64
}{
// The weighted SubConn's lastUpdated field hasn't been set, so this
// SubConn's weight is not yet usable. Thus, should emit that endpoint
// weight is not yet usable, and 0 for weight.
{
name: "no weight set",
weightExpirationPeriod: time.Second,
blackoutPeriod: time.Second,
nowTime: time.Now(),
endpointWeightStaleWant: 0,
endpointWeightNotYetUsableWant: 1,
endpointWeightWant: 0,
},
{
name: "weight expiration",
lastUpdated: time.Now(),
weightExpirationPeriod: 2 * time.Second,
blackoutPeriod: time.Second,
nowTime: time.Now().Add(100 * time.Second),
endpointWeightStaleWant: 1,
endpointWeightNotYetUsableWant: 0,
endpointWeightWant: 0,
},
{
name: "in blackout period",
lastUpdated: time.Now(),
weightExpirationPeriod: time.Minute,
blackoutPeriod: 10 * time.Second,
nowTime: time.Now(),
endpointWeightStaleWant: 0,
endpointWeightNotYetUsableWant: 1,
endpointWeightWant: 0,
},
{
name: "normal weight",
lastUpdated: time.Now(),
nonEmpty: time.Now(),
weightExpirationPeriod: time.Minute,
blackoutPeriod: time.Second,
nowTime: time.Now().Add(10 * time.Second),
endpointWeightStaleWant: 0,
endpointWeightNotYetUsableWant: 0,
endpointWeightWant: 3,
},
{
name: "weight expiration takes precdedence over blackout",
lastUpdated: time.Now(),
nonEmpty: time.Now(),
weightExpirationPeriod: time.Second,
blackoutPeriod: time.Minute,
nowTime: time.Now().Add(10 * time.Second),
endpointWeightStaleWant: 1,
endpointWeightNotYetUsableWant: 0,
endpointWeightWant: 0,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t)
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 3,
lastUpdated: test.lastUpdated,
nonEmptySince: test.nonEmpty,
}
wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true)

tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant)
})
}

}

// TestWRR_Metrics_Scheduler_RR_Fallback tests the round robin fallback metric
// for scheduler updates. It tests the case with one SubConn, and two SubConns
// with no weights. Both of these should emit a count metric for round robin
// fallback.
func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t)
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 0,
}

p := &picker{
cfg: &lbConfig{
BlackoutPeriod: iserviceconfig.Duration(10 * time.Second),
WeightExpirationPeriod: iserviceconfig.Duration(3 * time.Minute),
},
subConns: []*weightedSubConn{wsc},
metricsRecorder: tmr,
}
// There is only one SubConn, so no matter if the SubConn has a weight or
// not will fallback to round robin.
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
tmr.ClearMetrics()

// With two SubConns, if neither of them have weights, it will also fallback
// to round robin.
wsc2 := &weightedSubConn{
target: "target",
metricsRecorder: tmr,
weightVal: 0,
}
p.subConns = append(p.subConns, wsc2)
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
}
12 changes: 8 additions & 4 deletions balancer/weightedroundrobin/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ type scheduler interface {
// len(scWeights)-1 are zero or there is only a single subconn, otherwise it
// will return an Earliest Deadline First (EDF) scheduler implementation that
// selects the subchannels according to their weights.
func (p *picker) newScheduler() scheduler {
scWeights := p.scWeights(true)
func (p *picker) newScheduler(recordMetrics bool) scheduler {
scWeights := p.scWeights(recordMetrics)
n := len(scWeights)
if n == 0 {
return nil
}
if n == 1 {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
if recordMetrics {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
}
return &rrScheduler{numSCs: 1, inc: p.inc}
}
sum := float64(0)
Expand All @@ -55,7 +57,9 @@ func (p *picker) newScheduler() scheduler {
}

if numZero >= n-1 {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
if recordMetrics {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
}
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
}
unscaledMean := sum / float64(n-numZero)
Expand Down
4 changes: 2 additions & 2 deletions internal/stats/metrics_recorder_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func (s) TestMetricsRecorderList(t *testing.T) {

// Create two stats.Handlers which also implement MetricsRecorder, configure
// one as a global dial option and one as a local dial option.
mr1 := stats.NewTestMetricsRecorder(t, []string{})
mr2 := stats.NewTestMetricsRecorder(t, []string{})
mr1 := stats.NewTestMetricsRecorder(t)
mr2 := stats.NewTestMetricsRecorder(t)

defer internal.ClearGlobalDialOptions()
internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(mr1))
Expand Down
Loading

0 comments on commit 54b48f7

Please sign in to comment.