Skip to content

Commit

Permalink
Add custom lb example
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Oct 16, 2023
1 parent e14d583 commit 9521bb6
Show file tree
Hide file tree
Showing 6 changed files with 494 additions and 12 deletions.
57 changes: 57 additions & 0 deletions examples/features/customloadbalancer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Custom Load Balancer

This example shows how to deploy a custom load balancer in a `ClientConn`.

## Try it

```
go run server/main.go
```

```
go run client/main.go
```

## Explanation

Two echo servers are serving on "localhost:20000" and "localhost:20001". They will include their
serving address in the response. So the server on "localhost:20001" will reply to the RPC
with `this is examples/customloadbalancing (from localhost:20001)`.

A client is created to connect to both of these servers (it gets both
server addresses from the name resolver, as two separate endpoints). The client is configured with the load
balancer specified in the service config, which in this case is `custom_round_robin`.

### custom_round_robin

The client is configured to use `custom_round_robin`. `custom_round_robin` is a petiole policy,
which creates a pick first child for every endpoint it receives. It waits until both pick first children
become ready, then defers to the first pick first child's picker, choosing the connection to localhost:20000, except
on every nth pick, where it defers to the second pick first child's picker, choosing the connection to localhost:20001.

`custom_round_robin` is written as a petiole policy wrapping `pick_first` load balancers, one for every endpoint received.
This is the intended way a user-written custom LB policy should be specified, as pick first will contain a lot of useful
functionality, such as Sticky Transient Failure, Happy Eyeballs, and Health Checking.

