diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 9a82cb6d0..2479b296c 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -988,12 +988,23 @@ func (s *RegionRequestSender) SendReqCtx(
 			metrics.TiKVRequestRetryTimesHistogram.Observe(float64(tryTimes))
 		}
 	}()
+
+	var staleReadCollector *staleReadMetricsCollector
+	if req.StaleRead {
+		staleReadCollector = &staleReadMetricsCollector{hit: true}
+		staleReadCollector.onReq(req)
+		defer staleReadCollector.collect()
+	}
+
 	for {
 		if tryTimes > 0 {
 			req.IsRetryRequest = true
 			if tryTimes%100 == 0 {
 				logutil.Logger(bo.GetCtx()).Warn("retry", zap.Uint64("region", regionID.GetID()), zap.Int("times", tryTimes))
 			}
+			if req.StaleRead && staleReadCollector != nil {
+				staleReadCollector.hit = false
+			}
 		}
 
 		rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
@@ -1071,6 +1082,9 @@ func (s *RegionRequestSender) SendReqCtx(
 				s.replicaSelector.onSendSuccess()
 			}
 		}
+		if staleReadCollector != nil {
+			staleReadCollector.onResp(resp)
+		}
 		return resp, rpcCtx, nil
 	}
 }
@@ -1642,3 +1656,59 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
 	// Because caller may need to re-split the request.
 	return false, nil
 }
+
+type staleReadMetricsCollector struct {
+	tp  tikvrpc.CmdType
+	hit bool
+	out int
+	in  int
+}
+
+func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
+	size := 0
+	switch req.Type {
+	case tikvrpc.CmdGet:
+		size += req.Get().Size()
+	case tikvrpc.CmdBatchGet:
+		size += req.BatchGet().Size()
+	case tikvrpc.CmdScan:
+		size += req.Scan().Size()
+	case tikvrpc.CmdCop:
+		size += req.Cop().Size()
+	default:
+		// ignore non-read requests
+		return
+	}
+	s.tp = req.Type
+	size += req.Context.Size()
+	s.out = size
+}
+
+func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
+	size := 0
+	switch s.tp {
+	case tikvrpc.CmdGet:
+		size += resp.Resp.(*kvrpcpb.GetResponse).Size()
+	case tikvrpc.CmdBatchGet:
+		size += resp.Resp.(*kvrpcpb.BatchGetResponse).Size()
+	case tikvrpc.CmdScan:
+		size += resp.Resp.(*kvrpcpb.ScanResponse).Size()
+	case tikvrpc.CmdCop:
+		size += resp.Resp.(*coprocessor.Response).Size()
+	default:
+		// unreachable
+		return
+	}
+	s.in = size
+}
+
+func (s *staleReadMetricsCollector) collect() {
+	in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic
+	if !s.hit {
+		in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic
+	}
+	if s.in > 0 && s.out > 0 {
+		in.Observe(float64(s.in))
+		out.Observe(float64(s.out))
+	}
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 38b2fcd9a..20b2213ce 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -96,6 +96,7 @@ var (
 	TiKVReadThroughput                       prometheus.Histogram
 	TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
 	TiKVPrewriteAssertionUsageCounter        *prometheus.CounterVec
+	TiKVStaleReadSizeSummary                 *prometheus.SummaryVec
 )
 
 // Label constants.
@@ -116,6 +117,7 @@ const (
 	LblToStore   = "to_store"
 	LblStaleRead = "stale_read"
 	LblSource    = "source"
+	LblDirection = "direction"
 )
 
 func initMetrics(namespace, subsystem string) {
@@ -589,6 +591,14 @@ func initMetrics(namespace, subsystem string) {
 			Help:      "Counter of assertions used in prewrite requests",
 		}, []string{LblType})
 
+	TiKVStaleReadSizeSummary = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "stale_read_bytes",
+			Help:      "Size of stale read.",
+		}, []string{LblResult, LblDirection})
+
 	initShortcuts()
 }
 
@@ -659,6 +669,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVReadThroughput)
 	prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec)
 	prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
+	prometheus.MustRegister(TiKVStaleReadSizeSummary)
 }
 
 // readCounter reads the value of a prometheus.Counter.
diff --git a/metrics/shortcuts.go b/metrics/shortcuts.go
index 8ff838560..30b5207f8 100644
--- a/metrics/shortcuts.go
+++ b/metrics/shortcuts.go
@@ -135,6 +135,11 @@ var (
 	PrewriteAssertionUsageCounterExist    prometheus.Counter
 	PrewriteAssertionUsageCounterNotExist prometheus.Counter
 	PrewriteAssertionUsageCounterUnknown  prometheus.Counter
+
+	StaleReadHitInTraffic   prometheus.Observer
+	StaleReadHitOutTraffic  prometheus.Observer
+	StaleReadMissInTraffic  prometheus.Observer
+	StaleReadMissOutTraffic prometheus.Observer
 )
 
 func initShortcuts() {
@@ -235,4 +240,9 @@ func initShortcuts() {
 	PrewriteAssertionUsageCounterExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("exist")
 	PrewriteAssertionUsageCounterNotExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("not-exist")
 	PrewriteAssertionUsageCounterUnknown = TiKVPrewriteAssertionUsageCounter.WithLabelValues("unknown")
+
+	StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in")
+	StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out")
+	StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in")
+	StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out")
 }
diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go
index d864c415a..bc78d8b65 100644
--- a/tikvrpc/tikvrpc.go
+++ b/tikvrpc/tikvrpc.go
@@ -270,6 +270,12 @@ func (req *Request) EnableStaleRead() {
 	req.ReplicaRead = false
 }
 
+// DisableStaleReadMeetLock is called when a stale read falls back to leader read after meeting a key-is-locked error.
+func (req *Request) DisableStaleReadMeetLock() {
+	req.StaleRead = false
+	req.ReplicaReadType = kv.ReplicaReadLeader
+}
+
 // IsGlobalStaleRead checks if the request is a global stale read request.
 func (req *Request) IsGlobalStaleRead() bool {
 	return req.ReadReplicaScope == oracle.GlobalTxnScope &&
diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go
index 8ba0888d4..76b5aed5a 100644
--- a/txnkv/txnsnapshot/snapshot.go
+++ b/txnkv/txnsnapshot/snapshot.go
@@ -365,6 +365,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 			s.mergeRegionRequestStats(cli.Stats)
 		}()
 	}
+	isStaleness := s.mu.isStaleness
 	s.mu.RUnlock()
 
 	pending := batch.keys
@@ -386,7 +387,6 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 			s.mu.resourceGroupTagger(req)
 		}
 		scope := s.mu.readReplicaScope
-		isStaleness := s.mu.isStaleness
 		matchStoreLabels := s.mu.matchStoreLabels
 		replicaAdjuster := s.mu.replicaReadAdjuster
 		s.mu.RUnlock()
@@ -479,6 +479,10 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
 		} else {
 			cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
 		}
+		// we need to read from leader after resolving the lock.
+		if isStaleness {
+			isStaleness = false
+		}
 		resolveLocksOpts := txnlock.ResolveLocksOptions{
 			CallerStartTS: s.version,
 			Locks:         locks,
@@ -656,6 +660,10 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
 			return nil, err
 		}
 		if firstLock == nil {
+			// we need to read from leader after resolving the lock.
+			if isStaleness {
+				req.DisableStaleReadMeetLock()
+			}
 			firstLock = lock
 		} else if s.version == maxTimestamp && firstLock.TxnID != lock.TxnID {
 			// If it is an autocommit point get, it needs to be blocked only
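
For reference, below is a minimal sketch (not part of the patch) of how the collector added to internal/locate/region_request.go is exercised over one request: onReq records the request size as outgoing traffic, a retry downgrades the result from hit to miss, onResp records the response size as incoming traffic, and the deferred collect() reports both under the new stale_read_bytes summary. It assumes the snippet lives inside the internal/locate package, since staleReadMetricsCollector is unexported; exampleStaleReadCollection is a hypothetical helper used only for illustration.

package locate

import (
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// exampleStaleReadCollection walks through the collector's lifecycle the way
// SendReqCtx drives it; it is illustrative only.
func exampleStaleReadCollection() {
	// Build a stale-read Get request, as a caller would before SendReqCtx.
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("k")})
	req.EnableStaleRead()

	// SendReqCtx creates the collector with hit=true and defers collect().
	collector := &staleReadMetricsCollector{hit: true}
	collector.onReq(req) // request size counted as outgoing traffic

	// Any retry marks the stale read as a miss:
	// collector.hit = false

	// A successful response is measured as incoming traffic.
	resp := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("v")}}
	collector.onResp(resp)

	// Observes both sizes under stale_read_bytes{result, direction}.
	collector.collect()
}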