Skip to content

Commit

Permalink
master: support leader to serve only and return leader info (pingcap#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Dec 21, 2021
1 parent 63cdbb5 commit a6d6417
Show file tree
Hide file tree
Showing 11 changed files with 588 additions and 85 deletions.
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestClientManager(t *testing.T) {

masterCtx, masterCancel := context.WithCancel(ctx)
defer masterCancel()
err = masterServer.Start(masterCtx)
err = masterServer.Run(masterCtx)
require.Nil(t, err)

err = manager.AddMasterClient(ctx, []string{"127.0.0.1:1992"})
Expand Down
27 changes: 15 additions & 12 deletions cmd/master/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"syscall"

"github.com/hanfei1991/microcosm/master"
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pkg/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -44,26 +44,29 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
server, err := master.NewServer(cfg, nil)
if err != nil {
log.L().Error("fail to start dm-master", zap.Error(err))
os.Exit(2)
}
err = server.Start(ctx)
if err != nil {
log.L().Error("fail to start dm-master", zap.Error(err))
log.L().Error("fail to start dataflow master", zap.Error(err))
os.Exit(2)
}

// 4. wait for stopping the process
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
go func() {
sig := <-sc
log.L().Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
select {
case <-ctx.Done():
case sig := <-sc:
log.L().Info("got signal to exit", zap.Stringer("signal", sig))
cancel()
}
}()
<-ctx.Done()

err = server.Run(ctx)
if err != nil && errors.Cause(err) != context.Canceled {
log.L().Error("run dataflow master with error", zap.Error(err))
os.Exit(2)
}
log.L().Info("server exits normally")
}
58 changes: 58 additions & 0 deletions master/campaign.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package master

import (
"context"
"time"

"github.com/hanfei1991/microcosm/pkg/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

func (s *Server) campaignLeaderLoop(ctx context.Context) error {
rl := rate.NewLimiter(rate.Every(time.Millisecond*200), 1)
for {
err := rl.Wait(ctx)
if err != nil {
return err
}
err = s.reset(ctx)
if err != nil {
return err
}
err = s.campaign(ctx)
switch err {
case nil:
case context.Canceled:
return ctx.Err()
case mvcc.ErrCompacted:
continue
default:
log.L().Warn("campaign leader failed", zap.Error(err))
return errors.Wrap(errors.ErrMasterCampaignLeader, err)
}
// TODO: if etcd leader is different with current server, resign current
// leader to keep them same
log.L().Info("campaign leader successfully", zap.String("server-id", s.name()))
cctx, cancel := context.WithCancel(ctx)
err = s.runLeaderService(cctx)
cancel()
return err
}
}

func (s *Server) campaign(ctx context.Context) error {
err := s.election.Campaign(ctx, s.name())
return errors.Wrap(errors.ErrMasterCampaignLeader, err)
}

func (s *Server) resign() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
err := s.election.Resign(ctx)
if err != nil {
log.L().Warn("resign leader failed", zap.Error(err))
}
}
65 changes: 65 additions & 0 deletions master/member.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package master

import (
"context"
"encoding/json"

"github.com/hanfei1991/microcosm/pkg/adapter"
"github.com/hanfei1991/microcosm/pkg/errors"
"github.com/hanfei1991/microcosm/pkg/etcdutils"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
)

type Member struct {
IsServLeader bool
IsEtcdLeader bool
Name string
Addrs []string
}

func (s *Server) updateServerMasterMembers(ctx context.Context) error {
leader, err := etcdutils.GetLeaderID(ctx, s.etcdClient, adapter.MasterCampaignKey.Path())
if err != nil {
if err == concurrency.ErrElectionNoLeader {
log.L().Warn("etcd election no leader")
} else {
return err
}
}
if leader != "" {
resp, err := s.etcdClient.Get(ctx, adapter.MasterInfoKey.Encode(leader))
if err != nil {
return errors.Wrap(errors.ErrEtcdAPIError, err)
}
if resp.Count > 0 {
cfg := &Config{}
err = json.Unmarshal(resp.Kvs[0].Value, cfg)
if err != nil {
return err
}
leader = cfg.Etcd.Name
}
}
resp, err := s.etcdClient.MemberList(ctx)
if err != nil {
return err
}
members := make([]*Member, 0, len(resp.Members))
for _, m := range resp.Members {
isServLeader := m.Name == leader
if isServLeader {
s.leaderName.Store(m.Name)
}
members = append(members, &Member{
Name: m.Name,
Addrs: m.ClientURLs,
IsEtcdLeader: false, /* TODO */
IsServLeader: isServLeader,
})
}
s.members = members
log.L().Info("update members", zap.Any("members", members))
return nil
}
Loading

0 comments on commit a6d6417

Please sign in to comment.