Skip to content

Commit

Permalink
introduce grpc health check in etcd client poc with fail over on defrag test
Browse files Browse the repository at this point in the history

Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Jul 20, 2023
1 parent cd453b9 commit 86b1ea4
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 12 deletions.
13 changes: 11 additions & 2 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/codes"
grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/health"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -256,7 +257,11 @@ func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {

// Using ad-hoc created resolver, to guarantee only explicitly given
// endpoint is used.
return c.dial(creds, grpc.WithResolvers(resolver.New(ep)))
serviceConfigJSON := `{"loadBalancingPolicy": "round_robin"}`
if c.cfg.EnableGRPCHealthCheck {
serviceConfigJSON = `{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`
}
return c.dial(creds, grpc.WithResolvers(resolver.New(serviceConfigJSON, ep)))
}

func (c *Client) getToken(ctx context.Context) error {
Expand Down Expand Up @@ -408,7 +413,11 @@ func newClient(cfg *Config) (*Client, error) {
client.callOpts = callOpts
}

client.resolver = resolver.New(cfg.Endpoints...)
serviceConfigJSON := `{"loadBalancingPolicy": "round_robin"}`
if cfg.EnableGRPCHealthCheck {
serviceConfigJSON = `{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`
}
client.resolver = resolver.New(serviceConfigJSON, cfg.Endpoints...)

if len(cfg.Endpoints) < 1 {
client.cancel()
Expand Down
2 changes: 2 additions & 0 deletions client/v3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type Config struct {
// RejectOldCluster when set will refuse to create a client against an outdated cluster.
RejectOldCluster bool `json:"reject-old-cluster"`

EnableGRPCHealthCheck bool `json:"enable-grpc-health-check"`

// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
// For example, pass "grpc.WithBlock()" to block until the underlying connection is up.
// Without this, Dial returns immediately and connecting the server happens in background.
Expand Down
11 changes: 6 additions & 5 deletions client/v3/internal/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ const (
// using SetEndpoints.
type EtcdManualResolver struct {
	*manual.Resolver
	// endpoints is the current target list; it is pushed to gRPC via
	// SetEndpoints and re-applied whenever the resolver is (re)built.
	endpoints []string
	// serviceConfigJSON is the raw gRPC service config (load-balancing
	// policy and optional health-check config) parsed in Build.
	serviceConfigJSON string
	// serviceConfig caches the parsed result of serviceConfigJSON;
	// nil until Build runs.
	serviceConfig *serviceconfig.ParseResult
}

func New(endpoints ...string) *EtcdManualResolver {
func New(serviceConfigJSON string, endpoints ...string) *EtcdManualResolver {
r := manual.NewBuilderWithScheme(Schema)
return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil}
return &EtcdManualResolver{Resolver: r, serviceConfigJSON: serviceConfigJSON, endpoints: endpoints, serviceConfig: nil}
}

// Build returns itself for Resolver, because it's both a builder and a resolver.
func (r *EtcdManualResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
r.serviceConfig = cc.ParseServiceConfig(r.serviceConfigJSON)
if r.serviceConfig.Err != nil {
return nil, r.serviceConfig.Err
}
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3client/v3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}

mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, nil))
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)

clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
Expand Down
3 changes: 2 additions & 1 deletion server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))

// server should register all the services manually
// use empty service name for all etcd services' health status,
Expand All @@ -85,6 +84,8 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(grpcServer, hsrv)

pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, hsrv))

// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)

Expand Down
14 changes: 11 additions & 3 deletions server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"time"

"github.com/dustin/go-humanize"
"go.etcd.io/raft/v3"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
Expand All @@ -32,7 +35,6 @@ import (
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/raft/v3"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -75,10 +77,11 @@ type maintenanceServer struct {
cs ClusterStatusGetter
d Downgrader
vs serverversion.Server
hs *health.Server
}

func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s)}
func NewMaintenanceServer(s *etcdserver.EtcdServer, hs *health.Server) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, vs: etcdserver.NewServerVersionAdapter(s), hs: hs}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
Expand All @@ -87,6 +90,11 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
if ms.hs != nil {
ms.hs.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
defer ms.hs.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
}

err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
Expand Down
76 changes: 76 additions & 0 deletions tests/integration/v3_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"bytes"
"context"
"crypto/tls"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
Expand All @@ -29,6 +31,80 @@ import (
clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3"
)

// TestFailoverOnDefrag verifies that a client with gRPC health checking
// enabled fails over away from a member that is defragmenting: requests
// issued while defrag is in flight on member 0 should succeed against the
// other members.
func TestFailoverOnDefrag(t *testing.T) {
	integration2.BeforeTest(t, integration2.WithFailpoint("defragBeforeCopy", `sleep(10000)`))
	clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
	defer clus.Terminate(t)
	endpoints := clus.Endpoints()

	cnt, success := 0, 0
	donec := make(chan struct{})
	errc := make(chan error, 1)

	go func() {
		var lastErr error
		var cc *clientv3.Client
		defer func() {
			if cc != nil {
				cc.Close()
			}
			errc <- lastErr
			close(donec)
			close(errc)
		}()
		cc, cerr := clientv3.New(clientv3.Config{
			DialTimeout:           200 * time.Millisecond,
			DialKeepAliveTime:     1 * time.Second,
			DialKeepAliveTimeout:  200 * time.Millisecond,
			Endpoints:             endpoints,
			EnableGRPCHealthCheck: true,
		})
		if cerr != nil {
			// t.FailNow (which require.NoError calls) must not be invoked
			// from a non-test goroutine; report the failure via errc instead.
			lastErr = cerr
			return
		}
		timeout := time.After(10 * time.Second)
		for {
			select {
			case <-timeout:
				return
			default:
			}

			// Only simulate traffic while defrag is active on member 0;
			// back off briefly between polls instead of busy-spinning.
			m, err := clus.Members[0].Metric("etcd_disk_defrag_inflight")
			if err != nil {
				time.Sleep(10 * time.Millisecond)
				continue
			}
			defragActive, err := strconv.Atoi(m)
			if err != nil {
				// A malformed metric is a test-infrastructure failure;
				// surface it through errc rather than panicking in a goroutine.
				lastErr = err
				return
			}
			if defragActive == 0 {
				time.Sleep(10 * time.Millisecond)
				continue
			}

			cctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
			_, err = cc.Get(cctx, "health")
			cancel()
			cnt++
			if err != nil {
				lastErr = err
				continue
			}
			success++
		}
	}()
	// Wait for the gRPC connection to be established before triggering defrag.
	time.Sleep(500 * time.Millisecond)
	_, err := clus.Client(0).Defragment(context.Background(), endpoints[0])
	require.NoError(t, err)

	<-donec
	if err, ok := <-errc; ok && err != nil {
		t.Errorf("etcd client failed to fail over, error (%v)", err)
	}
	// Guard against division by zero (and a vacuous pass) when no request
	// was issued while defrag was in flight.
	if cnt == 0 {
		t.Fatal("no request was sent while defragmentation was in flight")
	}
	t.Logf("request failure rate is %.2f%%, traffic volume success %d requests, total %d requests", (1-float64(success)/float64(cnt))*100, success, cnt)
}

func TestFailover(t *testing.T) {
cases := []struct {
name string
Expand Down

0 comments on commit 86b1ea4

Please sign in to comment.