From 360357e3f1be959e3ae2653e81ce607d79fa35ac Mon Sep 17 00:00:00 2001 From: nolouch Date: Wed, 18 Dec 2019 15:39:01 +0800 Subject: [PATCH 1/5] client: supports to add gRPC dial options Signed-off-by: nolouch --- client/client.go | 40 ++++++++++++++++++++++++++++++++++------ client/client_test.go | 14 ++++++++++++++ 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/client/client.go b/client/client.go index 72e1c487df6..9047401a2f3 100644 --- a/client/client.go +++ b/client/client.go @@ -135,7 +135,8 @@ type client struct { ctx context.Context cancel context.CancelFunc - security SecurityOption + security SecurityOption + gRPCDialOptions []grpc.DialOption } // SecurityOption records options about tls @@ -145,13 +146,23 @@ type SecurityOption struct { KeyPath string } +// ClientOption configures client. +type ClientOption func(c *client) + +// WithGRPCDialOptions configures the client with gRPC dial options. +func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { + return func(c *client) { + c.gRPCDialOptions = append(c.gRPCDialOptions, opts...) + } +} + // NewClient creates a PD client. -func NewClient(pdAddrs []string, security SecurityOption) (Client, error) { - return NewClientWithContext(context.Background(), pdAddrs, security) +func NewClient(pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { + return NewClientWithContext(context.Background(), pdAddrs, security, opts...) } // NewClientWithContext creates a PD client with context. -func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption) (Client, error) { +func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", pdAddrs)) ctx1, cancel := context.WithCancel(ctx) c := &client{ @@ -164,6 +175,9 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi security: security, } c.connMu.clientConns = make(map[string]*grpc.ClientConn) + for _, opt := range opts { + opt(c) + } if err := c.initRetry(c.initClusterID); err != nil { cancel() @@ -185,9 +199,23 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi func (c *client) updateURLs(members []*pdpb.Member) { urls := make([]string, 0, len(members)) + urlsMap := make(map[string]struct{}) for _, m := range members { urls = append(urls, m.GetClientUrls()...) } + for _, url := range urls { + urlsMap[url] = struct{}{} + } + needUpdate := false + for _, url := range c.urls { + if _, ok := urlsMap[url]; !ok { + needUpdate = true + } + } + if !needUpdate { + return + } + log.Info("[pd] update member urls", zap.Strings("old-urls", c.urls), zap.Strings("new-urls", urls)) c.urls = urls } @@ -228,7 +256,7 @@ func (c *client) updateLeader() error { ctx, cancel := context.WithTimeout(c.ctx, updateLeaderTimeout) members, err := c.getMembers(ctx, u) if err != nil { - log.Warn("cannot update leader", zap.String("address", u), zap.Error(err)) + log.Warn("[pd] cannot update leader", zap.String("address", u), zap.Error(err)) } cancel() if err != nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 { @@ -289,7 +317,7 @@ func (c *client) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { return conn, nil } - cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath) + cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath, c.gRPCDialOptions...) if err != nil { return nil, errors.WithStack(err) } diff --git a/client/client_test.go b/client/client_test.go index 6a8fd8f5fe4..580f5c48f8a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/pd/server" "github.com/pingcap/pd/server/core" "go.uber.org/goleak" + "google.golang.org/grpc" ) func TestClient(t *testing.T) { @@ -490,3 +491,16 @@ func (s *testClientCtxSuite) TestClientCtx(c *C) { c.Assert(err, NotNil) c.Assert(time.Since(start), Less, time.Second*4) } + +var _ = Suite(&testClientDialOptionSuite{}) + +type testClientDialOptionSuite struct{} + +func (s *testClientDialOptionSuite) TestGRPCDialOption(c *C) { + start := time.Now() + ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) + defer cancel() + _, err := NewClientWithContext(ctx, []string{"localhost:8080"}, SecurityOption{}, WithGRPCDialOptions(grpc.WithBlock(), grpc.WithTimeout(time.Second))) + c.Assert(err, NotNil) + c.Assert(time.Since(start), Greater, 800*time.Millisecond) +} From b1318f8ac6ad465abc8d7ea02d6777ffeb953648 Mon Sep 17 00:00:00 2001 From: nolouch Date: Wed, 18 Dec 2019 16:07:28 +0800 Subject: [PATCH 2/5] fix ci Signed-off-by: nolouch --- client/client.go | 9 ++++++--- client/client_test.go | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index 9047401a2f3..480b61dbfc4 100644 --- a/client/client.go +++ b/client/client.go @@ -206,15 +206,18 @@ func (c *client) updateURLs(members []*pdpb.Member) { for _, url := range urls { urlsMap[url] = struct{}{} } - needUpdate := false + containsOldURLs := true for _, url := range c.urls { if _, ok := urlsMap[url]; !ok { - needUpdate = true + containsOldURLs = false } } - if !needUpdate { + + // the url list is same. + if len(urls) == len(c.urls) && containsOldURLs { return } + log.Info("[pd] update member urls", zap.Strings("old-urls", c.urls), zap.Strings("new-urls", urls)) c.urls = urls } diff --git a/client/client_test.go b/client/client_test.go index 580f5c48f8a..5d5113d6bfc 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -500,6 +500,7 @@ func (s *testClientDialOptionSuite) TestGRPCDialOption(c *C) { start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) defer cancel() + // nolint _, err := NewClientWithContext(ctx, []string{"localhost:8080"}, SecurityOption{}, WithGRPCDialOptions(grpc.WithBlock(), grpc.WithTimeout(time.Second))) c.Assert(err, NotNil) c.Assert(time.Since(start), Greater, 800*time.Millisecond) From b2b25dcc40e5f3c1ccfec13ecd684716048b4df2 Mon Sep 17 00:00:00 2001 From: nolouch Date: Wed, 18 Dec 2019 16:20:56 +0800 Subject: [PATCH 3/5] add break Signed-off-by: nolouch --- client/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/client.go b/client/client.go index 480b61dbfc4..44fb17947c2 100644 --- a/client/client.go +++ b/client/client.go @@ -210,6 +210,7 @@ func (c *client) updateURLs(members []*pdpb.Member) { for _, url := range c.urls { if _, ok := urlsMap[url]; !ok { containsOldURLs = false + break } } From 419ca767939c77d4353918f1af5e9f1c8eab0182 Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 19 Dec 2019 11:41:16 +0800 Subject: [PATCH 4/5] address comments Signed-off-by: nolouch --- client/client.go | 16 ++++------------ client/client_test.go | 22 ++++++++++++++++++++++ 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/client/client.go b/client/client.go index 44fb17947c2..abe399c5d02 100644 --- a/client/client.go +++ b/client/client.go @@ -15,6 +15,8 @@ package pd import ( "context" + "reflect" + "sort" "strings" "sync" "time" @@ -199,23 +201,13 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi func (c *client) updateURLs(members []*pdpb.Member) { urls := make([]string, 0, len(members)) - urlsMap := make(map[string]struct{}) for _, m := range members { urls = append(urls, m.GetClientUrls()...) } - for _, url := range urls { - urlsMap[url] = struct{}{} - } - containsOldURLs := true - for _, url := range c.urls { - if _, ok := urlsMap[url]; !ok { - containsOldURLs = false - break - } - } + sort.Strings(urls) // the url list is same. - if len(urls) == len(c.urls) && containsOldURLs { + if reflect.DeepEqual(c.urls, urls) { return } diff --git a/client/client_test.go b/client/client_test.go index 5d5113d6bfc..a79898b3fb4 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -471,6 +471,28 @@ func (s *testClientSuite) TestScatterRegion(c *C) { c.Succeed() } +func (s *testClientSuite) TestUpdateURLs(c *C) { + members := []*pdpb.Member{ + {Name: "pd4", ClientUrls: []string{"tmp//pd4"}}, + {Name: "pd1", ClientUrls: []string{"tmp//pd1"}}, + {Name: "pd3", ClientUrls: []string{"tmp//pd3"}}, + {Name: "pd2", ClientUrls: []string{"tmp//pd2"}}, + } + getUrls := func(ms []*pdpb.Member) (urls []string) { + for _, m := range ms { + urls = append(urls, m.GetClientUrls()[0]) + } + return + } + cli := &client{} + cli.updateURLs(members[1:]) + c.Assert(cli.urls, DeepEquals, getUrls([]*pdpb.Member{members[1], members[3], members[2]})) + cli.updateURLs(members[1:]) + c.Assert(cli.urls, DeepEquals, getUrls([]*pdpb.Member{members[1], members[3], members[2]})) + cli.updateURLs(members) + c.Assert(cli.urls, DeepEquals, getUrls([]*pdpb.Member{members[1], members[3], members[2], members[0]})) +} + func (s *testClientSuite) TestTsLessEqual(c *C) { c.Assert(tsLessEqual(9, 9, 9, 9), IsTrue) c.Assert(tsLessEqual(8, 9, 9, 8), IsTrue) From 2074dbde3b5ab6e832235ba8f1761da699a2d441 Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 19 Dec 2019 12:04:31 +0800 Subject: [PATCH 5/5] rename Signed-off-by: nolouch --- client/client_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index a79898b3fb4..1943eab6722 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -478,7 +478,7 @@ func (s *testClientSuite) TestUpdateURLs(c *C) { {Name: "pd3", ClientUrls: []string{"tmp//pd3"}}, {Name: "pd2", ClientUrls: []string{"tmp//pd2"}}, } - getUrls := func(ms []*pdpb.Member) (urls []string) { + getURLs := func(ms []*pdpb.Member) (urls []string) { for _, m := range ms { urls = append(urls, m.GetClientUrls()[0]) } @@ -486,11 +486,11 @@ func (s *testClientSuite) TestUpdateURLs(c *C) { } cli := &client{} cli.updateURLs(members[1:]) - c.Assert(cli.urls, DeepEquals, getUrls([]*pdpb.Member{members[1], members[3], members[2]})) + c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]})) cli.updateURLs(members[1:]) - c.Assert(cli.urls, DeepEquals, getUrls([]*pdpb.Member{members[1], members[3], members[2]})) + c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]})) cli.updateURLs(members) - c.Assert(cli.urls, DeepEquals, getUrls([]*pdpb.Member{members[1], members[3], members[2], members[0]})) + c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]})) } func (s *testClientSuite) TestTsLessEqual(c *C) {