Skip to content

Commit

Permalink
server: cherry pick some fixes (#1321)
Browse files Browse the repository at this point in the history
* server: use same initialcluster config to restart joined member (#1279)

* server/leader: use the last modify revision to watch leader (#1317)
  • Loading branch information
nolouch authored Nov 12, 2018
1 parent 2cf6f01 commit 4ebb7f8
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 25 deletions.
6 changes: 6 additions & 0 deletions pkg/integration_test/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ func (s *testServer) GetClusterID() uint64 {
return s.server.ClusterID()
}

func (s *testServer) GetLeader() *pdpb.Member {
s.RLock()
defer s.RUnlock()
return s.server.GetLeader()
}

func (s *testServer) GetClusterVersion() semver.Version {
s.RLock()
defer s.RUnlock()
Expand Down
6 changes: 6 additions & 0 deletions pkg/integration_test/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package integration

import (
"context"
"os"
"path"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -44,6 +46,8 @@ func (s *integrationTestSuite) TestSimpleJoin(c *C) {
c.Assert(err, IsNil)
err = pd2.Run(context.TODO())
c.Assert(err, IsNil)
_, err = os.Stat(path.Join(pd2.GetConfig().DataDir, "join"))
c.Assert(os.IsNotExist(err), IsFalse)
members, err = etcdutil.ListEtcdMembers(client)
c.Assert(err, IsNil)
c.Assert(members.Members, HasLen, 2)
Expand All @@ -57,6 +61,8 @@ func (s *integrationTestSuite) TestSimpleJoin(c *C) {
c.Assert(err, IsNil)
err = pd3.Run(context.TODO())
c.Assert(err, IsNil)
_, err = os.Stat(path.Join(pd3.GetConfig().DataDir, "join"))
c.Assert(os.IsNotExist(err), IsFalse)
members, err = etcdutil.ListEtcdMembers(client)
c.Assert(err, IsNil)
c.Assert(members.Members, HasLen, 3)
Expand Down
58 changes: 58 additions & 0 deletions pkg/integration_test/leader_watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"time"

gofail "github.com/etcd-io/gofail/runtime"
. "github.com/pingcap/check"
"github.com/pingcap/pd/pkg/testutil"
)

func (s *integrationTestSuite) TestWatcher(c *C) {
c.Parallel()
cluster, err := newTestCluster(1)
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
pd1 := cluster.GetServer(cluster.GetLeader())
c.Assert(pd1, NotNil)

pd2, err := cluster.Join()
c.Assert(err, IsNil)
err = pd2.Run(context.TODO())
c.Assert(err, IsNil)
cluster.WaitLeader()

time.Sleep(5 * time.Second)
pd3, err := cluster.Join()
c.Assert(err, IsNil)
gofail.Enable("github.com/pingcap/pd/server/delayWatcher", `sleep("15s")`)
err = pd3.Run(context.Background())
c.Assert(err, IsNil)
time.Sleep(200 * time.Millisecond)
c.Assert(pd3.GetLeader().GetName(), Equals, pd1.GetConfig().Name)
pd1.Stop()
cluster.WaitLeader()
c.Assert(pd2.GetLeader().GetName(), Equals, pd2.GetConfig().Name)
testutil.WaitUntil(c, func(c *C) bool {
return c.Check(pd3.GetLeader().GetName(), Equals, pd2.GetConfig().Name)
})
c.Succeed()
}
36 changes: 34 additions & 2 deletions server/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"fmt"
"io/ioutil"
"os"
"path"
"strings"
Expand All @@ -26,6 +27,13 @@ import (
log "github.com/sirupsen/logrus"
)

const (
// privateFileMode grants owner to read/write a file.
privateFileMode = 0600
// privateDirMode grants owner to make/remove files inside the directory.
privateDirMode = 0700
)

// PrepareJoinCluster sends MemberAdd command to PD cluster,
// and returns the initial configuration of the PD cluster.
//
Expand Down Expand Up @@ -73,8 +81,20 @@ func PrepareJoinCluster(cfg *Config) error {
return errors.New("join self is forbidden")
}

// Cases with data directory.
filePath := path.Join(cfg.DataDir, "join")
// Read the persist join config
if _, err := os.Stat(filePath); !os.IsNotExist(err) {
s, err := ioutil.ReadFile(filePath)
if err != nil {
log.Fatal("read the join config meet error: ", err)
}
cfg.InitialCluster = strings.TrimSpace(string(s))
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

initialCluster := ""
// Cases with data directory.
if isDataExist(path.Join(cfg.DataDir, "member")) {
cfg.InitialCluster = initialCluster
cfg.InitialClusterState = embed.ClusterStateFlagExisting
Expand Down Expand Up @@ -103,6 +123,9 @@ func PrepareJoinCluster(cfg *Config) error {

existed := false
for _, m := range listResp.Members {
if len(m.Name) == 0 {
return errors.New("there is a member that has not joined successfully")
}
if m.Name == cfg.Name {
existed = true
}
Expand Down Expand Up @@ -131,14 +154,23 @@ func PrepareJoinCluster(cfg *Config) error {
if memb.ID == addResp.Member.ID {
n = cfg.Name
}
if len(n) == 0 {
return errors.New("there is a member that has not joined successfully")
}
for _, m := range memb.PeerURLs {
pds = append(pds, fmt.Sprintf("%s=%s", n, m))
}
}
initialCluster = strings.Join(pds, ",")
cfg.InitialCluster = initialCluster
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
err = os.MkdirAll(cfg.DataDir, privateDirMode)
if err != nil && !os.IsExist(err) {
return errors.WithStack(err)
}

err = ioutil.WriteFile(filePath, []byte(cfg.InitialCluster), privateFileMode)
return errors.WithStack(err)
}

func isDataExist(d string) bool {
Expand Down
19 changes: 10 additions & 9 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Server) leaderLoop() {
continue
}

leader, err := getLeader(s.client, s.getLeaderPath())
leader, rev, err := getLeader(s.client, s.getLeaderPath())
if err != nil {
log.Errorf("get leader err %v", err)
time.Sleep(200 * time.Millisecond)
Expand All @@ -100,7 +100,7 @@ func (s *Server) leaderLoop() {
}
} else {
log.Infof("leader is %s, watch it", leader)
s.watchLeader(leader)
s.watchLeader(leader, rev)
log.Info("leader changed, try to campaign leader")
}
}
Expand Down Expand Up @@ -157,17 +157,17 @@ func (s *Server) etcdLeaderLoop() {
}

// getLeader gets server leader from etcd.
func getLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, error) {
func getLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
ok, err := getProtoMsg(c, leaderPath, leader)
ok, rev, err := getProtoMsgWithModRev(c, leaderPath, leader)
if err != nil {
return nil, err
return nil, 0, err
}
if !ok {
return nil, nil
return nil, 0, nil
}

return leader, nil
return leader, rev, nil
}

// GetEtcdLeader returns the etcd leader ID.
Expand Down Expand Up @@ -289,7 +289,7 @@ func (s *Server) campaignLeader() error {
}
}

func (s *Server) watchLeader(leader *pdpb.Member) {
func (s *Server) watchLeader(leader *pdpb.Member, revision int64) {
s.leader.Store(leader)
defer s.leader.Store(&pdpb.Member{})

Expand All @@ -300,7 +300,8 @@ func (s *Server) watchLeader(leader *pdpb.Member) {
defer cancel()

for {
rch := watcher.Watch(ctx, s.getLeaderPath())
// gofail: var delayWatcher struct{}
rch := watcher.Watch(ctx, s.getLeaderPath(), clientv3.WithRev(revision))
for wresp := range rch {
if wresp.Canceled {
return
Expand Down
2 changes: 1 addition & 1 deletion server/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (s *testTimeFallBackSuite) TestTimeFallBack(c *C) {

func mustGetLeader(c *C, client *clientv3.Client, leaderPath string) *pdpb.Member {
for i := 0; i < 20; i++ {
leader, err := getLeader(client, leaderPath)
leader, _, err := getLeader(client, leaderPath)
c.Assert(err, IsNil)
if leader != nil {
return leader
Expand Down
33 changes: 20 additions & 13 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,18 @@ func CheckPDVersion(opt *scheduleOption) {
}

// A helper function to get value with key from etcd.
// TODO: return the value revision for outer use.
func getValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte, error) {
resp, err := get(c, key, opts...)
if err != nil {
return nil, err
}
if resp == nil {
return nil, nil
}
return resp.Kvs[0].Value, nil
}

func get(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
resp, err := kvGet(c, key, opts...)
if err != nil {
return nil, err
Expand All @@ -93,26 +103,23 @@ func getValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte
} else if n > 1 {
return nil, errors.Errorf("invalid get value resp %v, must only one", resp.Kvs)
}

return resp.Kvs[0].Value, nil
return resp, nil
}

// Return boolean to indicate whether the key exists or not.
// TODO: return the value revision for outer use.
func getProtoMsg(c *clientv3.Client, key string, msg proto.Message, opts ...clientv3.OpOption) (bool, error) {
value, err := getValue(c, key, opts...)
func getProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, opts ...clientv3.OpOption) (bool, int64, error) {
resp, err := get(c, key, opts...)
if err != nil {
return false, err
return false, 0, err
}
if value == nil {
return false, nil
if resp == nil {
return false, 0, nil
}

value := resp.Kvs[0].Value
if err = proto.Unmarshal(value, msg); err != nil {
return false, errors.WithStack(err)
return false, 0, errors.WithStack(err)
}

return true, nil
return true, resp.Kvs[0].ModRevision, nil
}

func initOrGetClusterID(c *clientv3.Client, key string) (uint64, error) {
Expand Down

0 comments on commit 4ebb7f8

Please sign in to comment.