Skip to content

Commit

Permalink
log_backup: added more info for slow regions in log backup advancer (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 20, 2024
1 parent 43d42b8 commit 15064f4
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
36 changes: 33 additions & 3 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package streamhelper
import (
"bytes"
"context"
"fmt"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -272,20 +273,49 @@ func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
c.checkpoints = cps
}

// fetchRegionHint builds a human-readable description of the region located
// for startKey, intended for diagnostic logging.
//
// As a side effect it updates the LogBackupCurrentLastRegionID and
// LogBackupCurrentLastRegionLeaderStoreID gauges with the located region's ID
// and leader store ID. When the region cannot be located, the annotated error
// text is returned in place of the hint and the gauges are left untouched.
func (c *CheckpointAdvancer) fetchRegionHint(ctx context.Context, startKey []byte) string {
	region, err := locateKeyOfRegion(ctx, c.env, startKey)
	if err != nil {
		return errors.Annotate(err, "failed to fetch region").Error()
	}
	r := region.Region
	l := region.Leader

	peers := r.GetPeers()
	prs := make([]int, 0, len(peers))
	for _, p := range peers {
		prs = append(prs, int(p.StoreId))
	}

	// Read the IDs through the getters (as the Sprintf below already does)
	// instead of dereferencing fields directly: this function only produces a
	// hint, so a region reported without a leader must not panic here.
	// NOTE(review): relies on the generated getters tolerating a nil receiver.
	regionID := r.GetId()
	leaderStoreID := l.GetStoreId()
	metrics.LogBackupCurrentLastRegionID.Set(float64(regionID))
	metrics.LogBackupCurrentLastRegionLeaderStoreID.Set(float64(leaderStoreID))

	return fmt.Sprintf("ID=%d,Leader=%d,ConfVer=%d,Version=%d,Peers=%v,RealRange=%s",
		regionID, leaderStoreID, r.GetRegionEpoch().GetConfVer(), r.GetRegionEpoch().GetVersion(),
		prs, logutil.StringifyRange{StartKey: r.GetStartKey(), EndKey: r.GetEndKey()})
}

func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
threshold time.Duration) (spans.Valued, error) {
var targets []spans.Valued
var minValue spans.Valued
thresholdTso := tsoBefore(threshold)
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
vsf.TraverseValuesLessThan(tsoBefore(threshold), func(v spans.Valued) bool {
vsf.TraverseValuesLessThan(thresholdTso, func(v spans.Valued) bool {
targets = append(targets, v)
return true
})
minValue = vsf.Min()
})
log.Info("current last region", zap.String("category", "log backup advancer hint"),
sctx, cancel := context.WithTimeout(ctx, time.Second)
// Always fetch the hint and update the metrics.
hint := c.fetchRegionHint(sctx, minValue.Key.StartKey)
logger := log.Debug
if minValue.Value < thresholdTso {
logger = log.Info
}
logger("current last region", zap.String("category", "log backup advancer hint"),
zap.Stringer("min", minValue), zap.Int("for-polling", len(targets)),
zap.String("min-ts", oracle.GetTimeFromTS(minValue.Value).Format(time.RFC3339)))
zap.String("min-ts", oracle.GetTimeFromTS(minValue.Value).Format(time.RFC3339)),
zap.String("region-hint", hint),
)
cancel()
if len(targets) == 0 {
return minValue, nil
}
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/streamhelper/regioniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ func IterateRegion(cli TiKVClusterMeta, startKey, endKey []byte) *RegionIter {
}
}

// locateKeyOfRegion looks up the region for the given key.
//
// It calls cli.RegionScan with key and kv.Key(key).Next() as the bounds and a
// final argument of 1 (presumably the result limit), then returns the first
// region of the result. A scan error is returned unchanged; an empty result
// is reported as an annotated ErrPDBatchScanRegion.
func locateKeyOfRegion(ctx context.Context, cli TiKVClusterMeta, key []byte) (RegionWithLeader, error) {
	upperBound := kv.Key(key).Next()
	found, scanErr := cli.RegionScan(ctx, key, upperBound, 1)
	switch {
	case scanErr != nil:
		return RegionWithLeader{}, scanErr
	case len(found) == 0:
		return RegionWithLeader{}, errors.Annotatef(berrors.ErrPDBatchScanRegion,
			"scanning the key %s returns empty region", redact.Key(key))
	}
	return found[0], nil
}

func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader) error {
// current pd can't guarantee the consistency of returned regions
if len(regions) == 0 {
Expand Down
16 changes: 16 additions & 0 deletions pkg/metrics/log_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ var (
RegionCheckpointRequest *prometheus.CounterVec
RegionCheckpointFailure *prometheus.CounterVec
RegionCheckpointSubscriptionEvent *prometheus.HistogramVec

LogBackupCurrentLastRegionID prometheus.Gauge
LogBackupCurrentLastRegionLeaderStoreID prometheus.Gauge
)

// InitLogBackupMetrics initializes log backup metrics.
Expand Down Expand Up @@ -84,4 +87,17 @@ func InitLogBackupMetrics() {
Help: "The region flush event size.",
Buckets: prometheus.ExponentialBuckets(8, 2.0, 12),
}, []string{"store"})

LogBackupCurrentLastRegionID = NewGauge(prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "log_backup",
Name: "current_last_region_id",
Help: "The id of the region have minimal checkpoint ts in the current running task.",
})
LogBackupCurrentLastRegionLeaderStoreID = NewGauge(prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "log_backup",
Name: "current_last_region_leader_store_id",
Help: "The leader's store id of the region have minimal checkpoint ts in the current running task.",
})
}

0 comments on commit 15064f4

Please sign in to comment.