multiple/test: use stub balancer instead of defining wrapped balancers
easwars authored Aug 9, 2023
1 parent 92b481a commit 67a8e73
Showing 7 changed files with 253 additions and 424 deletions.
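
For context, the stub balancer approach this commit adopts lets a test register a throwaway LB policy under a name and supply its behavior as a small set of callbacks, instead of declaring a dedicated wrapped-balancer type with its own builder. The following is a minimal sketch of that pattern, inferred from how the tests in this diff use the internal/balancer/stub package; the package name, policy name, and helper function are illustrative, and since the stub package is internal it can only be imported from within the google.golang.org/grpc module.

package stubexample

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/balancer/stub"
)

// registerWrappedPickFirst registers a stub LB policy under the given name
// which simply delegates to pickfirst. The delegate is built in Init and
// stashed in bd.Data so that the other callbacks can retrieve it later.
func registerWrappedPickFirst(name string) {
	stub.Register(name, stub.BalancerFuncs{
		Init: func(bd *stub.BalancerData) {
			bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions)
		},
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			// Forward config updates to the delegate unchanged.
			return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
		},
		Close: func(bd *stub.BalancerData) {
			bd.Data.(balancer.Balancer).Close()
		},
	})
}

A test can then select the registered policy by name through the service config (for example, by naming it in loadBalancingConfig), which is how the RLS test below wires up its top-level and child policies.
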
191 changes: 71 additions & 120 deletions balancer/rls/balancer_test.go
@@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
@@ -838,118 +839,31 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, true)
}

const wrappingTopLevelBalancerName = "wrapping-top-level-balancer"
const multipleUpdateStateChildBalancerName = "multiple-update-state-child-balancer"

type wrappingTopLevelBalancerBuilder struct {
balCh chan balancer.Balancer
}

func (w *wrappingTopLevelBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
tlb := &wrappingTopLevelBalancer{ClientConn: cc}
tlb.Balancer = balancer.Get(Name).Build(tlb, balancer.BuildOptions{})
w.balCh <- tlb
return tlb
}

func (w *wrappingTopLevelBalancerBuilder) Name() string {
return wrappingTopLevelBalancerName
}

func (w *wrappingTopLevelBalancerBuilder) ParseConfig(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
parser := balancer.Get(Name).(balancer.ConfigParser)
return parser.ParseConfig(sc)
}

// wrappingTopLevelBalancer acts as the top-level LB policy on the channel and
// wraps an RLS LB policy. It forwards all balancer API calls unmodified to the
// underlying RLS LB policy. It overrides the UpdateState method on the
// balancer.ClientConn passed to the RLS LB policy and stores all state updates
// pushed by the latter.
type wrappingTopLevelBalancer struct {
// testCCWrapper wraps a balancer.ClientConn and overrides UpdateState to
// store all state updates pushed by the RLS LB policy.
type testCCWrapper struct {
balancer.ClientConn
balancer.Balancer

mu sync.Mutex
states []balancer.State
}

func (w *wrappingTopLevelBalancer) UpdateState(bs balancer.State) {
w.mu.Lock()
w.states = append(w.states, bs)
w.mu.Unlock()
w.ClientConn.UpdateState(bs)
func (t *testCCWrapper) UpdateState(bs balancer.State) {
t.mu.Lock()
t.states = append(t.states, bs)
t.mu.Unlock()
t.ClientConn.UpdateState(bs)
}

func (w *wrappingTopLevelBalancer) getStates() []balancer.State {
w.mu.Lock()
defer w.mu.Unlock()
func (t *testCCWrapper) getStates() []balancer.State {
t.mu.Lock()
defer t.mu.Unlock()

states := make([]balancer.State, len(w.states))
copy(states, w.states)
states := make([]balancer.State, len(t.states))
copy(states, t.states)
return states
}

// wrappedPickFirstBalancerBuilder builds a balancer which wraps a pickfirst
// balancer. The wrapping balancer receives addresses to be passed to the
// underlying pickfirst balancer as part of its configuration.
type wrappedPickFirstBalancerBuilder struct{}

func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
builder := balancer.Get(grpc.PickFirstBalancerName)
wpfb := &wrappedPickFirstBalancer{
ClientConn: cc,
}
pf := builder.Build(wpfb, opts)
wpfb.Balancer = pf
return wpfb
}

func (wrappedPickFirstBalancerBuilder) Name() string {
return multipleUpdateStateChildBalancerName
}

type WrappedPickFirstBalancerConfig struct {
serviceconfig.LoadBalancingConfig
Backend string // The target for which this child policy was created.
}

func (wbb *wrappedPickFirstBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
cfg := &WrappedPickFirstBalancerConfig{}
if err := json.Unmarshal(c, cfg); err != nil {
return nil, err
}
return cfg, nil
}

// wrappedPickFirstBalancer wraps a pickfirst balancer and makes multiple calls
// to UpdateState when handling a config update in UpdateClientConnState. When
// this policy is used as a child policy of the RLS LB policy, it is expected
// that the latter suppresses these updates and pushes a single picker update on the
// channel (after the config has been processed by all child policies).
type wrappedPickFirstBalancer struct {
balancer.Balancer
balancer.ClientConn
}

func (wb *wrappedPickFirstBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
wb.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &testutils.TestConstPicker{Err: balancer.ErrNoSubConnAvailable}})
wb.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: &testutils.TestConstPicker{Err: balancer.ErrNoSubConnAvailable}})

cfg := ccs.BalancerConfig.(*WrappedPickFirstBalancerConfig)
return wb.Balancer.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{{Addr: cfg.Backend}}},
})
}

func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
// Eat it if IDLE - allows it to switch over only on a READY SubConn.
if state.ConnectivityState == connectivity.Idle {
return
}
wb.ClientConn.UpdateState(state)
}

// TestUpdateStatePauses tests the scenario where a config update received by
// the RLS LB policy results in multiple UpdateState calls from the child
// policies. This test verifies that picker updates are paused when the config
@@ -972,8 +886,59 @@ func (s) TestUpdateStatePauses(t *testing.T) {
defer func() { clientConnUpdateHook = origClientConnUpdateHook }()

// Register the top-level wrapping balancer which forwards calls to RLS.
bb := &wrappingTopLevelBalancerBuilder{balCh: make(chan balancer.Balancer, 1)}
balancer.Register(bb)
topLevelBalancerName := t.Name() + "top-level"
var ccWrapper *testCCWrapper
stub.Register(topLevelBalancerName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
ccWrapper = &testCCWrapper{ClientConn: bd.ClientConn}
bd.Data = balancer.Get(Name).Build(ccWrapper, bd.BuildOptions)
},
ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
parser := balancer.Get(Name).(balancer.ConfigParser)
return parser.ParseConfig(sc)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
return bal.UpdateClientConnState(ccs)
},
Close: func(bd *stub.BalancerData) {
bal := bd.Data.(balancer.Balancer)
bal.Close()
},
})

// Register a child policy that wraps a pickfirst balancer and makes multiple calls
// to UpdateState when handling a config update in UpdateClientConnState. When
// this policy is used as a child policy of the RLS LB policy, it is expected
// that the latter suppresses these updates and pushes a single picker update on the
// channel (after the config has been processed by all child policies).
childPolicyName := t.Name() + "child"
type childPolicyConfig struct {
serviceconfig.LoadBalancingConfig
Backend string // `json:"backend,omitempty"`
}
stub.Register(childPolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions)
},
ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
cfg := &childPolicyConfig{}
if err := json.Unmarshal(sc, cfg); err != nil {
return nil, err
}
return cfg, nil
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &testutils.TestConstPicker{Err: balancer.ErrNoSubConnAvailable}})
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: &testutils.TestConstPicker{Err: balancer.ErrNoSubConnAvailable}})

cfg := ccs.BalancerConfig.(*childPolicyConfig)
return bal.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{{Addr: cfg.Backend}}},
})
},
})

// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
@@ -985,10 +950,6 @@ func (s) TestUpdateStatePauses(t *testing.T) {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
})

// Register a child policy which wraps a pickfirst balancer and receives the
// backend address as part of its configuration.
balancer.Register(&wrappedPickFirstBalancerBuilder{})

// Register a manual resolver and push the RLS service config through it.
r := manual.NewBuilderWithScheme("rls-e2e")
scJSON := fmt.Sprintf(`
@@ -1008,7 +969,7 @@ func (s) TestUpdateStatePauses(t *testing.T) {
}
}
]
}`, wrappingTopLevelBalancerName, rlsServer.Address, multipleUpdateStateChildBalancerName)
}`, topLevelBalancerName, rlsServer.Address, childPolicyName)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
r.InitialState(resolver.State{ServiceConfig: sc})

@@ -1026,16 +987,6 @@ func (s) TestUpdateStatePauses(t *testing.T) {
case <-clientConnUpdateDone:
}

// Get the top-level LB policy configured on the channel, to be able to read
// the state updates pushed by its child (the RLS LB policy.)
var wb *wrappingTopLevelBalancer
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for state update on the top-level LB policy")
case b := <-bb.balCh:
wb = b.(*wrappingTopLevelBalancer)
}

// It is important to note that at this point no child policies have been
// created because we have not attempted any RPC so far. When we attempt an
// RPC (below), child policies will be created and their configs will be
@@ -1086,7 +1037,7 @@ func (s) TestUpdateStatePauses(t *testing.T) {
}

// Cache the state changes seen up to this point.
states0 := wb.getStates()
states0 := ccWrapper.getStates()

// Push an updated service config. As mentioned earlier, the previous config
// updates on the child policies did not happen in the context of a config
@@ -1113,7 +1064,7 @@ func (s) TestUpdateStatePauses(t *testing.T) {
}
}
]
}`, wrappingTopLevelBalancerName, rlsServer.Address, multipleUpdateStateChildBalancerName)
}`, topLevelBalancerName, rlsServer.Address, childPolicyName)
sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
r.UpdateState(resolver.State{ServiceConfig: sc})

@@ -1127,7 +1078,7 @@ func (s) TestUpdateStatePauses(t *testing.T) {
// UpdateState as part of handling their configs, we expect the RLS policy
// to inhibit picker updates during this time frame, and send a single
// picker once the config update is completely handled.
states1 := wb.getStates()
states1 := ccWrapper.getStates()
if len(states1) != len(states0)+1 {
t.Fatalf("more than one state update seen. before %v, after %v", states0, states1)
}
48 changes: 13 additions & 35 deletions internal/balancergroup/balancergroup_test.go
@@ -583,7 +583,19 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
// change it to a balancer that has separate behavior logically (creating
// SubConn for second address in address list and always picking that
// SubConn), and see if the downstream behavior reflects that change.
bg.UpdateBuilder(testBalancerIDs[0], wrappedPickFirstBalancerBuilder{})
childPolicyName := t.Name()
stub.Register(childPolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
ccs.ResolverState.Addresses = ccs.ResolverState.Addresses[1:]
bal := bd.Data.(balancer.Balancer)
return bal.UpdateClientConnState(ccs)
},
})
builder := balancer.Get(childPolicyName)
bg.UpdateBuilder(testBalancerIDs[0], builder)
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
t.Fatalf("error updating ClientConn state: %v", err)
}
@@ -643,37 +655,3 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
}
}
}

type wrappedPickFirstBalancerBuilder struct{}

func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
builder := balancer.Get(grpc.PickFirstBalancerName)
wpfb := &wrappedPickFirstBalancer{
ClientConn: cc,
}
pf := builder.Build(wpfb, opts)
wpfb.Balancer = pf
return wpfb
}

func (wrappedPickFirstBalancerBuilder) Name() string {
return "wrappedPickFirstBalancer"
}

type wrappedPickFirstBalancer struct {
balancer.Balancer
balancer.ClientConn
}

func (wb *wrappedPickFirstBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
s.ResolverState.Addresses = s.ResolverState.Addresses[1:]
return wb.Balancer.UpdateClientConnState(s)
}

func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
// Eat it if IDLE - allows it to switch over only on a READY SubConn.
if state.ConnectivityState == connectivity.Idle {
return
}
wb.ClientConn.UpdateState(state)
}