Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix stale read ops metric (#878) #886

Merged
merged 1 commit into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 42 additions & 36 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ type replicaSelector struct {
region *Region
regionStore *regionStore
replicas []*replica
labels []*metapb.StoreLabel
state selectorState
// replicas[targetIdx] is the replica handling the request this time
targetIdx AccessIndex
Expand Down Expand Up @@ -747,6 +748,10 @@ func newReplicaSelector(
)
}

option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
var state selectorState
if !req.ReplicaReadType.IsFollowerRead() {
if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 {
Expand All @@ -755,10 +760,6 @@ func newReplicaSelector(
state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx}
}
} else {
option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
if req.ReplicaReadType == kv.ReplicaReadPreferLeader {
WithPerferLeader()(&option)
}
Expand All @@ -778,6 +779,7 @@ func newReplicaSelector(
cachedRegion,
regionStore,
replicas,
option.labels,
state,
-1,
-1,
Expand Down Expand Up @@ -1156,9 +1158,14 @@ func (s *RegionRequestSender) SendReqCtx(

var staleReadCollector *staleReadMetricsCollector
if req.StaleRead {
staleReadCollector = &staleReadMetricsCollector{hit: true}
staleReadCollector.onReq(req)
defer staleReadCollector.collect()
staleReadCollector = &staleReadMetricsCollector{}
defer func() {
if retryTimes == 0 {
metrics.StaleReadHitCounter.Add(1)
} else {
metrics.StaleReadMissCounter.Add(1)
}
}()
}

for {
Expand All @@ -1171,9 +1178,6 @@ func (s *RegionRequestSender) SendReqCtx(
zap.Int("times", retryTimes),
)
}
if req.StaleRead && staleReadCollector != nil {
staleReadCollector.hit = false
}
}

rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
Expand Down Expand Up @@ -1204,6 +1208,14 @@ func (s *RegionRequestSender) SendReqCtx(
return resp, nil, retryTimes, err
}

var isLocalTraffic bool
if staleReadCollector != nil && s.replicaSelector != nil {
if target := s.replicaSelector.targetReplica(); target != nil {
isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.labels)
staleReadCollector.onReq(req, isLocalTraffic)
}
}

logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
s.storeAddr = rpcCtx.Addr

Expand Down Expand Up @@ -1262,7 +1274,7 @@ func (s *RegionRequestSender) SendReqCtx(
}
}
if staleReadCollector != nil {
staleReadCollector.onResp(resp)
staleReadCollector.onResp(req.Type, resp, isLocalTraffic)
}
return resp, rpcCtx, retryTimes, nil
}
Expand Down Expand Up @@ -1955,35 +1967,36 @@ func (s *RegionRequestSender) onRegionError(
}

type staleReadMetricsCollector struct {
tp tikvrpc.CmdType
hit bool
out int
in int
}

func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic bool) {
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size += req.Get().Size()
size = req.Get().Size()
case tikvrpc.CmdBatchGet:
size += req.BatchGet().Size()
size = req.BatchGet().Size()
case tikvrpc.CmdScan:
size += req.Scan().Size()
size = req.Scan().Size()
case tikvrpc.CmdCop:
size += req.Cop().Size()
size = req.Cop().Size()
default:
// ignore non-read requests
return
}
s.tp = req.Type
size += req.Context.Size()
s.out = size
if isLocalTraffic {
metrics.StaleReadLocalOutBytes.Add(float64(size))
metrics.StaleReadReqLocalCounter.Add(1)
} else {
metrics.StaleReadRemoteOutBytes.Add(float64(size))
metrics.StaleReadReqCrossZoneCounter.Add(1)
}
}