```
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20001)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20001)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20001)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20001)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20001)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20001)
this is examples/customloadbalancing (from localhost:20000)
this is examples/customloadbalancing (from localhost:20000)
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package customroundrobin

import (
"encoding/json"
"fmt"
"sync/atomic"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

// init registers a customRoundRobinBuilder with the balancer package when this
// package is imported.
func init() {
balancer.Register(customRoundRobinBuilder{})
}

// customRRName is the policy name returned by customRoundRobinBuilder.Name.
const customRRName = "custom_round_robin"

// customRRConfig is the load balancing configuration for custom_round_robin.
// It is populated from JSON by customRoundRobinBuilder.ParseConfig.
type customRRConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

// N represents how often pick iterations choose the second SubConn in the
// list. Defaults to 3. If 0, never chooses the second SubConn.
N uint32 `json:"n,omitempty"`
}

// customRoundRobinBuilder is the stateless builder and config parser for the
// custom_round_robin balancer. It is the value registered in init.
type customRoundRobinBuilder struct{}

// ParseConfig unmarshals the JSON load balancing configuration s into a
// customRRConfig. N is pre-populated with 3, so that value is kept when the
// JSON does not set "n".
//
// On failure it returns a nil config and an error that wraps the underlying
// encoding/json error, so callers can inspect it with errors.Is / errors.As.
func (customRoundRobinBuilder) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
	lbConfig := &customRRConfig{
		N: 3,
	}
	if err := json.Unmarshal(s, lbConfig); err != nil {
		return nil, fmt.Errorf("custom-round-robin: unable to unmarshal customRRConfig: %w", err)
	}
	return lbConfig, nil
}

// Name returns customRRName, the name of this load balancing policy.
func (customRoundRobinBuilder) Name() string {
return customRRName
}

// Build creates a customRoundRobin balancer bound to cc. It looks up the
// registered pick first builder, which is later used to create one child per
// endpoint, and starts with an empty endpoint-to-child map. It returns nil if
// no pick first builder is registered.
func (customRoundRobinBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
	childBuilder := balancer.Get(grpc.PickFirstBalancerName)
	if childBuilder == nil {
		return nil
	}
	crr := &customRoundRobin{cc: cc, bOpts: bOpts}
	crr.pickFirstBuilder = childBuilder
	crr.pfs = resolver.NewEndpointMap()
	return crr
}

// logger is the grpclog component logger used by this balancer.
var logger = grpclog.Component("example")

// customRoundRobin is the balancer built by customRoundRobinBuilder. It keeps
// one pick first child per endpoint and combines the children's pickers into a
// single customRoundRobinPicker.
type customRoundRobin struct {
// All state and operations on this balancer are either initialized at build
// time and read only after, or are only accessed as part of its
// balancer.Balancer API (UpdateState from children only comes in from
// balancer.Balancer calls as well, and children are called one at a time),
// in which calls are guaranteed to come synchronously. Thus, no extra
// synchronization is required in this balancer.
cc balancer.ClientConn
bOpts balancer.BuildOptions
// Note that this balancer is a petiole policy which wraps pick first (see
// gRFC A61). This is the intended way a user-written custom LB policy should
// be specified, as pick first will contain a lot of useful functionality,
// such as Sticky Transient Failure, Happy Eyeballs, and Health Checking.
pickFirstBuilder balancer.Builder
// pfs maps each endpoint to its pick first child (a *balancerWrapper).
pfs *resolver.EndpointMap

// n is copied from customRRConfig.N on every UpdateClientConnState.
n uint32
// inhibitPickerUpdates makes regeneratePicker a no-op while it is true.
inhibitPickerUpdates bool
}

// UpdateClientConnState applies a new resolver state and balancer config.
//
// It records N from the config, makes sure there is one pick first child for
// every endpoint in the update (building children for new endpoints), pushes
// the endpoint down to its child, and closes children whose endpoint no longer
// appears. Picker regeneration is inhibited while children are updated, and a
// single regeneration is attempted at the end.
//
// It returns balancer.ErrBadResolverState if the balancer config is not a
// *customRRConfig, and nil otherwise.
func (crr *customRoundRobin) UpdateClientConnState(state balancer.ClientConnState) error {
	if logger.V(2) {
		logger.Info("custom_round_robin: got new ClientConn state: ", state)
	}
	cfg, isCRRConfig := state.BalancerConfig.(*customRRConfig)
	if !isCRRConfig {
		return balancer.ErrBadResolverState
	}
	crr.n = cfg.N

	// Children may call back into UpdateState inline; hold off on picker
	// regeneration until every child has been processed.
	crr.inhibitPickerUpdates = true
	current := resolver.NewEndpointMap()
	for _, ep := range state.ResolverState.Endpoints {
		current.Set(ep, nil)

		var child *balancerWrapper
		if existing, found := crr.pfs.Get(ep); found {
			child = existing.(*balancerWrapper)
		} else {
			child = &balancerWrapper{ClientConn: crr.cc, crr: crr}
			child.Balancer = crr.pickFirstBuilder.Build(child, crr.bOpts)
			crr.pfs.Set(ep, child)
		}

		// Always forward the update, even to pre-existing children: attributes
		// or address ordering may have changed, and pick first is left to work
		// out the difference. No service config is passed down; petiole
		// policies never need to enable address list shuffling.
		childState := balancer.ClientConnState{
			ResolverState: resolver.State{
				Endpoints:  []resolver.Endpoint{ep},
				Attributes: state.ResolverState.Attributes,
			},
		}
		// The returned error is deliberately ignored: only ready children
		// matter to this balancer.
		child.UpdateClientConnState(childState)
	}

	// Tear down children whose endpoint was dropped by the resolver.
	for _, ep := range crr.pfs.Keys() {
		if _, kept := current.Get(ep); kept {
			continue
		}
		stale, _ := crr.pfs.Get(ep)
		stale.(balancer.Balancer).Close()
		crr.pfs.Delete(ep)
	}

	crr.inhibitPickerUpdates = false
	// One synchronous picker update per UpdateClientConnState operation.
	crr.regeneratePicker()
	return nil
}

// ResolverError forwards err to every pick first child. Picker regeneration is
// inhibited while the children are notified, and attempted once afterwards.
func (crr *customRoundRobin) ResolverError(err error) {
	crr.inhibitPickerUpdates = true
	children := crr.pfs.Values()
	for i := range children {
		children[i].(*balancerWrapper).ResolverError(err)
	}
	crr.inhibitPickerUpdates = false
	crr.regeneratePicker()
}

// UpdateSubConnState only logs an error reporting that it was called
// unexpectedly; it does not act on the SubConn state.
//
// This function is deprecated. SubConn state updates now come through listener
// callbacks. This balancer does not deal with SubConns directly and has no need
// to intercept listener callbacks.
func (crr *customRoundRobin) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
logger.Errorf("custom_round_robin: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

// Close closes every pick first child currently tracked by this balancer.
func (crr *customRoundRobin) Close() {
	for _, child := range crr.pfs.Values() {
		child.(balancer.Balancer).Close()
	}
}

// regeneratePicker builds a picker from the last state reported by each child
// and forwards it to the parent ClientConn. It is meant to run to completion
// once per relevant balancer.Balancer operation; while inhibitPickerUpdates is
// set it returns immediately.
//
// For determinism, this example only publishes a picker when exactly two
// children are READY. Because of that there is no aggregated connectivity
// state to track: the balancer simply reports READY once it has two ready
// children, and otherwise leaves the previously published state untouched.
func (crr *customRoundRobin) regeneratePicker() {
	if crr.inhibitPickerUpdates {
		return
	}

	var ready []balancer.Picker
	for _, v := range crr.pfs.Values() {
		if child := v.(*balancerWrapper); child.state.ConnectivityState == connectivity.Ready {
			ready = append(ready, child.state.Picker)
		}
	}
	if len(ready) != 2 {
		return
	}

	// The picker's counter starts at its zero value.
	crr.cc.UpdateState(balancer.State{
		ConnectivityState: connectivity.Ready,
		Picker:            &customRoundRobinPicker{pickers: ready, n: crr.n},
	})
}

// balancerWrapper wraps one pick first child. It is passed to the child as its
// balancer.ClientConn so that the child's UpdateState calls are recorded here
// rather than going directly to the parent ClientConn.
type balancerWrapper struct {
balancer.Balancer // Simply forward balancer.Balancer operations
balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns

// crr is the owning balancer, notified whenever the child's state changes.
crr *customRoundRobin

// state is the most recent state passed to UpdateState by the child.
state balancer.State
}

// UpdateState records the child's latest state and asks the parent balancer to
// regenerate its picker.
//
// Picker updates from pick first are all triggered by synchronous calls down
// into balancer.Balancer (client conn state updates, resolver errors, subconn
// state updates (through listener callbacks, which is still treated as part of
// balancer API)).
func (bw *balancerWrapper) UpdateState(state balancer.State) {
bw.state = state
// Calls back into this inline will be inhibited when part of
// UpdateClientConnState() and ResolverError(), and regenerate picker will
// be called manually at the end of those operations. However, for
// UpdateSubConnState() and subsequent UpdateState(), this needs to update
// picker, so call this regeneratePicker() here.
bw.crr.regeneratePicker()
}

// customRoundRobinPicker chooses between the child pickers it was built with:
// every n-th pick goes to the second picker, all other picks to the first.
type customRoundRobinPicker struct {
// pickers holds the ready children's pickers, as collected by
// regeneratePicker.
pickers []balancer.Picker
// n is how often the second picker is chosen.
n uint32
// next counts picks; it is incremented atomically in Pick.
next uint32
}

// Pick delegates to one of the child pickers. The pick counter is incremented
// atomically; when the new count is a non-zero multiple of n the second child
// picker is used, otherwise the first.
//
// When n is 0 the second picker is never chosen, matching the documented
// meaning of customRRConfig.N. The explicit n != 0 guard is required because
// an integer modulo by zero panics at run time in Go.
func (crrp *customRoundRobinPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	next := atomic.AddUint32(&crrp.next, 1)
	index := 0
	// next != 0 skips the pick on which the uint32 counter wraps around to 0,
	// which would otherwise satisfy the modulo test.
	if crrp.n != 0 && next != 0 && next%crrp.n == 0 {
		index = 1
	}
	childPicker := crrp.pickers[index%len(crrp.pickers)]
	return childPicker.Pick(info)
}
74 changes: 74 additions & 0 deletions examples/features/customloadbalancer/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package main

import (
"context"
"fmt"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/examples/features/customloadbalancer/client/customroundrobin" // To register custom_round_robin.
"google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
)

// addr1 and addr2 are the two backend addresses handed to the manual resolver,
// each as its own endpoint.
var (
addr1 = "localhost:20000"
addr2 = "localhost:20001"
)

// main dials a channel whose manual resolver returns two endpoints and a
// service config selecting custom_round_robin, then issues 20 unary echo RPCs
// and prints each response to show how picks are distributed.
func main() {
	mr := manual.NewBuilderWithScheme("example")
	defer mr.Close()

	// You can also plug in your own custom lb policy, which needs to be
	// configurable. This n is configurable. Try changing n and see how the
	// behavior changes.
	svcCfgJSON := `{"loadBalancingConfig": [{"custom_round_robin":{"n": 3}}]}`
	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(svcCfgJSON)
	// Surface a bad service config here instead of handing a failed parse
	// result to the resolver.
	if sc.Err != nil {
		log.Fatalf("Failed to parse service config: %v", sc.Err)
	}
	mr.InitialState(resolver.State{
		Endpoints: []resolver.Endpoint{
			{Addresses: []resolver.Address{{Addr: addr1}}},
			{Addresses: []resolver.Address{{Addr: addr2}}},
		},
		ServiceConfig: sc,
	})

	cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to dial: %v", err)
	}
	// Release the channel's connections and goroutines on normal exit.
	defer cc.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	ec := echo.NewEchoClient(cc)
	// Make 20 rpcs to show distribution.
	for i := 0; i < 20; i++ {
		r, err := ec.UnaryEcho(ctx, &echo.EchoRequest{Message: "this is examples/customloadbalancing"})
		if err != nil {
			log.Fatalf("UnaryEcho failed: %v", err)
		}
		fmt.Println(r)
	}
}
Loading

0 comments on commit 9521bb6

Please sign in to comment.