store/tikv: move metrics shortcuts to /metrics #22693

Merged · 4 commits · Feb 4, 2021
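This PR deletes the file-local shortcut variables that individual store/tikv files kept for pre-bound metric children (2pc.go, backoff.go, cleanup.go, commit.go, coprocessor.go, lock_resolver.go) and replaces them with exported shortcuts owned by the store/tikv/metrics package, wired up through a new initShortcuts() hook. A minimal sketch of the destination pattern, reconstructed from the diff below (the identifiers shown are a small subset, and the shortcuts file itself is not part of the hunks quoted here):

```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

// The *Vec metrics themselves are created in initMetrics (metrics.go).
var TiKVTxnHeartBeatHistogram *prometheus.HistogramVec

// Exported shortcuts: children pre-bound to fixed label values, so callers
// write metrics.TxnHeartBeatHistogramOK.Observe(...) instead of keeping a
// private copy of the WithLabelValues result in every file.
var (
	TxnHeartBeatHistogramOK    prometheus.Observer
	TxnHeartBeatHistogramError prometheus.Observer
)

// initShortcuts runs at the end of initMetrics, once every Vec exists.
func initShortcuts() {
	TxnHeartBeatHistogramOK = TiKVTxnHeartBeatHistogram.WithLabelValues("ok")
	TxnHeartBeatHistogramError = TiKVTxnHeartBeatHistogram.WithLabelValues("err")
}
```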
31 changes: 8 additions & 23 deletions store/tikv/2pc.go
@@ -55,26 +55,11 @@ type twoPhaseCommitAction interface {
String() string
}

- var (
- tikvSecondaryLockCleanupFailureCounterRollback = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback")
- tiKVTxnHeartBeatHistogramOK = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("ok")
- tiKVTxnHeartBeatHistogramError = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("err")
- tikvAsyncCommitTxnCounterOk = metrics.TiKVAsyncCommitTxnCounter.WithLabelValues("ok")
- tikvAsyncCommitTxnCounterError = metrics.TiKVAsyncCommitTxnCounter.WithLabelValues("err")
- tikvOnePCTxnCounterOk = metrics.TiKVOnePCTxnCounter.WithLabelValues("ok")
- tikvOnePCTxnCounterError = metrics.TiKVOnePCTxnCounter.WithLabelValues("err")
- )
-
// Global variable set by config file.
var (
ManagedLockTTL uint64 = 20000 // 20s
)

- // metricsTag returns detail tag for metrics.
- func metricsTag(action string) string {
- return "2pc_" + action
- }
-
// twoPhaseCommitter executes a two-phase commit protocol.
type twoPhaseCommitter struct {
store *KVStore
@@ -766,7 +751,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
zap.Uint64("session", c.sessionID),
zap.Stringer("action type", action),
zap.Error(e))
- tikvSecondaryLockCleanupFailureCounterCommit.Inc()
+ metrics.SecondaryLockCleanupFailureCounterCommit.Inc()
}
}()
} else {
@@ -907,13 +892,13 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
startTime := time.Now()
_, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL)
if err != nil {
- tiKVTxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
+ metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
logutil.Logger(bo.ctx).Warn("send TxnHeartBeat failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return
}
- tiKVTxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
+ metrics.TxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
}
}
}
Expand Down Expand Up @@ -1046,7 +1031,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
cleanupKeysCtx := context.WithValue(context.Background(), TxnStartKey, ctx.Value(TxnStartKey))
err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
if err != nil {
- tikvSecondaryLockCleanupFailureCounterRollback.Inc()
+ metrics.SecondaryLockCleanupFailureCounterRollback.Inc()
logutil.Logger(ctx).Info("2PC cleanup failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
@@ -1065,19 +1050,19 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
if c.isOnePC() {
// The error means the 1PC transaction failed.
if err != nil {
- tikvOnePCTxnCounterError.Inc()
+ metrics.OnePCTxnCounterError.Inc()
} else {
- tikvOnePCTxnCounterOk.Inc()
+ metrics.OnePCTxnCounterOk.Inc()
}
} else if c.isAsyncCommit() {
// The error means the async commit should not succeed.
if err != nil {
if c.getUndeterminedErr() == nil {
c.cleanup(ctx)
}
- tikvAsyncCommitTxnCounterError.Inc()
+ metrics.AsyncCommitTxnCounterError.Inc()
} else {
- tikvAsyncCommitTxnCounterOk.Inc()
+ metrics.AsyncCommitTxnCounterOk.Inc()
}
} else {
// Always clean up all written keys if the txn does not commit.
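A note on why both the old and the new code bind these children once instead of resolving them inline: WithLabelValues on a metric vector hashes the label values and looks the child up in the vector's internal map under a lock on every call, which is measurable on hot paths like commit and heartbeat. The PR only changes where that one-time binding lives:

```go
// Resolving the child inline pays the hash-and-lookup on every commit:
metrics.TiKVOnePCTxnCounter.WithLabelValues("ok").Inc()

// With the shortcut bound once in initShortcuts, the hot path is a plain Inc:
metrics.OnePCTxnCounterOk.Inc()
```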
27 changes: 8 additions & 19 deletions store/tikv/backoff.go
@@ -45,36 +45,25 @@ const (
DecorrJitter
)

- var (
- tikvBackoffHistogramRPC = metrics.TiKVBackoffHistogram.WithLabelValues("tikvRPC")
- tikvBackoffHistogramLock = metrics.TiKVBackoffHistogram.WithLabelValues("txnLock")
- tikvBackoffHistogramLockFast = metrics.TiKVBackoffHistogram.WithLabelValues("tikvLockFast")
- tikvBackoffHistogramPD = metrics.TiKVBackoffHistogram.WithLabelValues("pdRPC")
- tikvBackoffHistogramRegionMiss = metrics.TiKVBackoffHistogram.WithLabelValues("regionMiss")
- tikvBackoffHistogramServerBusy = metrics.TiKVBackoffHistogram.WithLabelValues("serverBusy")
- tikvBackoffHistogramStaleCmd = metrics.TiKVBackoffHistogram.WithLabelValues("staleCommand")
- tikvBackoffHistogramEmpty = metrics.TiKVBackoffHistogram.WithLabelValues("")
- )
-
func (t BackoffType) metric() prometheus.Observer {
switch t {
// TODO: distinguish tikv and tiflash in metrics
case BoTiKVRPC, BoTiFlashRPC:
- return tikvBackoffHistogramRPC
+ return metrics.BackoffHistogramRPC
case BoTxnLock:
- return tikvBackoffHistogramLock
+ return metrics.BackoffHistogramLock
case BoTxnLockFast:
- return tikvBackoffHistogramLockFast
+ return metrics.BackoffHistogramLockFast
case BoPDRPC:
- return tikvBackoffHistogramPD
+ return metrics.BackoffHistogramPD
case BoRegionMiss:
- return tikvBackoffHistogramRegionMiss
+ return metrics.BackoffHistogramRegionMiss
case boTiKVServerBusy, boTiFlashServerBusy:
- return tikvBackoffHistogramServerBusy
+ return metrics.BackoffHistogramServerBusy
case boStaleCmd:
- return tikvBackoffHistogramStaleCmd
+ return metrics.BackoffHistogramStaleCmd
}
- return tikvBackoffHistogramEmpty
+ return metrics.BackoffHistogramEmpty
}

// NewBackoffFn creates a backoff func which implements exponential backoff with
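Note the fallthrough at the end of metric(): it never returns nil, because unknown backoff types map to metrics.BackoffHistogramEmpty, a child pre-bound to the empty label, so call sites can observe unconditionally. A hypothetical call site (the real one sits outside the quoted hunk):

```go
// Record how long this backoff type actually slept.
start := time.Now()
time.Sleep(sleep)
t.metric().Observe(time.Since(start).Seconds())
```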
3 changes: 2 additions & 1 deletion store/tikv/batch_coprocessor.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
@@ -160,7 +161,7 @@ func buildBatchCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, st
zap.Int("range len", rangesLen),
zap.Int("task len", len(batchTasks)))
}
- tikvTxnRegionsNumHistogramWithBatchCoprocessor.Observe(float64(len(batchTasks)))
+ metrics.TxnRegionsNumHistogramWithBatchCoprocessor.Observe(float64(len(batchTasks)))
return batchTasks, nil
}
}
3 changes: 1 addition & 2 deletions store/tikv/cleanup.go
@@ -26,14 +26,13 @@ import (
type actionCleanup struct{}

var _ twoPhaseCommitAction = actionCleanup{}
- var tiKVTxnRegionsNumHistogramCleanup = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(metricsTag("cleanup"))

func (actionCleanup) String() string {
return "cleanup"
}

func (actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
- return tiKVTxnRegionsNumHistogramCleanup
+ return metrics.TxnRegionsNumHistogramCleanup
}

func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
5 changes: 1 addition & 4 deletions store/tikv/commit.go
@@ -31,15 +31,12 @@ type actionCommit struct{ retry bool }

var _ twoPhaseCommitAction = actionCommit{}

- var tikvSecondaryLockCleanupFailureCounterCommit = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit")
- var tiKVTxnRegionsNumHistogramCommit = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(metricsTag("commit"))
-
func (actionCommit) String() string {
return "commit"
}

func (actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
- return tiKVTxnRegionsNumHistogramCommit
+ return metrics.TxnRegionsNumHistogramCommit
}

func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
8 changes: 2 additions & 6 deletions store/tikv/coprocessor.go
@@ -44,11 +44,7 @@ import (
"go.uber.org/zap"
)

- var (
- tikvTxnRegionsNumHistogramWithCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor")
- tikvTxnRegionsNumHistogramWithBatchCoprocessor = metrics.TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor")
- coprCacheHistogramEvict = tidbmetrics.DistSQLCoprCacheHistogram.WithLabelValues("evict")
- )
+ var coprCacheHistogramEvict = tidbmetrics.DistSQLCoprCacheHistogram.WithLabelValues("evict")

// CopClient is coprocessor client.
type CopClient struct {
@@ -180,7 +176,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
zap.Int("range len", rangesLen),
zap.Int("task len", len(tasks)))
}
- tikvTxnRegionsNumHistogramWithCoprocessor.Observe(float64(len(tasks)))
+ metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(len(tasks)))
return tasks, nil
}

49 changes: 16 additions & 33 deletions store/tikv/lock_resolver.go
@@ -42,23 +42,6 @@ const ResolvedCacheSize = 2048
// bigTxnThreshold : transaction involves keys exceed this threshold can be treated as `big transaction`.
const bigTxnThreshold = 16

- var (
- tikvLockResolverCountWithBatchResolve = metrics.TiKVLockResolverCounter.WithLabelValues("batch_resolve")
- tikvLockResolverCountWithExpired = metrics.TiKVLockResolverCounter.WithLabelValues("expired")
- tikvLockResolverCountWithNotExpired = metrics.TiKVLockResolverCounter.WithLabelValues("not_expired")
- tikvLockResolverCountWithWaitExpired = metrics.TiKVLockResolverCounter.WithLabelValues("wait_expired")
- tikvLockResolverCountWithResolve = metrics.TiKVLockResolverCounter.WithLabelValues("resolve")
- tikvLockResolverCountWithResolveForWrite = metrics.TiKVLockResolverCounter.WithLabelValues("resolve_for_write")
- tikvLockResolverCountWithResolveAsync = metrics.TiKVLockResolverCounter.WithLabelValues("resolve_async_commit")
- tikvLockResolverCountWithWriteConflict = metrics.TiKVLockResolverCounter.WithLabelValues("write_conflict")
- tikvLockResolverCountWithQueryTxnStatus = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status")
- tikvLockResolverCountWithQueryTxnStatusCommitted = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_committed")
- tikvLockResolverCountWithQueryTxnStatusRolledBack = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_rolled_back")
- tikvLockResolverCountWithQueryCheckSecondaryLocks = metrics.TiKVLockResolverCounter.WithLabelValues("query_check_secondary_locks")
- tikvLockResolverCountWithResolveLocks = metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_locks")
- tikvLockResolverCountWithResolveLockLite = metrics.TiKVLockResolverCounter.WithLabelValues("query_resolve_lock_lite")
- )
-
// LockResolver resolves locks and also caches resolved txn status.
type LockResolver struct {
store Storage
@@ -237,7 +220,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
return true, nil
}

- tikvLockResolverCountWithBatchResolve.Inc()
+ metrics.LockResolverCountWithBatchResolve.Inc()

// The GCWorker kill all ongoing transactions, because it must make sure all
// locks have been cleaned before GC.
@@ -254,7 +237,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
if _, ok := txnInfos[l.TxnID]; ok {
continue
}
- tikvLockResolverCountWithExpired.Inc()
+ metrics.LockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false, l)
@@ -361,9 +344,9 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
}

if forWrite {
- tikvLockResolverCountWithResolveForWrite.Inc()
+ metrics.LockResolverCountWithResolveForWrite.Inc()
} else {
- tikvLockResolverCountWithResolve.Inc()
+ metrics.LockResolverCountWithResolve.Inc()
}

var pushFail bool
@@ -384,7 +367,7 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
}

if status.ttl == 0 {
- tikvLockResolverCountWithExpired.Inc()
+ metrics.LockResolverCountWithExpired.Inc()
// If the lock is committed or rollbacked, resolve lock.
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
@@ -406,7 +389,7 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
return err
}
} else {
- tikvLockResolverCountWithNotExpired.Inc()
+ metrics.LockResolverCountWithNotExpired.Inc()
// If the lock is valid, the txn may be a pessimistic transaction.
// Update the txn expire time.
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
@@ -417,7 +400,7 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
// abort current transaction.
// This could avoids the deadlock scene of two large transaction.
if l.LockType != kvrpcpb.Op_PessimisticLock && l.TxnID > callerStartTS {
- tikvLockResolverCountWithWriteConflict.Inc()
+ metrics.LockResolverCountWithWriteConflict.Inc()
return kv.ErrWriteConflict.GenWithStackByArgs(callerStartTS, l.TxnID, status.commitTS, l.Key)
}
} else {
@@ -446,7 +429,7 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks

if msBeforeTxnExpired.value() > 0 && len(pushed) == 0 {
// If len(pushed) > 0, the caller will not block on the locks, it push the minCommitTS instead.
- tikvLockResolverCountWithWaitExpired.Inc()
+ metrics.LockResolverCountWithWaitExpired.Inc()
}
return msBeforeTxnExpired.value(), pushed, nil
}
@@ -582,7 +565,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
return s, nil
}

- tikvLockResolverCountWithQueryTxnStatus.Inc()
+ metrics.LockResolverCountWithQueryTxnStatus.Inc()

// CheckTxnStatus may meet the following cases:
// 1. LOCK
@@ -649,9 +632,9 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
status.ttl = cmdResp.LockTtl
} else {
if cmdResp.CommitVersion == 0 {
- tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
+ metrics.LockResolverCountWithQueryTxnStatusRolledBack.Inc()
} else {
- tikvLockResolverCountWithQueryTxnStatusCommitted.Inc()
+ metrics.LockResolverCountWithQueryTxnStatusCommitted.Inc()
}

status.commitTS = cmdResp.CommitVersion
@@ -744,7 +727,7 @@ func (lr *LockResolver) checkSecondaries(bo *Backoffer, txnID uint64, curKeys []
StartVersion: txnID,
}
req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq)
- tikvLockResolverCountWithQueryCheckSecondaryLocks.Inc()
+ metrics.LockResolverCountWithQueryCheckSecondaryLocks.Inc()
resp, err := lr.store.SendReq(bo, req, curRegionID, readTimeoutShort)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -785,7 +768,7 @@ func (lr *LockResolver) checkSecondaries(bo *Backoffer, txnID uint64, curKeys []

// resolveLockAsync resolves l assuming it was locked using the async commit protocol.
func (lr *LockResolver) resolveLockAsync(bo *Backoffer, l *Lock, status TxnStatus) error {
- tikvLockResolverCountWithResolveAsync.Inc()
+ metrics.LockResolverCountWithResolveAsync.Inc()

resolveData, err := lr.checkAllSecondaries(bo, l, &status)
if err != nil {
@@ -918,7 +901,7 @@ func (lr *LockResolver) resolveRegionLocks(bo *Backoffer, l *Lock, region Region
}

func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[RegionVerID]struct{}) error {
- tikvLockResolverCountWithResolveLocks.Inc()
+ metrics.LockResolverCountWithResolveLocks.Inc()
resolveLite := lite || l.TxnSize < bigTxnThreshold
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
@@ -940,7 +923,7 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, li
if resolveLite {
// Only resolve specified keys when it is a small transaction,
// prevent from scanning the whole region in this case.
- tikvLockResolverCountWithResolveLockLite.Inc()
+ metrics.LockResolverCountWithResolveLockLite.Inc()
lreq.Keys = [][]byte{l.Key}
}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
@@ -976,7 +959,7 @@ func (lr *LockResolver) resolvePessimisticLock(bo *Backoffer, l *Lock, cleanRegions map[RegionVerID]struct{}) error {
}

func (lr *LockResolver) resolvePessimisticLock(bo *Backoffer, l *Lock, cleanRegions map[RegionVerID]struct{}) error {
- tikvLockResolverCountWithResolveLocks.Inc()
+ metrics.LockResolverCountWithResolveLocks.Inc()
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
if err != nil {
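A side effect of exporting the shortcuts is that code outside the package, tests included, can now reach them. A sketch of the kind of assertion this enables, using the prometheus testutil helper (illustrative only, not part of the PR):

```go
import (
	"testing"

	"github.com/pingcap/tidb/store/tikv/metrics"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestResolveIsCounted(t *testing.T) {
	before := testutil.ToFloat64(metrics.LockResolverCountWithResolve)
	// ... run a transaction that hits a resolvable lock ...
	after := testutil.ToFloat64(metrics.LockResolverCountWithResolve)
	if after != before+1 {
		t.Fatalf("want exactly one resolve recorded, got %v", after-before)
	}
}
```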
2 changes: 2 additions & 0 deletions store/tikv/metrics/metrics.go
@@ -344,6 +344,8 @@ func initMetrics(namespace, subsystem string) {
Name: "one_pc_txn_counter",
Help: "Counter of 1PC transactions.",
}, []string{LblType})

+ initShortcuts()
}

func init() {
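The placement of the new call matters: initMetrics(namespace, subsystem) recreates each Vec under the requested namespace, so a child bound before that point would keep feeding a stale vector. Running initShortcuts() as the final step keeps the exported shortcuts and the vectors in lockstep. Schematically (abridged to the one counter visible in the hunk above):

```go
func initMetrics(namespace, subsystem string) {
	// (Re)create every Vec under the requested namespace first...
	TiKVOnePCTxnCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: namespace,
		Subsystem: subsystem,
		Name:      "one_pc_txn_counter",
		Help:      "Counter of 1PC transactions.",
	}, []string{LblType})

	// ...then re-bind every shortcut against the fresh vectors.
	initShortcuts()
}
```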