From ba4b014e171127e0050923d1a48e7bdebf24976d Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 1 Sep 2020 16:22:21 +0800 Subject: [PATCH] cherry pick #2853 to release-4.0 Signed-off-by: Ryan Leung --- client/base_client.go | 14 +-- client/client.go | 6 +- pkg/apiutil/serverapi/middleware.go | 5 +- pkg/dashboard/adapter/manager.go | 7 +- pkg/errs/errno.go | 128 ++++++++++++++++----- pkg/errs/errs.go | 3 + pkg/errs/errs_test.go | 16 +++ pkg/etcdutil/etcdutil.go | 19 +-- pkg/grpcutil/grpcutil.go | 10 +- pkg/metricutil/metricutil.go | 4 +- pkg/mock/mockhbstream/mockhbstream.go | 2 +- pkg/tempurl/check_env_linux.go | 6 +- plugin/scheduler_example/evict_leader.go | 4 +- server/api/plugin.go | 2 +- server/cluster/cluster_worker.go | 4 +- server/cluster/coordinator.go | 2 +- server/core/basic_cluster.go | 3 +- server/handler.go | 15 +-- server/heartbeat_streams.go | 3 +- server/kv/etcd_kv.go | 22 ++-- server/member/leader.go | 20 ++-- server/member/lease.go | 3 +- server/region_syncer/history_buffer.go | 3 +- server/replication/replication_mode.go | 23 ++-- server/schedule/checker/learner_checker.go | 4 +- server/schedule/checker/merge_checker.go | 3 +- server/schedule/checker/rule_checker.go | 5 +- server/schedule/placement/rule_manager.go | 13 ++- server/schedule/region_scatterer.go | 3 +- server/server.go | 11 +- server/tso/tso.go | 3 +- tools/pd-analysis/analysis/parse_log.go | 3 +- 32 files changed, 239 insertions(+), 130 deletions(-) diff --git a/client/base_client.go b/client/base_client.go index 40a7e7760ae..e63b9e47324 100644 --- a/client/base_client.go +++ b/client/base_client.go @@ -138,7 +138,7 @@ func (c *baseClient) leaderLoop() { } if err := c.updateLeader(); err != nil { - log.Error("[pd] failed updateLeader", errs.ZapError(errs.ErrUpdateLeader, err)) + log.Error("[pd] failed updateLeader", errs.ZapError(err)) } } } @@ -178,7 +178,7 @@ func (c *baseClient) initClusterID() error { members, err := c.getMembers(timeoutCtx, u) timeoutCancel() if err != nil || members.GetHeader() == nil { - log.Warn("[pd] failed to get cluster id", zap.String("url", u), errs.ZapError(errs.ErrGetClusterID, err)) + log.Warn("[pd] failed to get cluster id", zap.String("url", u), errs.ZapError(err)) continue } c.clusterID = members.GetHeader().GetClusterId() @@ -192,7 +192,7 @@ func (c *baseClient) updateLeader() error { ctx, cancel := context.WithTimeout(c.ctx, updateLeaderTimeout) members, err := c.getMembers(ctx, u) if err != nil { - log.Warn("[pd] cannot update leader", zap.String("address", u), errs.ZapError(errs.ErrUpdateLeader, err)) + log.Warn("[pd] cannot update leader", zap.String("address", u), errs.ZapError(err)) } cancel() if err != nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 { @@ -206,7 +206,7 @@ func (c *baseClient) updateLeader() error { c.updateURLs(members.GetMembers()) return c.switchLeader(members.GetLeader().GetClientUrls()) } - return errors.Errorf("failed to get leader from %v", c.urls) + return errs.ErrClientGetLeader.FastGenByArgs(c.urls) } func (c *baseClient) getMembers(ctx context.Context, url string) (*pdpb.GetMembersResponse, error) { @@ -217,7 +217,7 @@ func (c *baseClient) getMembers(ctx context.Context, url string) (*pdpb.GetMembe members, err := pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{}) if err != nil { attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String()) - return nil, errors.WithStack(attachErr) + return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause() } return members, nil } @@ -274,13 +274,13 @@ func (c *baseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) KeyPath: c.security.KeyPath, }.ToTLSConfig() if err != nil { - return nil, errors.WithStack(err) + return nil, err } dctx, cancel := context.WithTimeout(c.ctx, dialTimeout) defer cancel() cc, err := grpcutil.GetClientConn(dctx, addr, tlsCfg, c.gRPCDialOptions...) if err != nil { - return nil, errors.WithStack(err) + return nil, err } c.connMu.Lock() defer c.connMu.Unlock() diff --git a/client/client.go b/client/client.go index 24a7f5aa0be..eef43c1ffbd 100644 --- a/client/client.go +++ b/client/client.go @@ -180,7 +180,7 @@ func (c *client) tsCancelLoop() { case d := <-c.tsDeadlineCh: select { case <-d.timer: - log.Error("tso request is canceled due to timeout") + log.Error("tso request is canceled due to timeout", errs.ZapError(errs.ErrClientGetTSOTimeout)) d.cancel() case <-d.done: case <-ctx.Done(): @@ -233,7 +233,7 @@ func (c *client) tsLoop() { return default: } - log.Error("[pd] create tso stream error", errs.ZapError(errs.ErrCreateTSOStream, err)) + log.Error("[pd] create tso stream error", errs.ZapError(errs.ErrClientCreateTSOStream, err)) c.ScheduleCheckLeader() cancel() c.revokeTSORequest(errors.WithStack(err)) @@ -281,7 +281,7 @@ func (c *client) tsLoop() { return default: } - log.Error("[pd] getTS error", errs.ZapError(errs.ErrGetTSO, err)) + log.Error("[pd] getTS error", errs.ZapError(errs.ErrClientGetTSO, err)) c.ScheduleCheckLeader() cancel() stream, cancel = nil, nil diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index 76c7af607fb..38a9dd23e7b 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" "github.com/urfave/negroni" @@ -145,14 +146,14 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request) resp, err := p.client.Do(r) if err != nil { - log.Error("request failed", zap.Error(err)) + log.Error("request failed", errs.ZapError(errs.ErrSendRequest, err)) continue } b, err := ioutil.ReadAll(resp.Body) resp.Body.Close() if err != nil { - log.Error("request failed", zap.Error(err)) + log.Error("read failed", errs.ZapError(errs.ErrIORead, err)) continue } diff --git a/pkg/dashboard/adapter/manager.go b/pkg/dashboard/adapter/manager.go index 8b9761d0aa4..bfeca2223e1 100644 --- a/pkg/dashboard/adapter/manager.go +++ b/pkg/dashboard/adapter/manager.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap-incubator/tidb-dashboard/pkg/apiserver" "github.com/pingcap/kvproto/pkg/pdpb" - "go.uber.org/zap" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" @@ -106,7 +105,7 @@ func (m *Manager) updateInfo() { var err error if m.members, err = cluster.GetMembers(m.srv.GetClient()); err != nil { - log.Warn("failed to get members", zap.Error(err)) + log.Warn("failed to get members", errs.ZapError(err)) m.members = nil return } @@ -196,7 +195,7 @@ func (m *Manager) startService() { return } if err := m.service.Start(m.ctx); err != nil { - log.Error("Can not start dashboard server", errs.ZapError(errs.ErrStartDashboard, err)) + log.Error("Can not start dashboard server", errs.ZapError(errs.ErrDashboardStart, err)) } else { log.Info("Dashboard server is started") } @@ -207,7 +206,7 @@ func (m *Manager) stopService() { return } if err := m.service.Stop(context.Background()); err != nil { - log.Error("Stop dashboard server error", errs.ZapError(errs.ErrStopDashboard, err)) + log.Error("Stop dashboard server error", errs.ZapError(errs.ErrDashboardStop, err)) } else { log.Info("Dashboard server is stopped") } diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 8182f47fd4c..cac94bfb739 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -15,6 +15,7 @@ package errs import "github.com/pingcap/errors" +// The internal error which is generated in PD project. // tso errors var ( ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp")) @@ -22,31 +23,31 @@ var ( ErrIncorrectSystemTime = errors.Normalize("incorrect system time", errors.RFCCodeText("PD:tso:ErrIncorrectSystemTime")) ) -// adapter errors +// member errors var ( - ErrStartDashboard = errors.Normalize("start dashboard failed", errors.RFCCodeText("PD:adapter:ErrStartDashboard")) - ErrStopDashboard = errors.Normalize("stop dashboard failed", errors.RFCCodeText("PD:adapter:ErrStopDashboard")) + ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound")) + ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader")) ) -// member errors +// client errors var ( - ErretcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErretcdLeaderNotFound")) - ErrGetLeader = errors.Normalize("get leader failed", errors.RFCCodeText("PD:member:ErrGetLeader")) - ErrDeleteLeaderKey = errors.Normalize("delete leader key failed", errors.RFCCodeText("PD:member:ErrDeleteLeaderKey")) - ErrLoadLeaderPriority = errors.Normalize("load leader priority failed", errors.RFCCodeText("PD:member:ErrLoadLeaderPriority")) - ErrLoadetcdLeaderPriority = errors.Normalize("load etcd leader priority failed", errors.RFCCodeText("PD:member:ErrLoadetcdLeaderPriority")) - ErrTransferetcdLeader = errors.Normalize("transfer etcd leader failed", errors.RFCCodeText("PD:member:ErrTransferetcdLeader")) - ErrWatcherCancel = errors.Normalize("watcher canceled", errors.RFCCodeText("PD:member:ErrWatcherCancel")) - ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader")) + ErrWatcherCancel = errors.Normalize("watcher canceled", errors.RFCCodeText("PD:member:ErrWatcherCancel")) + ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream")) + ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) + ErrClientGetTSO = errors.Normalize("get TSO failed", errors.RFCCodeText("PD:client:ErrClientGetTSO")) + ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader")) + ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember")) ) -// client errors +// scheduler errors var ( - ErrCloseGRPCConn = errors.Normalize("close gRPC connection failed", errors.RFCCodeText("PD:client:ErrCloseGRPCConn")) - ErrUpdateLeader = errors.Normalize("update leader failed", errors.RFCCodeText("PD:client:ErrUpdateLeader")) - ErrCreateTSOStream = errors.Normalize("create TSO stream failed", errors.RFCCodeText("PD:client:ErrCreateTSOStream")) - ErrGetTSO = errors.Normalize("get TSO failed", errors.RFCCodeText("PD:client:ErrGetTSO")) - ErrGetClusterID = errors.Normalize("get cluster ID failed", errors.RFCCodeText("PD:client:ErrGetClusterID")) + ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:scheduler:ErrGetSourceStore")) + ErrSchedulerExisted = errors.Normalize("scheduler existed", errors.RFCCodeText("PD:scheduler:ErrSchedulerExisted")) + ErrSchedulerNotFound = errors.Normalize("scheduler not found", errors.RFCCodeText("PD:scheduler:ErrSchedulerNotFound")) + ErrScheduleConfigNotExist = errors.Normalize("the config does not exist", errors.RFCCodeText("PD:scheduler:ErrScheduleConfigNotExist")) + ErrSchedulerConfig = errors.Normalize("wrong scheduler config %s", errors.RFCCodeText("PD:scheduler:ErrSchedulerConfig")) + ErrCacheOverflow = errors.Normalize("cache overflow", errors.RFCCodeText("PD:scheduler:ErrCacheOverflow")) + ErrInternalGrowth = errors.Normalize("unknown interval growth type error", errors.RFCCodeText("PD:scheduler:ErrInternalGrowth")) ) // placement errors @@ -57,29 +58,94 @@ var ( ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList")) ) -// kv errors +// cluster errors var ( - ErrEtcdKVSave = errors.Normalize("etcd KV save failed", errors.RFCCodeText("PD:kv:ErrEtcdKVSave")) - ErrEtcdKVRemove = errors.Normalize("etcd KV remove failed", errors.RFCCodeText("PD:kv:ErrEtcdKVRemove")) + ErrPersistStore = errors.Normalize("failed to persist store", errors.RFCCodeText("PD:cluster:ErrPersistStore")) + ErrDeleteRegion = errors.Normalize("failed to delete region from storage", errors.RFCCodeText("PD:cluster:ErrDeleteRegion")) + ErrSaveRegion = errors.Normalize("failed to save region from storage", errors.RFCCodeText("PD:cluster:ErrSaveRegion")) + ErrBuryStore = errors.Normalize("failed to bury store", errors.RFCCodeText("PD:cluster:ErrBuryStore")) + ErrDeleteStore = errors.Normalize("failed to delete store", errors.RFCCodeText("PD:cluster:ErrDeleteStore")) + ErrPersistClusterVersion = errors.Normalize("persist cluster version meet error", errors.RFCCodeText("PD:cluster:ErrPersistClusterVersion")) + ErrGetMembers = errors.Normalize("get members failed", errors.RFCCodeText("PD:cluster:ErrGetMembers")) ) -// scheduler errors +// grpcutil errors var ( - ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:scheduler:ErrGetSourceStore")) - ErrSchedulerExisted = errors.Normalize("scheduler existed", errors.RFCCodeText("PD:scheduler:ErrSchedulerExisted")) - ErrSchedulerNotFound = errors.Normalize("scheduler not found", errors.RFCCodeText("PD:scheduler:ErrSchedulerNotFound")) - ErrScheduleConfigNotExist = errors.Normalize("the config does not exist", errors.RFCCodeText("PD:scheduler:ErrScheduleConfigNotExist")) - ErrSchedulerConfig = errors.Normalize("wrong scheduler config %s", errors.RFCCodeText("PD:scheduler:ErrSchedulerConfig")) - ErrCacheOverflow = errors.Normalize("cache overflow", errors.RFCCodeText("PD:scheduler:ErrCacheOverflow")) - ErrInternalGrowth = errors.Normalize("unknown interval growth type error", errors.RFCCodeText("PD:scheduler:ErrInternalGrowth")) + ErrSecurityConfig = errors.Normalize("security config error: %s", errors.RFCCodeText("PD:grpcutil:ErrSecurityConfig")) +) + +// The third-party project error. +// url errors +var ( + ErrURLParse = errors.Normalize("parse url error", errors.RFCCodeText("PD:url:ErrURLParse")) + ErrQueryUnescape = errors.Normalize("inverse transformation of QueryEscape error", errors.RFCCodeText("PD:url:ErrQueryUnescape")) +) + +// grpc errors +var ( + ErrGRPCDial = errors.Normalize("dial error", errors.RFCCodeText("PD:grpc:ErrGRPCDial")) + ErrCloseGRPCConn = errors.Normalize("close gRPC connection failed", errors.RFCCodeText("PD:grpc:ErrCloseGRPCConn")) +) + +// proto errors +var ( + ErrProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrProtoUnmarshal")) +) + +// etcd errors +var ( + ErrNewEtcdClient = errors.Normalize("new etcd client failed", errors.RFCCodeText("PD:etcd:ErrNewEtcdClient")) + ErrStartEtcd = errors.Normalize("start etcd failed", errors.RFCCodeText("PD:etcd:ErrStartEtcd")) + ErrEtcdURLMap = errors.Normalize("etcd url map error", errors.RFCCodeText("PD:etcd:ErrEtcdURLMap")) + ErrEtcdGrantLease = errors.Normalize("etcd lease failed", errors.RFCCodeText("PD:etcd:ErrEtcdGrantLease")) + ErrEtcdTxn = errors.Normalize("etcd Txn failed", errors.RFCCodeText("PD:etcd:ErrEtcdTxn")) + ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut")) + ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete")) + ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet")) + ErrEtcdKVGetResponse = errors.Normalize("etcd invalid get value response %v, must only one", errors.RFCCodeText("PD:etcd:ErrEtcdKVGetResponse")) + ErrEtcdGetCluster = errors.Normalize("etcd get cluster from remote peer failed", errors.RFCCodeText("PD:etcd:ErrEtcdGetCluster")) + ErrEtcdMoveLeader = errors.Normalize("etcd move leader error", errors.RFCCodeText("PD:etcd:ErrEtcdMoveLeader")) + ErrEtcdTLSConfig = errors.Normalize("etcd TLS config error", errors.RFCCodeText("PD:etcd:ErrEtcdTLSConfig")) + ErrEtcdWatcherCancel = errors.Normalize("watcher canceled", errors.RFCCodeText("PD:etcd:ErrEtcdWatcherCancel")) + ErrCloseEtcdClient = errors.Normalize("close etcd client failed", errors.RFCCodeText("PD:etcd:ErrCloseEtcdClient")) + ErrEtcdMemberList = errors.Normalize("etcd member list failed", errors.RFCCodeText("PD:etcd:ErrEtcdMemberList")) +) + +// dashboard errors +var ( + ErrDashboardStart = errors.Normalize("start dashboard failed", errors.RFCCodeText("PD:dashboard:ErrDashboardStart")) + ErrDashboardStop = errors.Normalize("stop dashboard failed", errors.RFCCodeText("PD:dashboard:ErrDashboardStop")) ) // strconv errors var ( + ErrStrconvParseInt = errors.Normalize("parse int error", errors.RFCCodeText("PD:strconv:ErrStrconvParseInt")) ErrStrconvParseUint = errors.Normalize("parse uint error", errors.RFCCodeText("PD:strconv:ErrStrconvParseUint")) ) -// url errors +// prometheus errors var ( - ErrQueryUnescape = errors.Normalize("inverse transformation of QueryEscape error", errors.RFCCodeText("PD:url:ErrQueryUnescape")) + ErrPrometheusPushMetrics = errors.Normalize("push metrics to gateway failed", errors.RFCCodeText("PD:prometheus:ErrPrometheusPushMetrics")) +) + +// http errors +var ( + ErrSendRequest = errors.Normalize("send HTTP request failed", errors.RFCCodeText("PD:http:ErrSendRequest")) + ErrWriteHTTPBody = errors.Normalize("write HTTP body failed", errors.RFCCodeText("PD:http:ErrWriteHTTPBody")) + ErrNewHTTPRequest = errors.Normalize("new HTTP request failed", errors.RFCCodeText("PD:http:ErrNewHTTPRequest")) +) + +// ioutil error +var ( + ErrIORead = errors.Normalize("IO read error", errors.RFCCodeText("PD:ioutil:ErrIORead")) +) + +// netstat error +var ( + ErrNetstatTCPSocks = errors.Normalize("TCP socks error", errors.RFCCodeText("PD:netstat:ErrNetstatTCPSocks")) +) + +// hex error +var ( + ErrHexDecodingString = errors.Normalize("decode string %s error", errors.RFCCodeText("PD:hex:ErrHexDecodingString")) ) diff --git a/pkg/errs/errs.go b/pkg/errs/errs.go index 1509b26af4d..f1b78843fc6 100644 --- a/pkg/errs/errs.go +++ b/pkg/errs/errs.go @@ -21,6 +21,9 @@ import ( // ZapError is used to make the log output eaiser. func ZapError(err error, causeError ...error) zap.Field { + if err == nil { + return zap.Skip() + } if e, ok := err.(*errors.Error); ok { if len(causeError) >= 1 { err = e.Wrap(causeError[0]).FastGenWithCause() diff --git a/pkg/errs/errs_test.go b/pkg/errs/errs_test.go index ec04ba55492..0c8373cbc97 100644 --- a/pkg/errs/errs_test.go +++ b/pkg/errs/errs_test.go @@ -16,6 +16,7 @@ package errs import ( "bytes" "fmt" + "strconv" "strings" "testing" @@ -127,3 +128,18 @@ func (s *testErrorSuite) TestZapError(c *C) { log.Info("test", ZapError(err1)) log.Info("test", ZapError(err1, err)) } + +func (s *testErrorSuite) TestErrorWithStack(c *C) { + conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true} + lg := newZapTestLogger(conf) + log.ReplaceGlobals(lg.Logger, nil) + + _, err := strconv.ParseUint("-42", 10, 64) + log.Error("test", ZapError(ErrStrconvParseInt.Wrap(err).GenWithStackByCause())) + m1 := lg.Message() + log.Error("test", zap.Error(errors.WithStack(err))) + m2 := lg.Message() + // This test is based on line number and the first log is in line 141, the second is in line 142. + // So they have the same length stack. Move this test to another place need to change the corresponding length. + c.Assert(len(m1[strings.Index(m1, "[stack="):]), Equals, len(m2[strings.Index(m2, "[stack="):])) +} diff --git a/pkg/etcdutil/etcdutil.go b/pkg/etcdutil/etcdutil.go index 9a838f1dec1..7cca3cb1dd6 100644 --- a/pkg/etcdutil/etcdutil.go +++ b/pkg/etcdutil/etcdutil.go @@ -22,6 +22,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/pkg/types" @@ -61,7 +62,7 @@ func CheckClusterID(localClusterID types.ID, um types.URLsMap, tlsConfig *tls.Co trp.CloseIdleConnections() if gerr != nil { // Do not return error, because other members may be not ready. - log.Error("failed to get cluster from remote", zap.Error(gerr)) + log.Error("failed to get cluster from remote", errs.ZapError(errs.ErrEtcdGetCluster, gerr)) continue } @@ -104,14 +105,16 @@ func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clie start := time.Now() resp, err := clientv3.NewKV(c).Get(ctx, key, opts...) - if err != nil { - log.Error("load from etcd meet error", zap.Error(err)) - } if cost := time.Since(start); cost > DefaultSlowRequestTime { - log.Warn("kv gets too slow", zap.String("request-key", key), zap.Duration("cost", cost), zap.Error(err)) + log.Warn("kv gets too slow", zap.String("request-key", key), zap.Duration("cost", cost), errs.ZapError(err)) } - return resp, errors.WithStack(err) + if err != nil { + e := errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause() + log.Error("load from etcd meet error", zap.String("key", key), errs.ZapError(e)) + return resp, e + } + return resp, nil } // GetValue gets value with key from etcd. @@ -135,7 +138,7 @@ func get(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.G if n := len(resp.Kvs); n == 0 { return nil, nil } else if n > 1 { - return nil, errors.Errorf("invalid get value resp %v, must only one", resp.Kvs) + return nil, errs.ErrEtcdKVGetResponse.FastGenByArgs(resp.Kvs) } return resp, nil } @@ -151,7 +154,7 @@ func GetProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, op } value := resp.Kvs[0].Value if err = proto.Unmarshal(value, msg); err != nil { - return false, 0, errors.WithStack(err) + return false, 0, errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByCause() } return true, resp.Kvs[0].ModRevision, nil } diff --git a/pkg/grpcutil/grpcutil.go b/pkg/grpcutil/grpcutil.go index 72b96bc0021..1f7455db0e3 100644 --- a/pkg/grpcutil/grpcutil.go +++ b/pkg/grpcutil/grpcutil.go @@ -18,7 +18,7 @@ import ( "crypto/tls" "net/url" - "github.com/pingcap/errors" + "github.com/tikv/pd/pkg/errs" "go.etcd.io/etcd/pkg/transport" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -55,7 +55,7 @@ func (s SecurityConfig) ToTLSConfig() (*tls.Config, error) { tlsConfig, err := tlsInfo.ClientConfig() if err != nil { - return nil, errors.WithStack(err) + return nil, errs.ErrEtcdTLSConfig.Wrap(err).GenWithStackByCause() } return tlsConfig, nil } @@ -68,7 +68,7 @@ func (s SecurityConfig) GetOneAllowedCN() (string, error) { case 0: return "", nil default: - return "", errors.New("Currently only supports one CN") + return "", errs.ErrSecurityConfig.FastGenByArgs("only supports one CN") } } @@ -93,11 +93,11 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g } u, err := url.Parse(addr) if err != nil { - return nil, errors.WithStack(err) + return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause() } cc, err := grpc.DialContext(ctx, u.Host, append(do, opt)...) if err != nil { - return nil, errors.WithStack(err) + return nil, errs.ErrGRPCDial.Wrap(err).GenWithStackByCause() } return cc, nil } diff --git a/pkg/metricutil/metricutil.go b/pkg/metricutil/metricutil.go index 64e7db47a4b..d5de5c12ca2 100644 --- a/pkg/metricutil/metricutil.go +++ b/pkg/metricutil/metricutil.go @@ -21,8 +21,8 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/push" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/typeutil" - "go.uber.org/zap" ) const zeroDuration = time.Duration(0) @@ -68,7 +68,7 @@ func prometheusPushClient(job, addr string, interval time.Duration) { for { err := pusher.Push() if err != nil { - log.Error("could not push metrics to Prometheus Pushgateway", zap.Error(err)) + log.Error("could not push metrics to Prometheus Pushgateway", errs.ZapError(errs.ErrPrometheusPushMetrics, err)) } time.Sleep(interval) diff --git a/pkg/mock/mockhbstream/mockhbstream.go b/pkg/mock/mockhbstream/mockhbstream.go index f234729e6bb..c3fcf15c2db 100644 --- a/pkg/mock/mockhbstream/mockhbstream.go +++ b/pkg/mock/mockhbstream/mockhbstream.go @@ -15,10 +15,10 @@ package mockhbstream import ( "context" - "errors" "sync" "time" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/opt" diff --git a/pkg/tempurl/check_env_linux.go b/pkg/tempurl/check_env_linux.go index 5c97db8e584..dfe00c84cf1 100644 --- a/pkg/tempurl/check_env_linux.go +++ b/pkg/tempurl/check_env_linux.go @@ -17,13 +17,13 @@ package tempurl import ( "github.com/cakturk/go-netstat/netstat" "github.com/pingcap/log" - "go.uber.org/zap" + "github.com/tikv/pd/pkg/errs" ) func environmentCheck(addr string) bool { valid, err := checkAddr(addr[len("http://"):]) if err != nil { - log.Error("check port status failed", zap.Error(err)) + log.Error("check port status failed", errs.ZapError(err)) return false } return valid @@ -34,7 +34,7 @@ func checkAddr(addr string) (bool, error) { return s.RemoteAddr.String() == addr || s.LocalAddr.String() == addr }) if err != nil { - return false, err + return false, errs.ErrNetstatTCPSocks.Wrap(err).FastGenWithCause() } return len(tabs) < 1, nil } diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index ae51b1a3ab6..96e989bfcfe 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/apiutil" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/filter" @@ -31,7 +32,6 @@ import ( "github.com/tikv/pd/server/schedule/selector" "github.com/tikv/pd/server/schedulers" "github.com/unrolled/render" - "go.uber.org/zap" ) const ( @@ -231,7 +231,7 @@ func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operato } op, err := operator.CreateTransferLeaderOperator(EvictLeaderType, cluster, region, region.GetLeader().GetStoreId(), target.GetID(), operator.OpLeader) if err != nil { - log.Debug("fail to create evict leader operator", zap.Error(err)) + log.Debug("fail to create evict leader operator", errs.ZapError(err)) continue } diff --git a/server/api/plugin.go b/server/api/plugin.go index 14d78a8d6cc..7e33810da12 100644 --- a/server/api/plugin.go +++ b/server/api/plugin.go @@ -14,11 +14,11 @@ package api import ( - "errors" "net/http" "os" "strings" + "github.com/pingcap/errors" "github.com/tikv/pd/pkg/apiutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/cluster" diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index b7a03d5275c..476a793176e 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -191,7 +191,7 @@ func (c *RaftCluster) HandleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb log.Warn("report split region is invalid", zap.Stringer("left-region", core.RegionToHexMeta(left)), zap.Stringer("right-region", core.RegionToHexMeta(right)), - zap.Error(err)) + errs.ZapError(err)) return nil, err } @@ -214,7 +214,7 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque if err != nil { log.Warn("report batch split region is invalid", zap.Stringer("region-meta", hrm), - zap.Error(err)) + errs.ZapError(err)) return nil, err } last := len(regions) - 1 diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index 9f33225d728..caba26ea0c4 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -644,7 +644,7 @@ func (c *coordinator) runScheduler(s *scheduleController) { case <-s.Ctx().Done(): log.Info("scheduler has been stopped", zap.String("scheduler-name", s.GetName()), - zap.Error(s.Ctx().Err())) + errs.ZapError(s.Ctx().Err())) return } } diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 503efe01a0a..1a48ca3d721 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server/schedule/storelimit" "go.uber.org/zap" @@ -324,7 +325,7 @@ func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo { func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { origin, err := bc.PreCheckPutRegion(region) if err != nil { - log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta())) + log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) // return the state region to delete. return []*RegionInfo{region} } diff --git a/server/handler.go b/server/handler.go index 210fa10490d..5b5e9735182 100644 --- a/server/handler.go +++ b/server/handler.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" @@ -457,7 +458,7 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, region.GetLeader().GetStoreId(), newLeader.GetStoreId(), operator.OpAdmin) if err != nil { - log.Debug("fail to create transfer leader operator", zap.Error(err)) + log.Debug("fail to create transfer leader operator", errs.ZapError(err)) return err } if ok := c.GetOperatorController().AddOperator(op); !ok { @@ -505,7 +506,7 @@ func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64 op, err := operator.CreateMoveRegionOperator("admin-move-region", c, region, operator.OpAdmin, peers) if err != nil { - log.Debug("fail to create move region operator", zap.Error(err)) + log.Debug("fail to create move region operator", errs.ZapError(err)) return err } if ok := c.GetOperatorController().AddOperator(op); !ok { @@ -542,7 +543,7 @@ func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreI newPeer := &metapb.Peer{StoreId: toStoreID, IsLearner: oldPeer.GetIsLearner()} op, err := operator.CreateMovePeerOperator("admin-move-peer", c, region, operator.OpAdmin, fromStoreID, newPeer) if err != nil { - log.Debug("fail to create move peer operator", zap.Error(err)) + log.Debug("fail to create move peer operator", errs.ZapError(err)) return err } if ok := c.GetOperatorController().AddOperator(op); !ok { @@ -588,7 +589,7 @@ func (h *Handler) AddAddPeerOperator(regionID uint64, toStoreID uint64) error { newPeer := &metapb.Peer{StoreId: toStoreID} op, err := operator.CreateAddPeerOperator("admin-add-peer", c, region, newPeer, operator.OpAdmin) if err != nil { - log.Debug("fail to create add peer operator", zap.Error(err)) + log.Debug("fail to create add peer operator", errs.ZapError(err)) return err } if ok := c.GetOperatorController().AddOperator(op); !ok { @@ -611,7 +612,7 @@ func (h *Handler) AddAddLearnerOperator(regionID uint64, toStoreID uint64) error op, err := operator.CreateAddPeerOperator("admin-add-learner", c, region, newPeer, operator.OpAdmin) if err != nil { - log.Debug("fail to create add learner operator", zap.Error(err)) + log.Debug("fail to create add learner operator", errs.ZapError(err)) return err } if ok := c.GetOperatorController().AddOperator(op); !ok { @@ -638,7 +639,7 @@ func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) err op, err := operator.CreateRemovePeerOperator("admin-remove-peer", c, operator.OpAdmin, region, fromStoreID) if err != nil { - log.Debug("fail to create move peer operator", zap.Error(err)) + log.Debug("fail to create move peer operator", errs.ZapError(err)) return err } if ok := c.GetOperatorController().AddOperator(op); !ok { @@ -680,7 +681,7 @@ func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error ops, err := operator.CreateMergeRegionOperator("admin-merge-region", c, region, target, operator.OpAdmin) if err != nil { - log.Debug("fail to create merge region operator", zap.Error(err)) + log.Debug("fail to create merge region operator", errs.ZapError(err)) return err } if ok := c.GetOperatorController().AddOperator(ops...); !ok { diff --git a/server/heartbeat_streams.go b/server/heartbeat_streams.go index 0f3b52e91ed..a0fdb37a410 100644 --- a/server/heartbeat_streams.go +++ b/server/heartbeat_streams.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/logutil" "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/core" @@ -120,7 +121,7 @@ func (s *heartbeatStreams) run() { if err := stream.Send(keepAlive); err != nil { log.Error("send keepalive message fail", zap.Uint64("target-store-id", storeID), - zap.Error(err)) + errs.ZapError(err)) delete(s.streams, storeID) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "keepalive", "err").Inc() } else { diff --git a/server/kv/etcd_kv.go b/server/kv/etcd_kv.go index 08de0179ffb..7820c9ab814 100644 --- a/server/kv/etcd_kv.go +++ b/server/kv/etcd_kv.go @@ -32,10 +32,6 @@ const ( slowRequestTime = 1 * time.Second ) -var ( - errTxnFailed = errors.New("failed to commit transaction") -) - type etcdKVBase struct { client *clientv3.Client rootPath string @@ -89,11 +85,12 @@ func (kv *etcdKVBase) Save(key, value string) error { txn := NewSlowLogTxn(kv.client) resp, err := txn.Then(clientv3.OpPut(key, value)).Commit() if err != nil { - log.Error("save to etcd meet error", zap.String("key", key), zap.String("value", value), errs.ZapError(errs.ErrEtcdKVSave, err)) - return errors.WithStack(err) + e := errs.ErrEtcdKVPut.Wrap(err).GenWithStackByCause() + log.Error("save to etcd meet error", zap.String("key", key), zap.String("value", value), errs.ZapError(e)) + return e } if !resp.Succeeded { - return errors.WithStack(errTxnFailed) + return errs.ErrEtcdTxn.FastGenByArgs() } return nil } @@ -104,11 +101,12 @@ func (kv *etcdKVBase) Remove(key string) error { txn := NewSlowLogTxn(kv.client) resp, err := txn.Then(clientv3.OpDelete(key)).Commit() if err != nil { - log.Error("remove from etcd meet error", zap.String("key", key), errs.ZapError(errs.ErrEtcdKVRemove, err)) - return errors.WithStack(err) + err = errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + log.Error("remove from etcd meet error", zap.String("key", key), errs.ZapError(err)) + return err } if !resp.Succeeded { - return errors.WithStack(errTxnFailed) + return errs.ErrEtcdTxn.FastGenByArgs() } return nil } @@ -156,9 +154,9 @@ func (t *SlowLogTxn) Commit() (*clientv3.TxnResponse, error) { cost := time.Since(start) if cost > slowRequestTime { log.Warn("txn runs too slow", - zap.Error(err), zap.Reflect("response", resp), - zap.Duration("cost", cost)) + zap.Duration("cost", cost), + errs.ZapError(err)) } label := "success" if err != nil { diff --git a/server/member/leader.go b/server/member/leader.go index b740e9898ac..a8b4196941c 100644 --- a/server/member/leader.go +++ b/server/member/leader.go @@ -132,14 +132,14 @@ func (m *Member) GetLeaderPath() string { // CheckLeader checks returns true if it is needed to check later. func (m *Member) CheckLeader(name string) (*pdpb.Member, int64, bool) { if m.GetEtcdLeader() == 0 { - log.Error("no etcd leader, check leader later") + log.Error("no etcd leader, check pd leader later", errs.ZapError(errs.ErrEtcdLeaderNotFound)) time.Sleep(200 * time.Millisecond) return nil, 0, true } leader, rev, err := getLeader(m.client, m.GetLeaderPath()) if err != nil { - log.Error("getting pd leader meets error", errs.ZapError(errs.ErrGetLeader, err)) + log.Error("getting pd leader meets error", errs.ZapError(err)) time.Sleep(200 * time.Millisecond) return nil, 0, true } @@ -149,7 +149,7 @@ func (m *Member) CheckLeader(name string) (*pdpb.Member, int64, bool) { // in previous CampaignLeader. we can delete and campaign again. log.Warn("the leader has not changed, delete and campaign again", zap.Stringer("old-leader", leader)) if err = m.deleteLeaderKey(); err != nil { - log.Error("deleting pd leader key meets error", errs.ZapError(errs.ErrDeleteLeaderKey, err)) + log.Error("deleting pd leader key meets error", errs.ZapError(err)) time.Sleep(200 * time.Millisecond) return nil, 0, true } @@ -166,18 +166,18 @@ func (m *Member) CheckPriority(ctx context.Context) { } myPriority, err := m.GetMemberLeaderPriority(m.ID()) if err != nil { - log.Error("failed to load leader priority", errs.ZapError(errs.ErrLoadLeaderPriority, err)) + log.Error("failed to load leader priority", errs.ZapError(err)) return } leaderPriority, err := m.GetMemberLeaderPriority(etcdLeader) if err != nil { - log.Error("failed to load etcd leader priority", errs.ZapError(errs.ErrLoadetcdLeaderPriority, err)) + log.Error("failed to load etcd leader priority", errs.ZapError(err)) return } if myPriority > leaderPriority { err := m.MoveEtcdLeader(ctx, etcdLeader, m.ID()) if err != nil { - log.Error("failed to transfer etcd leader", errs.ZapError(errs.ErrTransferetcdLeader, err)) + log.Error("failed to transfer etcd leader", errs.ZapError(err)) } else { log.Info("transfer etcd leader", zap.Uint64("from", etcdLeader), @@ -190,7 +190,11 @@ func (m *Member) CheckPriority(ctx context.Context) { func (m *Member) MoveEtcdLeader(ctx context.Context, old, new uint64) error { moveCtx, cancel := context.WithTimeout(ctx, moveLeaderTimeout) defer cancel() - return errors.WithStack(m.etcd.Server.MoveLeader(moveCtx, old, new)) + err := m.etcd.Server.MoveLeader(moveCtx, old, new) + if err != nil { + return errs.ErrEtcdMoveLeader.Wrap(err).GenWithStackByCause() + } + return nil } // getLeader gets server leader from etcd. @@ -328,7 +332,7 @@ func (m *Member) GetMemberLeaderPriority(id uint64) (int, error) { } priority, err := strconv.ParseInt(string(res.Kvs[0].Value), 10, 32) if err != nil { - return 0, errors.WithStack(err) + return 0, errs.ErrStrconvParseInt.Wrap(err).GenWithStackByCause() } return int(priority), nil } diff --git a/server/member/lease.go b/server/member/lease.go index 2c00d7e490c..ad920455d0a 100644 --- a/server/member/lease.go +++ b/server/member/lease.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -116,7 +117,7 @@ func (l *LeaderLease) keepAliveWorker(ctx context.Context, interval time.Duratio defer cancel() res, err := l.lease.KeepAliveOnce(ctx1, l.ID) if err != nil { - log.Warn("leader lease keep alive failed", zap.Error(err)) + log.Warn("leader lease keep alive failed", errs.ZapError(err)) return } if res.TTL > 0 { diff --git a/server/region_syncer/history_buffer.go b/server/region_syncer/history_buffer.go index 0ee891b1c4e..b2984999304 100644 --- a/server/region_syncer/history_buffer.go +++ b/server/region_syncer/history_buffer.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/kv" "go.uber.org/zap" @@ -151,6 +152,6 @@ func (h *historyBuffer) persist() { regionSyncerStatus.WithLabelValues("last_index").Set(float64(h.nextIndex())) err := h.kv.Save(historyKey, strconv.FormatUint(h.nextIndex(), 10)) if err != nil { - log.Warn("persist history index failed", zap.Uint64("persist-index", h.nextIndex()), zap.Error(err)) + log.Warn("persist history index failed", zap.Uint64("persist-index", h.nextIndex()), errs.ZapError(err)) } } diff --git a/server/replication/replication_mode.go b/server/replication/replication_mode.go index f6af5dc4c14..22b5a0f6ccf 100644 --- a/server/replication/replication_mode.go +++ b/server/replication/replication_mode.go @@ -23,6 +23,7 @@ import ( pb "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/opt" @@ -217,7 +218,7 @@ func (m *ModeManager) drSwitchToAsync() error { func (m *ModeManager) drSwitchToAsyncWithLock() error { id, err := m.cluster.AllocID() if err != nil { - log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), zap.Error(err)) + log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } dr := drAutoSyncStatus{State: drStateAsync, StateID: id} @@ -225,7 +226,7 @@ func (m *ModeManager) drSwitchToAsyncWithLock() error { return err } if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { - log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), zap.Error(err)) + log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } m.drAutoSync = dr @@ -242,7 +243,7 @@ func (m *ModeManager) drSwitchToSyncRecover() error { func (m *ModeManager) drSwitchToSyncRecoverWithLock() error { id, err := m.cluster.AllocID() if err != nil { - log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), zap.Error(err)) + log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } dr := drAutoSyncStatus{State: drStateSyncRecover, StateID: id, RecoverStartTime: time.Now()} @@ -250,7 +251,7 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error { return err } if err = m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { - log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), zap.Error(err)) + log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } m.drAutoSync = dr @@ -264,7 +265,7 @@ func (m *ModeManager) drSwitchToSync() error { defer m.Unlock() id, err := m.cluster.AllocID() if err != nil { - log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), zap.Error(err)) + log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } dr := drAutoSyncStatus{State: drStateSync, StateID: id} @@ -272,7 +273,7 @@ func (m *ModeManager) drSwitchToSync() error { return err } if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { - log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), zap.Error(err)) + log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } m.drAutoSync = dr @@ -286,8 +287,14 @@ func (m *ModeManager) drPersistStatus(status drAutoSyncStatus) error { defer cancel() data, _ := json.Marshal(status) if err := m.fileReplicater.ReplicateFileToAllMembers(ctx, drStatusFile, data); err != nil { - log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", status.State), zap.Error(err)) - return err + log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", status.State), errs.ZapError(err)) + // Throw away the error to make it possible to switch to async when + // primary and dr DC are disconnected. This will result in the + // inability to accurately determine whether data is fully + // synchronized when using dr DC to disaster recovery. + // TODO: introduce PD's leader-follower connection timeout to solve + // this issue. More details: https://github.com/tikv/pd/issues/2490 + return nil } } return nil diff --git a/server/schedule/checker/learner_checker.go b/server/schedule/checker/learner_checker.go index 0167580e8b4..2034b41d2a8 100644 --- a/server/schedule/checker/learner_checker.go +++ b/server/schedule/checker/learner_checker.go @@ -15,10 +15,10 @@ package checker import ( "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedule/opt" - "go.uber.org/zap" ) // LearnerChecker ensures region has a learner will be promoted. @@ -41,7 +41,7 @@ func (l *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator { } op, err := operator.CreatePromoteLearnerOperator("promote-learner", l.cluster, region, p) if err != nil { - log.Debug("fail to create promote learner operator", zap.Error(err)) + log.Debug("fail to create promote learner operator", errs.ZapError(err)) return nil } return op diff --git a/server/schedule/checker/merge_checker.go b/server/schedule/checker/merge_checker.go index 3202f68f1dc..9869464f93d 100644 --- a/server/schedule/checker/merge_checker.go +++ b/server/schedule/checker/merge_checker.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/cache" "github.com/tikv/pd/pkg/codec" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedule/opt" @@ -121,7 +122,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { log.Debug("try to merge region", zap.Stringer("from", core.RegionToHexMeta(region.GetMeta())), zap.Stringer("to", core.RegionToHexMeta(target.GetMeta()))) ops, err := operator.CreateMergeRegionOperator("merge-region", m.cluster, region, target, operator.OpMerge) if err != nil { - log.Warn("create merge region operator failed", zap.Error(err)) + log.Warn("create merge region operator failed", errs.ZapError(err)) return nil } checkerCounter.WithLabelValues("merge_checker", "new-operator").Inc() diff --git a/server/schedule/checker/rule_checker.go b/server/schedule/checker/rule_checker.go index 38aa5ad22bf..e4c8af4cc98 100644 --- a/server/schedule/checker/rule_checker.go +++ b/server/schedule/checker/rule_checker.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/filter" "github.com/tikv/pd/server/schedule/operator" @@ -58,7 +59,7 @@ func (c *RuleChecker) Check(region *core.RegionInfo) *operator.Operator { for _, rf := range fit.RuleFits { op, err := c.fixRulePeer(region, fit, rf) if err != nil { - log.Debug("fail to fix rule peer", zap.Error(err), zap.String("rule-group", rf.Rule.GroupID), zap.String("rule-id", rf.Rule.ID)) + log.Debug("fail to fix rule peer", zap.String("rule-group", rf.Rule.GroupID), zap.String("rule-id", rf.Rule.ID), errs.ZapError(err)) break } if op != nil { @@ -67,7 +68,7 @@ func (c *RuleChecker) Check(region *core.RegionInfo) *operator.Operator { } op, err := c.fixOrphanPeers(region, fit) if err != nil { - log.Debug("fail to fix orphan peer", zap.Error(err)) + log.Debug("fail to fix orphan peer", errs.ZapError(err)) return nil } return op diff --git a/server/schedule/placement/rule_manager.go b/server/schedule/placement/rule_manager.go index 8be530cb23c..0ae980d0a51 100644 --- a/server/schedule/placement/rule_manager.go +++ b/server/schedule/placement/rule_manager.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/core" "go.uber.org/zap" ) @@ -84,22 +85,22 @@ func (m *RuleManager) loadRules() error { _, err := m.store.LoadRules(func(k, v string) { var r Rule if err := json.Unmarshal([]byte(v), &r); err != nil { - log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v)) + log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) toDelete = append(toDelete, k) return } if err := m.adjustRule(&r); err != nil { - log.Error("rule is in bad format", zap.Error(err), zap.String("rule-key", k), zap.String("rule-value", v)) + log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err)) toDelete = append(toDelete, k) return } if _, ok := m.rules[r.Key()]; ok { - log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v)) + log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) toDelete = append(toDelete, k) return } if k != r.StoreKey() { - log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v)) + log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) toDelete = append(toDelete, k) toSave = append(toSave, &r) } @@ -126,11 +127,11 @@ func (m *RuleManager) adjustRule(r *Rule) error { var err error r.StartKey, err = hex.DecodeString(r.StartKeyHex) if err != nil { - return errors.Wrap(err, "start key is not hex format") + return errs.ErrHexDecodingString.FastGenByArgs(r.StartKeyHex) } r.EndKey, err = hex.DecodeString(r.EndKeyHex) if err != nil { - return errors.Wrap(err, "end key is not hex format") + return errs.ErrHexDecodingString.FastGenByArgs(r.EndKeyHex) } if len(r.EndKey) > 0 && bytes.Compare(r.EndKey, r.StartKey) <= 0 { return errors.New("endKey should be greater than startKey") diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index c6ca4f253ef..904228b2fbf 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/filter" "github.com/tikv/pd/server/schedule/operator" @@ -195,7 +196,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *operator.Opera op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader) if err != nil { - log.Debug("fail to create scatter region operator", zap.Error(err)) + log.Debug("fail to create scatter region operator", errs.ZapError(err)) return nil } op.SetPriorityLevel(core.HighPriority) diff --git a/server/server.go b/server/server.go index f4689d01ea7..7bdff9a3d63 100644 --- a/server/server.go +++ b/server/server.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/pingcap/sysutil" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/etcdutil" "github.com/tikv/pd/pkg/grpcutil" "github.com/tikv/pd/pkg/logutil" @@ -568,11 +569,11 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe log.Info("bootstrap cluster ok", zap.Uint64("cluster-id", clusterID)) err = s.storage.SaveRegion(req.GetRegion()) if err != nil { - log.Warn("save the bootstrap region failed", zap.Error(err)) + log.Warn("save the bootstrap region failed", errs.ZapError(err)) } err = s.storage.Flush() if err != nil { - log.Warn("flush the bootstrap region failed", zap.Error(err)) + log.Warn("flush the bootstrap region failed", errs.ZapError(err)) } if err := s.cluster.Start(s); err != nil { @@ -1041,7 +1042,7 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro if cluster != nil { err := cluster.GetReplicationMode().UpdateConfig(cfg) if err != nil { - log.Warn("failed to update replication mode", zap.Error(err)) + log.Warn("failed to update replication mode", errs.ZapError(err)) // revert to old config // NOTE: since we can't put the 2 storage mutations in a batch, it // is possible that memory and persistent data become different @@ -1221,7 +1222,7 @@ func (s *Server) ReplicateFileToAllMembers(ctx context.Context, name string, dat for _, member := range resp.Members { clientUrls := member.GetClientUrls() if len(clientUrls) == 0 { - log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), zap.Error(err)) + log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err)) return errors.Errorf("failed to replicate to member %s: clientUrls is empty", member.GetName()) } url := clientUrls[0] + filepath.Join("/pd/api/v1/admin/persist-file", name) @@ -1229,7 +1230,7 @@ func (s *Server) ReplicateFileToAllMembers(ctx context.Context, name string, dat req.Header.Set("PD-Allow-follower-handle", "true") res, err := s.httpClient.Do(req) if err != nil { - log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), zap.Error(err)) + log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err)) return errors.Errorf("failed to replicate to member %s", member.GetName()) } if res.StatusCode != http.StatusOK { diff --git a/server/tso/tso.go b/server/tso/tso.go index 3122b02a015..306b80024c1 100644 --- a/server/tso/tso.go +++ b/server/tso/tso.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/etcdutil" "github.com/tikv/pd/pkg/tsoutil" "github.com/tikv/pd/pkg/typeutil" @@ -144,7 +145,7 @@ func (t *TimestampOracle) SyncTimestamp(lease *member.LeaderLease) error { // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, // the timestamp allocation will start from the saved etcd timestamp temporarily. if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { - log.Error("system time may be incorrect", zap.Time("last", last), zap.Time("next", next)) + log.Error("system time may be incorrect", zap.Time("last", last), zap.Time("next", next), errs.ZapError(errs.ErrIncorrectSystemTime)) next = last.Add(updateTimestampGuard) } diff --git a/tools/pd-analysis/analysis/parse_log.go b/tools/pd-analysis/analysis/parse_log.go index 6676c864492..45b9c96a306 100644 --- a/tools/pd-analysis/analysis/parse_log.go +++ b/tools/pd-analysis/analysis/parse_log.go @@ -15,13 +15,14 @@ package analysis import ( "bufio" - "errors" "io" "log" "os" "regexp" "strconv" "time" + + "github.com/pingcap/errors" ) var supportOperators = []string{"balance-region", "balance-leader", "transfer-hot-read-leader", "move-hot-read-region", "transfer-hot-write-leader", "move-hot-write-region"}