func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, isLocalTraffic bool) {
size := 0
switch s.tp {
switch tp {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
Expand All @@ -1993,19 +2006,12 @@ func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
case tikvrpc.CmdCop:
size += resp.Resp.(*coprocessor.Response).Size()
default:
// unreachable
// ignore non-read requests
return
}
s.in = size
}

func (s *staleReadMetricsCollector) collect() {
in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic
if !s.hit {
in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic
}
if s.in > 0 && s.out > 0 {
in.Observe(float64(s.in))
out.Observe(float64(s.out))
if isLocalTraffic {
metrics.StaleReadLocalInBytes.Add(float64(size))
} else {
metrics.StaleReadRemoteInBytes.Add(float64(size))
}
}
30 changes: 25 additions & 5 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ var (
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
TiKVStoreSlowScoreGauge *prometheus.GaugeVec
TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec
TiKVStaleReadSizeSummary *prometheus.SummaryVec
TiKVStaleReadCounter *prometheus.CounterVec
TiKVStaleReadReqCounter *prometheus.CounterVec
TiKVStaleReadBytes *prometheus.CounterVec
)

// Label constants.
Expand Down Expand Up @@ -640,12 +642,28 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of flows under PreferLeader mode.",
}, []string{LblType, LblStore})

TiKVStaleReadSizeSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
TiKVStaleReadCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_counter",
Help: "Counter of stale read hit/miss",
}, []string{LblResult})

TiKVStaleReadReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_req_counter",
Help: "Counter of stale read requests",
}, []string{LblType})

TiKVStaleReadBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stale_read_bytes",
Help: "Size of stale read.",
Help: "Counter of stale read requests bytes",
}, []string{LblResult, LblDirection})

initShortcuts()
Expand Down Expand Up @@ -723,7 +741,9 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
prometheus.MustRegister(TiKVStoreSlowScoreGauge)
prometheus.MustRegister(TiKVPreferLeaderFlowsGauge)
prometheus.MustRegister(TiKVStaleReadSizeSummary)
prometheus.MustRegister(TiKVStaleReadCounter)
prometheus.MustRegister(TiKVStaleReadReqCounter)
prometheus.MustRegister(TiKVStaleReadBytes)
}

// readCounter reads the value of a prometheus.Counter.
Expand Down
28 changes: 20 additions & 8 deletions metrics/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,16 @@ var (
AggressiveLockedKeysLockedWithConflict prometheus.Counter
AggressiveLockedKeysNonForceLock prometheus.Counter

StaleReadHitInTraffic prometheus.Observer
StaleReadHitOutTraffic prometheus.Observer
StaleReadMissInTraffic prometheus.Observer
StaleReadMissOutTraffic prometheus.Observer
StaleReadHitCounter prometheus.Counter
StaleReadMissCounter prometheus.Counter

StaleReadReqLocalCounter prometheus.Counter
StaleReadReqCrossZoneCounter prometheus.Counter

StaleReadLocalInBytes prometheus.Counter
StaleReadLocalOutBytes prometheus.Counter
StaleReadRemoteInBytes prometheus.Counter
StaleReadRemoteOutBytes prometheus.Counter
)

func initShortcuts() {
Expand Down Expand Up @@ -296,8 +302,14 @@ func initShortcuts() {
// TiKV).
AggressiveLockedKeysNonForceLock = TiKVAggressiveLockedKeysCounter.WithLabelValues("non_force_lock")

StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in")
StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out")
StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in")
StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out")
StaleReadHitCounter = TiKVStaleReadCounter.WithLabelValues("hit")
StaleReadMissCounter = TiKVStaleReadCounter.WithLabelValues("miss")

StaleReadReqLocalCounter = TiKVStaleReadReqCounter.WithLabelValues("local")
StaleReadReqCrossZoneCounter = TiKVStaleReadReqCounter.WithLabelValues("cross-zone")

StaleReadLocalInBytes = TiKVStaleReadBytes.WithLabelValues("local", "in")
StaleReadLocalOutBytes = TiKVStaleReadBytes.WithLabelValues("local", "out")
StaleReadRemoteInBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "in")
StaleReadRemoteOutBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "out")
}