From 2996270fc1fff61808c705bb71fd045e712fbbbc Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 16 Aug 2019 12:50:36 +0800 Subject: [PATCH] fix pump init order, register the node after listen ports (#709) (#716) --- pump/node.go | 2 +- pump/server.go | 84 +++++++++++++++++++++++++++++--------------------- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/pump/node.go b/pump/node.go index fec69f9ae..94c727348 100644 --- a/pump/node.go +++ b/pump/node.go @@ -86,7 +86,7 @@ func NewPumpNode(cfg *Config, getMaxCommitTs func() int64) (node.Node, error) { status := &node.Status{ NodeID: nodeID, Addr: advURL.Host, - State: node.Online, + State: node.Paused, IsAlive: true, } diff --git a/pump/server.go b/pump/server.go index a517d3fdf..17a2c4d01 100644 --- a/pump/server.go +++ b/pump/server.go @@ -1,3 +1,16 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package pump import ( @@ -304,41 +317,6 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi // Start runs Pump Server to serve the listening addr, and maintains heartbeat to Etcd func (s *Server) Start() error { - // register this node - ts, err := s.getTSO() - if err != nil { - return errors.Annotate(err, "fail to get tso from pd") - } - status := node.NewStatus(s.node.NodeStatus().NodeID, s.node.NodeStatus().Addr, node.Online, 0, s.storage.MaxCommitTS(), ts) - err = s.node.RefreshStatus(context.Background(), status) - if err != nil { - return errors.Annotate(err, "fail to register node to etcd") - } - - log.Infof("register success, this pump's node id is %s", s.node.NodeStatus().NodeID) - - // notify all cisterns - ctx, _ := context.WithTimeout(s.ctx, notifyDrainerTimeout) - if err := s.node.Notify(ctx); err != nil { - // if fail, refresh this node's state to paused - status := node.NewStatus(s.node.NodeStatus().NodeID, s.node.NodeStatus().Addr, node.Paused, 0, s.storage.MaxCommitTS(), 0) - rerr := s.node.RefreshStatus(context.Background(), status) - if rerr != nil { - log.Errorf("unregister pump while pump fails to notify drainer error %v", errors.ErrorStack(err)) - } - return errors.Annotate(err, "fail to notify all living drainer") - } - - log.Debug("notify success") - - errc := s.node.Heartbeat(s.ctx) - go func() { - for err := range errc { - if err != context.Canceled { - log.Errorf("send heartbeat error %v", err) - } - } - }() // start a UNIX listener var unixLis net.Listener @@ -411,6 +389,42 @@ func (s *Server) Start() error { go http.Serve(httpL, nil) + // register this node + ts, err := s.getTSO() + if err != nil { + return errors.Annotate(err, "fail to get tso from pd") + } + status := node.NewStatus(s.node.NodeStatus().NodeID, s.node.NodeStatus().Addr, node.Online, 0, s.storage.MaxCommitTS(), ts) + err = s.node.RefreshStatus(context.Background(), status) + if err != nil { + return errors.Annotate(err, "fail to register node to etcd") + } + + log.Infof("register success, this pump's node id is %s", s.node.NodeStatus().NodeID) + + // notify all cisterns + ctx, _ := context.WithTimeout(s.ctx, notifyDrainerTimeout) + if err := s.node.Notify(ctx); err != nil { + // if fail, refresh this node's state to paused + status := node.NewStatus(s.node.NodeStatus().NodeID, s.node.NodeStatus().Addr, node.Paused, 0, s.storage.MaxCommitTS(), 0) + rerr := s.node.RefreshStatus(context.Background(), status) + if rerr != nil { + log.Errorf("unregister pump while pump fails to notify drainer error %v", errors.ErrorStack(err)) + } + return errors.Annotate(err, "fail to notify all living drainer") + } + + log.Debug("notify success") + + errc := s.node.Heartbeat(s.ctx) + go func() { + for err := range errc { + if err != context.Canceled { + log.Errorf("send heartbeat error %v", err) + } + } + }() + log.Infof("start to server request on %s", s.tcpAddr) err = m.Serve() if strings.Contains(err.Error(), "use of closed network connection") {