Skip to content

Commit

Permalink
kv/client: add cached regions metric in region router (#2994) (#3032)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 14, 2021
1 parent 5527e7e commit 25ed6dc
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 7 deletions.
8 changes: 8 additions & 0 deletions cdc/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ var (
Name: "region_token",
Help: "size of region token in kv client",
}, []string{"store", "changefeed", "capture"})
cachedRegionSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "cached_region",
Help: "cached region that has not requested to TiKV in kv client",
}, []string{"store", "changefeed", "capture"})
batchResolvedEventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down Expand Up @@ -113,6 +120,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(sendEventCounter)
registry.MustRegister(clientChannelSize)
registry.MustRegister(clientRegionTokenSize)
registry.MustRegister(cachedRegionSize)
registry.MustRegister(batchResolvedEventSize)
registry.MustRegister(etcdRequestCounter)
registry.MustRegister(grpcPoolStreamGauge)
Expand Down
25 changes: 20 additions & 5 deletions cdc/kv/token_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,28 @@ type LimitRegionRouter interface {
Run(ctx context.Context) error
}

// srrMetrics keeps metrics of a Sized Region Router
type srrMetrics struct {
capture string
changefeed string
tokens map[string]prometheus.Gauge
// mapping from id(TiKV store address) to token used
tokens map[string]prometheus.Gauge
// mapping from id(TiKV store address) to cached regions
cachedRegions map[string]prometheus.Gauge
}

func newSrrMetrics(ctx context.Context) *srrMetrics {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeed := util.ChangefeedIDFromCtx(ctx)
return &srrMetrics{
capture: captureAddr,
changefeed: changefeed,
tokens: make(map[string]prometheus.Gauge),
capture: captureAddr,
changefeed: changefeed,
tokens: make(map[string]prometheus.Gauge),
cachedRegions: make(map[string]prometheus.Gauge),
}
}

// each changefeed on a capture maintains a sizedRegionRouter
type sizedRegionRouter struct {
buffer map[string][]singleRegionInfo
output chan singleRegionInfo
Expand Down Expand Up @@ -96,10 +102,16 @@ func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) {
r.output <- sri
} else {
r.buffer[id] = append(r.buffer[id], sri)
if _, ok := r.metrics.cachedRegions[id]; !ok {
r.metrics.cachedRegions[id] = cachedRegionSize.WithLabelValues(id, r.metrics.changefeed, r.metrics.capture)
}
r.metrics.cachedRegions[id].Inc()
}
r.lock.Unlock()
}

// Acquire implements LimitRegionRouter.Acquire
// param: id is TiKV store address
func (r *sizedRegionRouter) Acquire(id string) {
r.lock.Lock()
defer r.lock.Unlock()
Expand All @@ -110,6 +122,8 @@ func (r *sizedRegionRouter) Acquire(id string) {
r.metrics.tokens[id].Inc()
}

// Release implements LimitRegionRouter.Release
// param: id is TiKV store address
func (r *sizedRegionRouter) Release(id string) {
r.lock.Lock()
defer r.lock.Unlock()
Expand All @@ -131,7 +145,7 @@ func (r *sizedRegionRouter) Run(ctx context.Context) error {
r.lock.Lock()
for id, buf := range r.buffer {
available := r.sizeLimit - r.tokens[id]
// the tokens used could be more then size limit, since we have
// the tokens used could be more than size limit, since we have
// a sized channel as level1 cache
if available <= 0 {
continue
Expand All @@ -156,6 +170,7 @@ func (r *sizedRegionRouter) Run(ctx context.Context) error {
}
}
r.buffer[id] = r.buffer[id][available:]
r.metrics.cachedRegions[id].Sub(float64(available))
}
r.lock.Unlock()
}
Expand Down
212 changes: 210 additions & 2 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -5988,6 +5988,214 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The number of regions that have not connected to TiKV",
"fieldConfig": {
"defaults": {
"links": []
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 7,
"w": 12,
"x": 0,
"y": 81
},
"hiddenSeries": false,
"id": 251,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(ticdc_kvclient_cached_region{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}) by (capture, changefeed, store)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-{{capture}}-{{store}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "KV client cached regions",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "Estimate the remaining time for a changefeed initialization (on a specific capture)",
"fieldConfig": {
"defaults": {
"unit": "s"
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 81
},
"hiddenSeries": false,
"id": 252,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "abs(sum(ticdc_kvclient_cached_region{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"} / deriv(ticdc_kvclient_cached_region{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}[1m])) by (capture, changefeed, store))",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-{{capture}}-{{store}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Estimate remaining time for initialization",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": "",
"logBase": 2,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "Events",
Expand Down Expand Up @@ -9216,5 +9424,5 @@
"timezone": "browser",
"title": "Test-Cluster-TiCDC",
"uid": "YiGL8hBZ1",
"version": 22
}
"version": 23
}

0 comments on commit 25ed6dc

Please sign in to comment.