Skip to content

Commit

Permalink
fix pump init order, register the node after listen ports (pingcap#709)…
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored and july2993 committed Aug 16, 2019
1 parent ca71e0d commit 2996270
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pump/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewPumpNode(cfg *Config, getMaxCommitTs func() int64) (node.Node, error) {
status := &node.Status{
NodeID: nodeID,
Addr: advURL.Host,
State: node.Online,
State: node.Paused,
IsAlive: true,
}

Expand Down
84 changes: 49 additions & 35 deletions pump/server.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pump

import (
Expand Down Expand Up @@ -304,41 +317,6 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi

// Start runs Pump Server to serve the listening addr, and maintains heartbeat to Etcd
func (s *Server) Start() error {
// register this node
ts, err := s.getTSO()
if err != nil {
return errors.Annotate(err, "fail to get tso from pd")
}
status := node.NewStatus(s.node.NodeStatus().NodeID, s.node.NodeStatus().Addr, node.Online, 0, s.storage.MaxCommitTS(), ts)
err = s.node.RefreshStatus(context.Background(), status)
if err != nil {
return errors.Annotate(err, "fail to register node to etcd")
}

log.Infof("register success, this pump's node id is %s", s.node.NodeStatus().NodeID)

// notify all cisterns
ctx, _ := context.WithTimeout(s.ctx, notifyDrainerTimeout)
if err := s.node.Notify(ctx); err != nil {
// if fail, refresh this node's state to paused
status := node.NewStatus(s.node.NodeStatus().NodeID, s.node.NodeStatus().Addr, node.Paused, 0, s.storage.MaxCommitTS(), 0)
rerr := s.node.RefreshStatus(context.Background(), status)
if rerr != nil {
log.Errorf("unregister pump while pump fails to notify drainer error %v", errors.ErrorStack(err))
}
return errors.Annotate(err, "fail to notify all living drainer")
}

log.Debug("notify success")

errc := s.node.Heartbeat(s.ctx)
go func() {
for err := range errc {
if err != context.Canceled {
log.Errorf("send heartbeat error %v", err)
}
}
}()

// start a UNIX listener
var unixLis net.Listener
Expand Down Expand Up @@ -411,6 +389,42 @@ func (s *Server) Start() error {

go http.Serve(httpL, nil)

// register this node
ts, err := s.getTSO()
if err != nil {
return errors.Annotate(err, "fail to get tso from pd")
}
status := node.NewStatus(s.node.NodeStatus().NodeID, s.node.NodeStatus().Addr, node.Online, 0, s.storage.MaxCommitTS(), ts)
err = s.node.RefreshStatus(context.Background(), status)
if err != nil {
return errors.Annotate(err, "fail to register node to etcd")
}

log.Infof("register success, this pump's node id is %s", s.node.NodeStatus().NodeID)

// notify all cisterns
ctx, _ := context.WithTimeout(s.ctx, notifyDrainerTimeout)
if err := s.node.Notify(ctx); err != nil {
// if fail, refresh this node's state to paused
status := node.NewStatus(s.node.NodeStatus().NodeID, s.node.NodeStatus().Addr, node.Paused, 0, s.storage.MaxCommitTS(), 0)
rerr := s.node.RefreshStatus(context.Background(), status)
if rerr != nil {
log.Errorf("unregister pump while pump fails to notify drainer error %v", errors.ErrorStack(err))
}
return errors.Annotate(err, "fail to notify all living drainer")
}

log.Debug("notify success")

errc := s.node.Heartbeat(s.ctx)
go func() {
for err := range errc {
if err != context.Canceled {
log.Errorf("send heartbeat error %v", err)
}
}
}()

log.Infof("start to server request on %s", s.tcpAddr)
err = m.Serve()
if strings.Contains(err.Error(), "use of closed network connection") {
Expand Down

0 comments on commit 2996270

Please sign in to comment.