Backport the stale read optimizations to 6.5 #818

Merged 2 commits on May 29, 2023

70 changes: 70 additions & 0 deletions internal/locate/region_request.go
@@ -988,12 +988,23 @@ func (s *RegionRequestSender) SendReqCtx(
metrics.TiKVRequestRetryTimesHistogram.Observe(float64(tryTimes))
}
}()

var staleReadCollector *staleReadMetricsCollector
if req.StaleRead {
staleReadCollector = &staleReadMetricsCollector{hit: true}
staleReadCollector.onReq(req)
defer staleReadCollector.collect()
}

for {
if tryTimes > 0 {
req.IsRetryRequest = true
if tryTimes%100 == 0 {
logutil.Logger(bo.GetCtx()).Warn("retry", zap.Uint64("region", regionID.GetID()), zap.Int("times", tryTimes))
}
if req.StaleRead && staleReadCollector != nil {
staleReadCollector.hit = false
}
}

rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
@@ -1071,6 +1082,9 @@ func (s *RegionRequestSender) SendReqCtx(
s.replicaSelector.onSendSuccess()
}
}
if staleReadCollector != nil {
staleReadCollector.onResp(resp)
}
return resp, rpcCtx, nil
}
}
@@ -1642,3 +1656,59 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
// Because caller may need to re-split the request.
return false, nil
}

type staleReadMetricsCollector struct {
tp tikvrpc.CmdType
hit bool
out int
in int
}

func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size += req.Get().Size()
case tikvrpc.CmdBatchGet:
size += req.BatchGet().Size()
case tikvrpc.CmdScan:
size += req.Scan().Size()
case tikvrpc.CmdCop:
size += req.Cop().Size()
default:
// ignore non-read requests
return
}
s.tp = req.Type
size += req.Context.Size()
s.out = size
}

func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
size := 0
switch s.tp {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
size += resp.Resp.(*kvrpcpb.BatchGetResponse).Size()
case tikvrpc.CmdScan:
size += resp.Resp.(*kvrpcpb.ScanResponse).Size()
case tikvrpc.CmdCop:
size += resp.Resp.(*coprocessor.Response).Size()
default:
// unreachable
return
}
s.in = size
}

func (s *staleReadMetricsCollector) collect() {
in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic
if !s.hit {
in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic
}
if s.in > 0 && s.out > 0 {
in.Observe(float64(s.in))
out.Observe(float64(s.out))
}
}
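
Taken together, these additions wire the collector into SendReqCtx: it is created only for stale-read requests, records the request size up front, is flipped to a miss on any retry, and records the response size before the deferred collect() emits the observations. Below is a minimal sketch of that lifecycle; since staleReadMetricsCollector is unexported it would have to live in the locate package, and the request/response values are illustrative only, not part of this PR.

```go
package locate

import (
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// exampleStaleReadCollectorLifecycle condenses how SendReqCtx drives the
// collector. It assumes the metrics package has already been initialized
// (client-go does this in the metrics package's init); otherwise the
// shortcut observers used by collect() would be nil.
func exampleStaleReadCollectorLifecycle() {
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("k")})
	req.EnableStaleRead()

	collector := &staleReadMetricsCollector{hit: true}
	collector.onReq(req)      // outbound size: request body + kvrpcpb.Context
	defer collector.collect() // emits the hit/miss in/out observations on return

	// If SendReqCtx has to retry (tryTimes > 0), the read is no longer a
	// clean stale-read hit:
	//     collector.hit = false

	// A fabricated successful response; SendReqCtx records its size as
	// inbound traffic right before returning.
	resp := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("v")}}
	collector.onResp(resp)
}
```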
11 changes: 11 additions & 0 deletions metrics/metrics.go
@@ -96,6 +96,7 @@ var (
TiKVReadThroughput prometheus.Histogram
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
TiKVStaleReadSizeSummary *prometheus.SummaryVec
)

// Label constants.
@@ -116,6 +117,7 @@ const (
LblToStore = "to_store"
LblStaleRead = "stale_read"
LblSource = "source"
LblDirection = "direction"
)

func initMetrics(namespace, subsystem string) {
@@ -589,6 +591,14 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of assertions used in prewrite requests",
}, []string{LblType})

TiKVStaleReadSizeSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_bytes",
Help: "Size of stale read.",
}, []string{LblResult, LblDirection})

initShortcuts()
}

@@ -659,6 +669,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVReadThroughput)
prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec)
prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
prometheus.MustRegister(TiKVStaleReadSizeSummary)
}

// readCounter reads the value of a prometheus.Counter.
10 changes: 10 additions & 0 deletions metrics/shortcuts.go
@@ -135,6 +135,11 @@ var (
PrewriteAssertionUsageCounterExist prometheus.Counter
PrewriteAssertionUsageCounterNotExist prometheus.Counter
PrewriteAssertionUsageCounterUnknown prometheus.Counter

StaleReadHitInTraffic prometheus.Observer
StaleReadHitOutTraffic prometheus.Observer
StaleReadMissInTraffic prometheus.Observer
StaleReadMissOutTraffic prometheus.Observer
)

func initShortcuts() {
@@ -235,4 +240,9 @@ func initShortcuts() {
PrewriteAssertionUsageCounterExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("exist")
PrewriteAssertionUsageCounterNotExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("not-exist")
PrewriteAssertionUsageCounterUnknown = TiKVPrewriteAssertionUsageCounter.WithLabelValues("unknown")

StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in")
StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out")
StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in")
StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out")
}
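
The four shortcuts bind the result and direction labels once, so the hot path in region_request.go only pays for an Observe call. The sketch below (not part of the PR) shows which observer each outcome feeds; the fully qualified series name depends on the namespace and subsystem the embedding application passes to initMetrics, so the names in the comment are only an assumption based on client-go's default initialization.

```go
package example

import "github.com/tikv/client-go/v2/metrics"

// recordStaleReadTraffic is an illustrative helper showing how the four
// observers map onto the new stale_read_bytes summary. With the default
// initialization the series would look roughly like:
//   tikv_client_go_stale_read_bytes{result="hit",  direction="out"}
//   tikv_client_go_stale_read_bytes{result="hit",  direction="in"}
//   tikv_client_go_stale_read_bytes{result="miss", direction="out"}
//   tikv_client_go_stale_read_bytes{result="miss", direction="in"}
func recordStaleReadTraffic(reqBytes, respBytes int, servedStale bool) {
	if servedStale {
		metrics.StaleReadHitOutTraffic.Observe(float64(reqBytes))
		metrics.StaleReadHitInTraffic.Observe(float64(respBytes))
		return
	}
	metrics.StaleReadMissOutTraffic.Observe(float64(reqBytes))
	metrics.StaleReadMissInTraffic.Observe(float64(respBytes))
}
```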
6 changes: 6 additions & 0 deletions tikvrpc/tikvrpc.go
@@ -270,6 +270,12 @@ func (req *Request) EnableStaleRead() {
req.ReplicaRead = false
}

// DisableStaleReadMeetLock is called when a stale read falls back to leader read after meeting a key-is-locked error.
func (req *Request) DisableStaleReadMeetLock() {
req.StaleRead = false
req.ReplicaReadType = kv.ReplicaReadLeader
}

// IsGlobalStaleRead checks if the request is a global stale read request.
func (req *Request) IsGlobalStaleRead() bool {
return req.ReadReplicaScope == oracle.GlobalTxnScope &&
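
The new helper is the counterpart of EnableStaleRead: once a stale read runs into a lock and has to resolve it, reading a follower at the old timestamp is no longer useful, so the retried request is pinned to the leader. A small sketch of that transition (mirroring the snapshot.go change below) follows; the request construction is illustrative only.

```go
package example

import (
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// staleReadThenFallback sketches the intended request lifecycle: start as a
// stale read, then downgrade to a leader read after a key-is-locked error has
// been resolved, so the retry observes the committed value.
func staleReadThenFallback() *tikvrpc.Request {
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("k")})
	req.EnableStaleRead() // stale read against any suitable replica

	// ... the request is sent; suppose the region returns a key-is-locked
	// error and the caller resolves the lock ...

	req.DisableStaleReadMeetLock() // StaleRead=false, replica read type = leader
	return req                     // the retried request goes to the leader
}
```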
10 changes: 9 additions & 1 deletion txnkv/txnsnapshot/snapshot.go
@@ -365,6 +365,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
s.mergeRegionRequestStats(cli.Stats)
}()
}
isStaleness := s.mu.isStaleness
s.mu.RUnlock()

pending := batch.keys
@@ -386,7 +387,6 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
s.mu.resourceGroupTagger(req)
}
scope := s.mu.readReplicaScope
isStaleness := s.mu.isStaleness
matchStoreLabels := s.mu.matchStoreLabels
replicaAdjuster := s.mu.replicaReadAdjuster
s.mu.RUnlock()
@@ -479,6 +479,10 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
} else {
cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
}
// we need to read from leader after resolving the lock.
if isStaleness {
isStaleness = false
}
resolveLocksOpts := txnlock.ResolveLocksOptions{
CallerStartTS: s.version,
Locks: locks,
@@ -656,6 +660,10 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
return nil, err
}
if firstLock == nil {
// we need to read from leader after resolving the lock.
if isStaleness {
req.DisableStaleReadMeetLock()
}
firstLock = lock
} else if s.version == maxTimestamp && firstLock.TxnID != lock.TxnID {
// If it is an autocommit point get, it needs to be blocked only