Skip to content

Commit

Permalink
Merge pull request #10911 from gyuho/balancer
Browse files Browse the repository at this point in the history
clientv3: fix secure endpoint failover, refactor with gRPC 1.22 upgrade
  • Loading branch information
gyuho authored Jul 26, 2019
2 parents 50babc1 + a7b8034 commit 89d4002
Show file tree
Hide file tree
Showing 263 changed files with 38,075 additions and 8,146 deletions.
3 changes: 2 additions & 1 deletion .words
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ MiB
ResourceExhausted
RPC
RPCs

parsedTarget
SRV
WithRequireLeader
InfoLevel
args
Expand Down
118 changes: 70 additions & 48 deletions clientv3/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package balancer implements client balancer.
package balancer

import (
"fmt"
"strconv"
"sync"
"time"

"go.etcd.io/etcd/clientv3/balancer/connectivity"
"go.etcd.io/etcd/clientv3/balancer/picker"

"go.uber.org/zap"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
grpcconnectivity "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)

// Config defines balancer configurations.
type Config struct {
// Policy configures balancer policy.
Policy picker.Policy

// Picker implements gRPC picker.
// Leave empty if "Policy" field is not custom.
// TODO: currently custom policy is not supported.
// Picker picker.Picker

// Name defines an additional name for balancer.
// Useful for balancer testing to avoid register conflicts.
// If empty, defaults to policy name.
Name string

// Logger configures balancer logging.
// If nil, logs are discarded.
Logger *zap.Logger
}

// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
Expand Down Expand Up @@ -59,16 +80,13 @@ func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balan

addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]connectivity.State),
scToSt: make(map[balancer.SubConn]grpcconnectivity.State),

currentConn: nil,
csEvltr: &connectivityStateEvaluator{},
currentConn: nil,
connectivityRecorder: connectivity.New(b.cfg.Logger),

// initialize picker always returns "ErrNoSubConnAvailable"
Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
if bb.lg == nil {
bb.lg = zap.NewNop()
picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}

// TODO: support multiple connections
Expand Down Expand Up @@ -112,13 +130,12 @@ type baseBalancer struct {

addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
scToSt map[balancer.SubConn]connectivity.State
scToSt map[balancer.SubConn]grpcconnectivity.State

currentConn balancer.ClientConn
currentState connectivity.State
csEvltr *connectivityStateEvaluator
currentConn balancer.ClientConn
connectivityRecorder connectivity.Recorder

picker.Picker
picker picker.Picker
}

// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
Expand All @@ -128,7 +145,11 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Strings("addresses", addrsToStrings(addrs)),
)

bb.mu.Lock()
defer bb.mu.Unlock()
Expand All @@ -139,12 +160,13 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
bb.lg.Info("created subconn", zap.String("address", addr.Addr))
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
bb.scToSt[sc] = connectivity.Idle
bb.scToSt[sc] = grpcconnectivity.Idle
sc.Connect()
}
}
Expand All @@ -157,6 +179,7 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)

bb.lg.Info(
"removed subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
Expand All @@ -171,95 +194,94 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
}

// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()

old, ok := bb.scToSt[sc]
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("state", s.String()),
)
return
}

bb.lg.Info(
"state changed",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Bool("connected", s == connectivity.Ready),
zap.Bool("connected", s == grpcconnectivity.Ready),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
)

bb.scToSt[sc] = s
switch s {
case connectivity.Idle:
case grpcconnectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
case grpcconnectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}

oldAggrState := bb.currentState
bb.currentState = bb.csEvltr.recordTransition(old, s)
oldAggrState := bb.connectivityRecorder.GetCurrentState()
bb.connectivityRecorder.RecordTransition(old, s)

// Regenerate picker when one of the following happens:
// Update balancer picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (old == connectivity.Ready) ||
(bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
bb.regeneratePicker()
if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
(bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
bb.updatePicker()
}

bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
}

func (bb *baseBalancer) regeneratePicker() {
if bb.currentState == connectivity.TransientFailure {
func (bb *baseBalancer) updatePicker() {
if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
bb.picker = picker.NewErr(balancer.ErrTransientFailure)
bb.lg.Info(
"generated transient error picker",
"updated picker to transient error picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
return
}

// only pass ready subconns to picker
scs := make([]balancer.SubConn, 0)
addrToSc := make(map[resolver.Address]balancer.SubConn)
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
scs = append(scs, sc)
addrToSc[addr] = sc
if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
scToAddr[sc] = addr
}
}

switch bb.policy {
case picker.RoundrobinBalanced:
bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)

default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
}

bb.picker = picker.New(picker.Config{
Policy: bb.policy,
Logger: bb.lg,
SubConnToResolverAddress: scToAddr,
})
bb.lg.Info(
"generated picker",
"updated picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.Strings("subconn-ready", scsToStrings(addrToSc)),
zap.Int("subconn-size", len(addrToSc)),
zap.Strings("subconn-ready", scsToStrings(scToAddr)),
zap.Int("subconn-size", len(scToAddr)),
)
}

Expand Down
36 changes: 0 additions & 36 deletions clientv3/balancer/config.go

This file was deleted.

58 changes: 0 additions & 58 deletions clientv3/balancer/connectivity.go

This file was deleted.

Loading

0 comments on commit 89d4002

Please sign in to comment.