Skip to content

Commit

Permalink
client: supports to add gRPC dial options (#2035) (#2043)
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored and sre-bot committed Dec 23, 2019
1 parent 4fea0f0 commit ea2b748
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
36 changes: 30 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package pd

import (
"context"
"reflect"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -129,7 +131,8 @@ type client struct {
ctx context.Context
cancel context.CancelFunc

security SecurityOption
security SecurityOption
gRPCDialOptions []grpc.DialOption
}

// SecurityOption records options about tls
Expand All @@ -139,13 +142,23 @@ type SecurityOption struct {
KeyPath string
}

// ClientOption configures client.
type ClientOption func(c *client)

// WithGRPCDialOptions configures the client with gRPC dial options.
func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption {
return func(c *client) {
c.gRPCDialOptions = append(c.gRPCDialOptions, opts...)
}
}

// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security)
func NewClient(pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts...)
}

// NewClientWithContext creates a PD client with context.
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption) (Client, error) {
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", pdAddrs))
ctx1, cancel := context.WithCancel(ctx)
c := &client{
Expand All @@ -158,6 +171,9 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi
security: security,
}
c.connMu.clientConns = make(map[string]*grpc.ClientConn)
for _, opt := range opts {
opt(c)
}

if err := c.initRetry(c.initClusterID); err != nil {
cancel()
Expand All @@ -182,6 +198,14 @@ func (c *client) updateURLs(members []*pdpb.Member) {
for _, m := range members {
urls = append(urls, m.GetClientUrls()...)
}

sort.Strings(urls)
// the url list is same.
if reflect.DeepEqual(c.urls, urls) {
return
}

log.Info("[pd] update member urls", zap.Strings("old-urls", c.urls), zap.Strings("new-urls", urls))
c.urls = urls
}

Expand Down Expand Up @@ -222,7 +246,7 @@ func (c *client) updateLeader() error {
ctx, cancel := context.WithTimeout(c.ctx, updateLeaderTimeout)
members, err := c.getMembers(ctx, u)
if err != nil {
log.Warn("cannot update leader", zap.String("address", u), zap.Error(err))
log.Warn("[pd] cannot update leader", zap.String("address", u), zap.Error(err))
}
cancel()
if err != nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
Expand Down Expand Up @@ -283,7 +307,7 @@ func (c *client) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
return conn, nil
}

cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath)
cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath, c.gRPCDialOptions...)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
36 changes: 36 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,28 @@ func (s *testClientSuite) TestScatterRegion(c *C) {
c.Succeed()
}

func (s *testClientSuite) TestUpdateURLs(c *C) {
members := []*pdpb.Member{
{Name: "pd4", ClientUrls: []string{"tmp//pd4"}},
{Name: "pd1", ClientUrls: []string{"tmp//pd1"}},
{Name: "pd3", ClientUrls: []string{"tmp//pd3"}},
{Name: "pd2", ClientUrls: []string{"tmp//pd2"}},
}
getURLs := func(ms []*pdpb.Member) (urls []string) {
for _, m := range ms {
urls = append(urls, m.GetClientUrls()[0])
}
return
}
cli := &client{}
cli.updateURLs(members[1:])
c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]}))
cli.updateURLs(members[1:])
c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]}))
cli.updateURLs(members)
c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}))
}

var _ = Suite(&testClientCtxSuite{})

type testClientCtxSuite struct{}
Expand All @@ -481,3 +503,17 @@ func (s *testClientCtxSuite) TestClientCtx(c *C) {
c.Assert(err, NotNil)
c.Assert(time.Since(start), Less, time.Second*4)
}

var _ = Suite(&testClientDialOptionSuite{})

type testClientDialOptionSuite struct{}

func (s *testClientDialOptionSuite) TestGRPCDialOption(c *C) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
defer cancel()
// nolint
_, err := NewClientWithContext(ctx, []string{"localhost:8080"}, SecurityOption{}, WithGRPCDialOptions(grpc.WithBlock(), grpc.WithTimeout(time.Second)))
c.Assert(err, NotNil)
c.Assert(time.Since(start), Greater, 800*time.Millisecond)
}

0 comments on commit ea2b748

Please sign in to comment.