Skip to content

Commit

Permalink
cherry pick #2896 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Sep 7, 2020
1 parent bb86c69 commit b4c634c
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 37 deletions.
9 changes: 5 additions & 4 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/pkg/metricutil"
"github.com/tikv/pd/pkg/swaggerserver"
Expand Down Expand Up @@ -53,7 +54,7 @@ func main() {
case flag.ErrHelp:
exit(0)
default:
log.Fatal("parse cmd flags error", zap.Error(err))
log.Fatal("parse cmd flags error", errs.ZapError(errs.ErrParseFlags))
}

if cfg.ConfigCheck {
Expand All @@ -66,15 +67,15 @@ func main() {
if err == nil {
log.ReplaceGlobals(cfg.GetZapLogger(), cfg.GetZapLogProperties())
} else {
log.Fatal("initialize logger error", zap.Error(err))
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()

// The old logger
err = logutil.InitLogger(&cfg.Log)
if err != nil {
log.Fatal("initialize logger error", zap.Error(err))
log.Fatal("initialize logger error", errs.ZapError(err))
}

server.LogPDInfo()
Expand All @@ -99,7 +100,7 @@ func main() {
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, serviceBuilders...)
if err != nil {
log.Fatal("create server failed", zap.Error(err))
log.Fatal("create server failed", errs.ZapError(err))
}

sc := make(chan os.Signal, 1)
Expand Down
110 changes: 98 additions & 12 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,25 @@ package errs

import "github.com/pingcap/errors"

// common error in multiple packages
var (
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:common:ErrGetSourceStore"))
ErrIncorrectSystemTime = errors.Normalize("incorrect system time", errors.RFCCodeText("PD:common:ErrIncorrectSystemTime"))
)

// The internal error which is generated in PD project.
// main errors
var (
ErrParseFlags = errors.Normalize("parse flags error", errors.RFCCodeText("PD:main:ErrParseFlags"))
)

// tso errors
var (
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrIncorrectSystemTime = errors.Normalize("incorrect system time", errors.RFCCodeText("PD:tso:ErrIncorrectSystemTime"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
)

// member errors
Expand All @@ -29,20 +42,34 @@ var (
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
)

// core errors
var (
ErrWrongRangeKeys = errors.Normalize("wrong range keys", errors.RFCCodeText("PD:core:ErrWrongRangeKeys"))
ErrStoreNotFound = errors.Normalize("store %v not found", errors.RFCCodeText("PD:core:ErrStoreNotFound"))
ErrPauseLeaderTransfer = errors.Normalize("store %v is paused for leader transfer", errors.RFCCodeText("PD:core:ErrPauseLeaderTransfer"))
ErrStoreTombstone = errors.Normalize("store %v has been removed", errors.RFCCodeText("PD:core:ErrStoreTombstone"))
)

// client errors
var (
ErrWatcherCancel = errors.Normalize("watcher canceled", errors.RFCCodeText("PD:member:ErrWatcherCancel"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
)

// schedule errors
var (
ErrUnexpectedOperatorStatus = errors.Normalize("operator with unexpected status", errors.RFCCodeText("PD:schedule:ErrUnexpectedOperatorStatus"))
ErrUnknownOperatorStep = errors.Normalize("unknown operator step found", errors.RFCCodeText("PD:schedule:ErrUnknownOperatorStep"))
ErrMergeOperator = errors.Normalize("merge operator error, %s", errors.RFCCodeText("PD:schedule:ErrMergeOperator"))
)

// scheduler errors
var (
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:scheduler:ErrGetSourceStore"))
ErrSchedulerExisted = errors.Normalize("scheduler existed", errors.RFCCodeText("PD:scheduler:ErrSchedulerExisted"))
ErrSchedulerDuplicated = errors.Normalize("scheduler duplicated", errors.RFCCodeText("PD:scheduler:ErrSchedulerDuplicated"))
ErrSchedulerNotFound = errors.Normalize("scheduler not found", errors.RFCCodeText("PD:scheduler:ErrSchedulerNotFound"))
ErrScheduleConfigNotExist = errors.Normalize("the config does not exist", errors.RFCCodeText("PD:scheduler:ErrScheduleConfigNotExist"))
ErrSchedulerConfig = errors.Normalize("wrong scheduler config %s", errors.RFCCodeText("PD:scheduler:ErrSchedulerConfig"))
Expand All @@ -61,15 +88,40 @@ var (

// cluster errors
var (
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
ErrFeatureNotExisted = errors.Normalize("feature not existed", errors.RFCCodeText("PD:cluster:ErrFeatureNotExisted"))
)

// apiutil errors
var (
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
)

// grpcutil errors
var (
ErrSecurityConfig = errors.Normalize("security config error: %s", errors.RFCCodeText("PD:grpcutil:ErrSecurityConfig"))
)

// server errors
var (
ErrServiceRegistered = errors.Normalize("service with path [%s] already registered", errors.RFCCodeText("PD:server:ErrServiceRegistered"))
ErrAPIInformationInvalid = errors.Normalize("invalid api information, group %s version %s", errors.RFCCodeText("PD:server:ErrAPIInformationInvalid"))
ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty"))
ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil"))
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
)

// logutil errors
var (
ErrInitFileLog = errors.Normalize("init file log error, %s", errors.RFCCodeText("PD:logutil:ErrInitFileLog"))
)

// typeutil errors
var (
ErrBytesToUint64 = errors.Normalize("invalid data, must 8 bytes, but %d", errors.RFCCodeText("PD:typeutil:ErrBytesToUint64"))
)

// The third-party project error.
// url errors
var (
Expand All @@ -79,8 +131,12 @@ var (

// grpc errors
var (
ErrGRPCDial = errors.Normalize("dial error", errors.RFCCodeText("PD:grpc:ErrGRPCDial"))
ErrCloseGRPCConn = errors.Normalize("close gRPC connection failed", errors.RFCCodeText("PD:grpc:ErrCloseGRPCConn"))
ErrGRPCDial = errors.Normalize("dial error", errors.RFCCodeText("PD:grpc:ErrGRPCDial"))
ErrCloseGRPCConn = errors.Normalize("close gRPC connection failed", errors.RFCCodeText("PD:grpc:ErrCloseGRPCConn"))
ErrGRPCSend = errors.Normalize("send request error", errors.RFCCodeText("PD:grpc:ErrGRPCSend"))
ErrGRPCRecv = errors.Normalize("receive response error", errors.RFCCodeText("PD:grpc:ErrGRPCRecv"))
ErrGRPCCloseSend = errors.Normalize("close send error", errors.RFCCodeText("PD:grpc:ErrGRPCCloseSend"))
ErrGRPCCreateStream = errors.Normalize("create stream error", errors.RFCCodeText("PD:grpc:ErrGRPCCreateStream"))
)

// proto errors
Expand Down Expand Up @@ -116,13 +172,16 @@ var (

// strconv errors
var (
ErrStrconvParseInt = errors.Normalize("parse int error", errors.RFCCodeText("PD:strconv:ErrStrconvParseInt"))
ErrStrconvParseUint = errors.Normalize("parse uint error", errors.RFCCodeText("PD:strconv:ErrStrconvParseUint"))
ErrStrconvParseInt = errors.Normalize("parse int error", errors.RFCCodeText("PD:strconv:ErrStrconvParseInt"))
ErrStrconvParseUint = errors.Normalize("parse uint error", errors.RFCCodeText("PD:strconv:ErrStrconvParseUint"))
ErrStrconvParseFloat = errors.Normalize("parse float error", errors.RFCCodeText("PD:strconv:ErrStrconvParseFloat"))
)

// prometheus errors
var (
ErrPrometheusPushMetrics = errors.Normalize("push metrics to gateway failed", errors.RFCCodeText("PD:prometheus:ErrPrometheusPushMetrics"))
ErrPrometheusPushMetrics = errors.Normalize("push metrics to gateway failed", errors.RFCCodeText("PD:prometheus:ErrPrometheusPushMetrics"))
ErrPrometheusCreateClient = errors.Normalize("create client error", errors.RFCCodeText("PD:prometheus:ErrPrometheusCreateClient"))
ErrPrometheusQuery = errors.Normalize("query error", errors.RFCCodeText("PD:prometheus:ErrPrometheusQuery"))
)

// http errors
Expand All @@ -137,6 +196,16 @@ var (
ErrIORead = errors.Normalize("IO read error", errors.RFCCodeText("PD:ioutil:ErrIORead"))
)

// os error
var (
ErrOSOpen = errors.Normalize("open error", errors.RFCCodeText("PD:os:ErrOSOpen"))
)

// dir error
var (
ErrReadDirName = errors.Normalize("read dir name error", errors.RFCCodeText("PD:dir:ErrReadDirName"))
)

// netstat error
var (
ErrNetstatTCPSocks = errors.Normalize("TCP socks error", errors.RFCCodeText("PD:netstat:ErrNetstatTCPSocks"))
Expand All @@ -163,3 +232,20 @@ var (
ErrJSONMarshal = errors.Normalize("failed to marshal json", errors.RFCCodeText("PD:json:ErrJSONMarshal"))
ErrJSONUnmarshal = errors.Normalize("failed to unmarshal json", errors.RFCCodeText("PD:json:ErrJSONUnmarshal"))
)

// leveldb errors
var (
ErrLevelDBClose = errors.Normalize("close leveldb error", errors.RFCCodeText("PD:leveldb:ErrLevelDBClose"))
ErrLevelDBWrite = errors.Normalize("leveldb write error", errors.RFCCodeText("PD:leveldb:ErrLevelDBWrite"))
ErrLevelDBOpen = errors.Normalize("leveldb open file error", errors.RFCCodeText("PD:leveldb:ErrLevelDBOpen"))
)

// semver
var (
ErrSemverNewVersion = errors.Normalize("new version error", errors.RFCCodeText("PD:semver:ErrSemverNewVersion"))
)

// log
var (
ErrInitLogger = errors.Normalize("init logger error", errors.RFCCodeText("PD:log:ErrInitLogger"))
)
4 changes: 2 additions & 2 deletions pkg/logutil/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"sync"

"github.com/coreos/pkg/capnslog"
"github.com/pingcap/errors"
zaplog "github.com/pingcap/log"
log "github.com/sirupsen/logrus"
"github.com/tikv/pd/pkg/errs"
"go.etcd.io/etcd/raft"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -217,7 +217,7 @@ func StringToLogFormatter(format string, disableTimestamp bool) log.Formatter {
func InitFileLog(cfg *zaplog.FileLogConfig) error {
if st, err := os.Stat(cfg.Filename); err == nil {
if st.IsDir() {
return errors.New("can't use directory as log file name")
return errs.ErrInitFileLog.FastGenByArgs("can't use directory as log file name")
}
}
if cfg.MaxSize == 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/typeutil/convension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ package typeutil
import (
"encoding/binary"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/errs"
)

// BytesToUint64 converts a byte slice to uint64.
func BytesToUint64(b []byte) (uint64, error) {
if len(b) != 8 {
return 0, errors.Errorf("invalid data, must 8 bytes, but %d", len(b))
return 0, errs.ErrBytesToUint64.FastGenByArgs(len(b))
}

return binary.BigEndian.Uint64(b), nil
Expand Down
11 changes: 7 additions & 4 deletions server/cluster/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ package cluster

import (
"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -50,7 +50,7 @@ var featuresDict = map[Feature]string{
func MinSupportedVersion(v Feature) *semver.Version {
target, ok := featuresDict[v]
if !ok {
log.Fatal("the corresponding version of the feature doesn't exist", zap.Int("feature-number", int(v)))
log.Fatal("the corresponding version of the feature doesn't exist", zap.Int("feature-number", int(v)), errs.ZapError(errs.ErrFeatureNotExisted))
}
version := MustParseVersion(target)
return version
Expand All @@ -66,14 +66,17 @@ func ParseVersion(v string) (*semver.Version, error) {
v = v[1:]
}
ver, err := semver.NewVersion(v)
return ver, errors.WithStack(err)
if err != nil {
return nil, errs.ErrSemverNewVersion.Wrap(err).GenWithStackByCause()
}
return ver, nil
}

// MustParseVersion wraps ParseVersion and will panic if error is not nil.
func MustParseVersion(v string) *semver.Version {
ver, err := ParseVersion(v)
if err != nil {
log.Fatal("version string is illegal", zap.Error(err))
log.Fatal("version string is illegal", errs.ZapError(err))
}
return ver
}
Expand Down
5 changes: 3 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"time"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/grpcutil"
"github.com/tikv/pd/pkg/metricutil"
"github.com/tikv/pd/pkg/typeutil"
Expand Down Expand Up @@ -1088,7 +1089,7 @@ func ParseUrls(s string) ([]url.URL, error) {
for _, item := range items {
u, err := url.Parse(item)
if err != nil {
return nil, errors.WithStack(err)
return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause()
}

urls = append(urls, *u)
Expand All @@ -1101,7 +1102,7 @@ func ParseUrls(s string) ([]url.URL, error) {
func (c *Config) SetupLogger() error {
lg, p, err := log.InitLogger(&c.Log, zap.AddStacktrace(zapcore.FatalLevel))
if err != nil {
return err
return errs.ErrInitLogger.Wrap(err).FastGenWithCause()
}
c.logger = lg
c.logProps = p
Expand Down
5 changes: 3 additions & 2 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/schedule/storelimit"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -579,8 +580,8 @@ func (s *StoresInfo) BlockStore(storeID uint64) errcode.ErrorCode {
func (s *StoresInfo) UnblockStore(storeID uint64) {
store, ok := s.stores[storeID]
if !ok {
log.Fatal("store is unblocked, but it is not found",
zap.Uint64("store-id", storeID))
log.Fatal("try to clean a store's pause state, but it is not found",
zap.Uint64("store-id", storeID), errs.ZapError(errs.ErrStoreNotFound.FastGenByArgs(storeID)))
}
s.stores[storeID] = store.Clone(SetStoreUnBlock())
}
Expand Down
3 changes: 2 additions & 1 deletion server/join/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -93,7 +94,7 @@ func PrepareJoinCluster(cfg *config.Config) error {
if _, err := os.Stat(filePath); !os.IsNotExist(err) {
s, err := ioutil.ReadFile(filePath)
if err != nil {
log.Fatal("read the join config meet error", zap.Error(err))
log.Fatal("read the join config meet error", errs.ZapError(errs.ErrIORead, err))
}
cfg.InitialCluster = strings.TrimSpace(string(s))
cfg.InitialClusterState = embed.ClusterStateFlagExisting
Expand Down
3 changes: 2 additions & 1 deletion server/kv/levedb_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/tikv/pd/pkg/errs"
)

// LeveldbKV is a kv store using leveldb.
Expand All @@ -30,7 +31,7 @@ type LeveldbKV struct {
func NewLeveldbKV(path string) (*LeveldbKV, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, errors.WithStack(err)
return nil, errs.ErrLevelDBOpen.Wrap(err).GenWithStackByCause()
}
return &LeveldbKV{db}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/member/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func (m *Member) WatchLeader(serverCtx context.Context, leader *pdpb.Member, rev
break
}
if wresp.Canceled {
log.Error("pd leader watcher is canceled with", zap.Int64("revision", revision), errs.ZapError(errs.ErrWatcherCancel, wresp.Err()))
log.Error("pd leader watcher is canceled with", zap.Int64("revision", revision), errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return
}

Expand Down
2 changes: 1 addition & 1 deletion server/region_syncer/history_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (h *historyBuffer) reload() {
if v != "" {
h.index, err = strconv.ParseUint(v, 10, 64)
if err != nil {
log.Fatal("load history index failed", zap.Error(err))
log.Fatal("load history index failed", errs.ZapError(errs.ErrStrconvParseUint, err))
}
}
log.Info("start from history index", zap.Uint64("start-index", h.firstIndex()))
Expand Down
Loading

0 comments on commit b4c634c

Please sign in to comment.