From f413183a1e90d21f2280b0eec3da006cfc68b6ac Mon Sep 17 00:00:00 2001 From: lysu Date: Mon, 26 Aug 2019 15:35:42 +0800 Subject: [PATCH] tikv: refine commit backoff slow log (#11757) --- store/tikv/2pc.go | 57 +++++++++++++++++++++------- store/tikv/backoff.go | 7 ++-- store/tikv/txn.go | 9 +++-- util/execdetails/execdetails.go | 26 ++++++++++--- util/execdetails/execdetails_test.go | 18 ++++++++- 5 files changed, 89 insertions(+), 28 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 9ed406862641b..512449a04e124 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -21,6 +21,7 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -95,7 +96,7 @@ type twoPhaseCommitter struct { // We use it to guarantee GC worker will not influence any active txn. The value // should be less than GC life time. maxTxnTimeUse uint64 - detail *execdetails.CommitDetails + detail unsafe.Pointer primaryKey []byte forUpdateTS uint64 pessimisticTTL uint64 @@ -273,7 +274,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { c.lockTTL = txnLockTTL(txn.startTime, size) c.priority = getTxnPriority(txn) c.syncLog = getTxnSyncLog(txn) - c.detail = commitDetail + c.setDetail(commitDetail) return nil } @@ -334,7 +335,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } } sizeFunc = c.keyValueSize - atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups))) + atomic.AddInt32(&c.getDetail().PrewriteRegionNum, int32(len(groups))) } // Make sure the group that contains primary key goes first. batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize) @@ -418,6 +419,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm batch := batch1 go func() { + var singleBatchBackoffer *Backoffer if action == actionCommit { // Because the secondary batches of the commit actions are implemented to be // committed asynchronously in background goroutines, we should not @@ -426,12 +428,22 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm // Here we makes a new clone of the original backoffer for this goroutine // exclusively to avoid the data race when using the same backoffer // in concurrent goroutines. - singleBatchBackoffer := backoffer.Clone() - ch <- singleBatchActionFunc(singleBatchBackoffer, batch) + singleBatchBackoffer = backoffer.Clone() } else { - singleBatchBackoffer, singleBatchCancel := backoffer.Fork() + var singleBatchCancel context.CancelFunc + singleBatchBackoffer, singleBatchCancel = backoffer.Fork() defer singleBatchCancel() - ch <- singleBatchActionFunc(singleBatchBackoffer, batch) + } + beforeSleep := singleBatchBackoffer.totalSleep + ch <- singleBatchActionFunc(singleBatchBackoffer, batch) + commitDetail := c.getDetail() + if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil + if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 { + atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(singleBatchBackoffer.totalSleep-beforeSleep)*int64(time.Millisecond)) + commitDetail.Mu.Lock() + commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, singleBatchBackoffer.types...) + commitDetail.Mu.Unlock() + } } }() } @@ -566,7 +578,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) if err != nil { return errors.Trace(err) } - atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start))) + atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start))) if msBeforeExpired > 0 { err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) if err != nil { @@ -714,6 +726,14 @@ func kvPriorityToCommandPri(pri int) pb.CommandPri { return pb.CommandPri_Normal } +func (c *twoPhaseCommitter) setDetail(d *execdetails.CommitDetails) { + atomic.StorePointer(&c.detail, unsafe.Pointer(d)) +} + +func (c *twoPhaseCommitter) getDetail() *execdetails.CommitDetails { + return (*execdetails.CommitDetails)(atomic.LoadPointer(&c.detail)) +} + func (c *twoPhaseCommitter) setUndeterminedErr(err error) { c.mu.Lock() defer c.mu.Unlock() @@ -906,8 +926,14 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars) start := time.Now() err := c.prewriteKeys(prewriteBo, c.keys) - c.detail.PrewriteTime = time.Since(start) - c.detail.TotalBackoffTime += time.Duration(prewriteBo.totalSleep) * time.Millisecond + commitDetail := c.getDetail() + commitDetail.PrewriteTime = time.Since(start) + if prewriteBo.totalSleep > 0 { + atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(prewriteBo.totalSleep)*int64(time.Millisecond)) + commitDetail.Mu.Lock() + commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, prewriteBo.types...) + commitDetail.Mu.Unlock() + } if binlogChan != nil { binlogErr := <-binlogChan if binlogErr != nil { @@ -929,7 +955,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { zap.Uint64("txnStartTS", c.startTS)) return errors.Trace(err) } - c.detail.GetCommitTsTime = time.Since(start) + commitDetail.GetCommitTsTime = time.Since(start) // check commitTS if commitTS <= c.startTS { @@ -958,8 +984,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { start = time.Now() commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars) err = c.commitKeys(commitBo, c.keys) - c.detail.CommitTime = time.Since(start) - c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond + commitDetail.CommitTime = time.Since(start) + if commitBo.totalSleep > 0 { + atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(commitBo.totalSleep)*int64(time.Millisecond)) + commitDetail.Mu.Lock() + commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, commitBo.types...) + commitDetail.Mu.Unlock() + } if err != nil { if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil { logutil.Logger(ctx).Error("2PC commit result undetermined", diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 3dd36d219230b..ed38e81cf9594 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -237,7 +237,7 @@ type Backoffer struct { maxSleep int totalSleep int errors []error - types []backoffType + types []fmt.Stringer vars *kv.Variables noop bool } @@ -290,6 +290,7 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err default: } + b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano))) b.types = append(b.types, typ) if b.noop || (b.maxSleep > 0 && b.totalSleep >= b.maxSleep) { errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", typ.String(), b.maxSleep) @@ -301,7 +302,7 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err } logutil.Logger(context.Background()).Warn(errMsg) // Use the first backoff type to generate a MySQL error. - return b.types[0].TError() + return b.types[0].(backoffType).TError() } backoffCounter, backoffDuration := typ.metric() @@ -330,8 +331,6 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err zap.Int("maxSleep", b.maxSleep), zap.Stringer("type", typ), zap.Reflect("txnStartTS", startTs)) - - b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano))) return nil } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 7d81d22624003..7d671c0be6495 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -287,7 +287,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { if *commitDetail != nil { (*commitDetail).TxnRetry += 1 } else { - *commitDetail = committer.detail + *commitDetail = committer.getDetail() } } }() @@ -303,9 +303,10 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { // for transactions which need to acquire latches start = time.Now() lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys) - committer.detail.LocalLatchTime = time.Since(start) - if committer.detail.LocalLatchTime > 0 { - metrics.TiKVLocalLatchWaitTimeHistogram.Observe(committer.detail.LocalLatchTime.Seconds()) + commitDetail := committer.getDetail() + commitDetail.LocalLatchTime = time.Since(start) + if commitDetail.LocalLatchTime > 0 { + metrics.TiKVLocalLatchWaitTimeHistogram.Observe(commitDetail.LocalLatchTime.Seconds()) } defer txn.store.txnLatches.UnLock(lock) if lock.IsStale() { diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 73ea8d9dd20cf..84251bf2f4af6 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -47,7 +47,11 @@ type CommitDetails struct { PrewriteTime time.Duration CommitTime time.Duration LocalLatchTime time.Duration - TotalBackoffTime time.Duration + CommitBackoffTime int64 + Mu struct { + sync.Mutex + BackoffTypes []fmt.Stringer + } ResolveLockTime int64 WriteKeys int WriteSize int @@ -102,9 +106,15 @@ func (d ExecDetails) String() string { if commitDetails.GetCommitTsTime > 0 { parts = append(parts, fmt.Sprintf("Get_commit_ts_time: %v", commitDetails.GetCommitTsTime.Seconds())) } - if commitDetails.TotalBackoffTime > 0 { - parts = append(parts, fmt.Sprintf("Total_backoff_time: %v", commitDetails.TotalBackoffTime.Seconds())) + commitBackoffTime := atomic.LoadInt64(&commitDetails.CommitBackoffTime) + if commitBackoffTime > 0 { + parts = append(parts, fmt.Sprintf("Commit_backoff_time: %v", time.Duration(commitBackoffTime).Seconds())) + } + commitDetails.Mu.Lock() + if len(commitDetails.Mu.BackoffTypes) > 0 { + parts = append(parts, fmt.Sprintf("Backoff_types: %v", commitDetails.Mu.BackoffTypes)) } + commitDetails.Mu.Unlock() resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime) if resolveLockTime > 0 { parts = append(parts, fmt.Sprintf("Resolve_lock_time: %v", time.Duration(resolveLockTime).Seconds())) @@ -161,9 +171,15 @@ func (d ExecDetails) ToZapFields() (fields []zap.Field) { if commitDetails.GetCommitTsTime > 0 { fields = append(fields, zap.String("get_commit_ts_time", fmt.Sprintf("%v", strconv.FormatFloat(commitDetails.GetCommitTsTime.Seconds(), 'f', -1, 64)+"s"))) } - if commitDetails.TotalBackoffTime > 0 { - fields = append(fields, zap.String("total_backoff_time", fmt.Sprintf("%v", strconv.FormatFloat(commitDetails.TotalBackoffTime.Seconds(), 'f', -1, 64)+"s"))) + commitBackoffTime := atomic.LoadInt64(&commitDetails.CommitBackoffTime) + if commitBackoffTime > 0 { + fields = append(fields, zap.String("commit_backoff_time", fmt.Sprintf("%v", strconv.FormatFloat(time.Duration(commitBackoffTime).Seconds(), 'f', -1, 64)+"s"))) + } + commitDetails.Mu.Lock() + if len(commitDetails.Mu.BackoffTypes) > 0 { + fields = append(fields, zap.String("backoff_types", fmt.Sprintf("%v", commitDetails.Mu.BackoffTypes))) } + commitDetails.Mu.Unlock() resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime) if resolveLockTime > 0 { fields = append(fields, zap.String("resolve_lock_time", fmt.Sprintf("%v", strconv.FormatFloat(time.Duration(resolveLockTime).Seconds(), 'f', -1, 64)+"s"))) diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go index de39f456ce09b..7aee493aee457 100644 --- a/util/execdetails/execdetails_test.go +++ b/util/execdetails/execdetails_test.go @@ -14,9 +14,12 @@ package execdetails import ( + "fmt" + "sync" "testing" "time" + "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" ) @@ -33,7 +36,18 @@ func TestString(t *testing.T) { PrewriteTime: time.Second, CommitTime: time.Second, LocalLatchTime: time.Second, - TotalBackoffTime: time.Second, + CommitBackoffTime: int64(time.Second), + Mu: struct { + sync.Mutex + BackoffTypes []fmt.Stringer + }{BackoffTypes: []fmt.Stringer{ + stringutil.MemoizeStr(func() string { + return "backoff1" + }), + stringutil.MemoizeStr(func() string { + return "backoff2" + }), + }}, ResolveLockTime: 1000000000, // 10^9 ns = 1s WriteKeys: 1, WriteSize: 1, @@ -42,7 +56,7 @@ func TestString(t *testing.T) { }, } expected := "Process_time: 2.005 Wait_time: 1 Backoff_time: 1 Request_count: 1 Total_keys: 100 Process_keys: 10 Prewrite_time: 1 Commit_time: 1 " + - "Get_commit_ts_time: 1 Total_backoff_time: 1 Resolve_lock_time: 1 Local_latch_wait_time: 1 Write_keys: 1 Write_size: 1 Prewrite_region: 1 Txn_retry: 1" + "Get_commit_ts_time: 1 Commit_backoff_time: 1 Backoff_types: [backoff1 backoff2] Resolve_lock_time: 1 Local_latch_wait_time: 1 Write_keys: 1 Write_size: 1 Prewrite_region: 1 Txn_retry: 1" if str := detail.String(); str != expected { t.Errorf("got:\n%s\nexpected:\n%s", str, expected) }