Skip to content

Commit

Permalink
rls: delegate pick to child policy as long as it is not in TransientF…
Browse files Browse the repository at this point in the history
…ailure
  • Loading branch information
easwars committed Sep 14, 2022
1 parent 7da8a05 commit 3907d39
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 29 deletions.
36 changes: 8 additions & 28 deletions balancer/rls/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,17 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {

// delegateToChildPolicies is a helper function which iterates through the list
// of child policy wrappers in a cache entry and attempts to find a child policy
// to which this RPC can be routed to. If there is no child policy in READY
// state, we delegate to the first child policy arbitrarily.
// to which this RPC can be routed to. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the first child policy arbitrarily.
//
// Caller must hold at least a read-lock on p.lb.cacheMu.
func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
for _, cpw := range dcEntry.childPolicyWrappers {
ok, res, err := p.pickIfFeasible(cpw, info)
if ok {
return res, err
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
if state.ConnectivityState == connectivity.TransientFailure {
continue
}
return state.Picker.Pick(info)
}
if len(dcEntry.childPolicyWrappers) != 0 {
state := (*balancer.State)(atomic.LoadPointer(&dcEntry.childPolicyWrappers[0].state))
Expand Down Expand Up @@ -249,8 +250,8 @@ func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
if p.defaultPolicy != nil {
_, res, err := p.pickIfFeasible(p.defaultPolicy, info)
return res, err
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
return state.Picker.Pick(info)
}
return balancer.PickResult{}, errOnNoDefault
}
Expand All @@ -275,27 +276,6 @@ func (p *rlsPicker) sendRouteLookupRequest(cacheKey cacheKey, bs *backoffState,
return throttled
}

// pickIfFeasible determines if a pick can be delegated to child policy based on
// its connectivity state.
// - If state is CONNECTING, the pick is to be queued
// - If state is IDLE, the child policy is instructed to exit idle, and the pick
// is to be queued
// - If state is READY, pick it delegated to the child policy's picker
func (p *rlsPicker) pickIfFeasible(cpw *childPolicyWrapper, info balancer.PickInfo) (bool, balancer.PickResult, error) {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
switch state.ConnectivityState {
case connectivity.Connecting:
return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
case connectivity.Idle:
p.bg.ExitIdleOne(cpw.target)
return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
case connectivity.Ready:
r, e := state.Picker.Pick(info)
return true, r, e
}
return false, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// handleRouteLookupResponse is the callback invoked by the control channel upon
// receipt of an RLS response. Modifies the data cache and pending requests map
// and sends a new picker.
Expand Down
11 changes: 10 additions & 1 deletion internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/testutils"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
Expand Down Expand Up @@ -333,6 +334,14 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel)
}
}

// A simple hack to ensure that our e2e tests run with both ring_hash and
// weighted_target LB policies. This will not ensure an equal split between
// the two LB polciies, but will most definitely end up using both of them.
lbPolicy := v3clusterpb.Cluster_ROUND_ROBIN
if grpcrand.Intn(100) < 50 {
lbPolicy = v3clusterpb.Cluster_RING_HASH
}

cluster := &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
Expand All @@ -344,7 +353,7 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel)
},
ServiceName: edsServiceName,
},
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
LbPolicy: lbPolicy,
}
if tlsContext != nil {
cluster.TransportSocket = &v3corepb.TransportSocket{
Expand Down

0 comments on commit 3907d39

Please sign in to comment.