diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go index 155c388cc66..ce0dc428aaa 100644 --- a/cdc/kv/metrics.go +++ b/cdc/kv/metrics.go @@ -79,6 +79,13 @@ var ( Name: "region_token", Help: "size of region token in kv client", }, []string{"store", "changefeed", "capture"}) + cachedRegionSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "cached_region", + Help: "cached region that has not requested to TiKV in kv client", + }, []string{"store", "changefeed", "capture"}) batchResolvedEventSize = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -113,6 +120,7 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(sendEventCounter) registry.MustRegister(clientChannelSize) registry.MustRegister(clientRegionTokenSize) + registry.MustRegister(cachedRegionSize) registry.MustRegister(batchResolvedEventSize) registry.MustRegister(etcdRequestCounter) registry.MustRegister(grpcPoolStreamGauge) diff --git a/cdc/kv/token_region.go b/cdc/kv/token_region.go index 257776c3af9..a9f36001a98 100644 --- a/cdc/kv/token_region.go +++ b/cdc/kv/token_region.go @@ -45,22 +45,28 @@ type LimitRegionRouter interface { Run(ctx context.Context) error } +// srrMetrics keeps metrics of a Sized Region Router type srrMetrics struct { capture string changefeed string - tokens map[string]prometheus.Gauge + // mapping from id(TiKV store address) to token used + tokens map[string]prometheus.Gauge + // mapping from id(TiKV store address) to cached regions + cachedRegions map[string]prometheus.Gauge } func newSrrMetrics(ctx context.Context) *srrMetrics { captureAddr := util.CaptureAddrFromCtx(ctx) changefeed := util.ChangefeedIDFromCtx(ctx) return &srrMetrics{ - capture: captureAddr, - changefeed: changefeed, - tokens: make(map[string]prometheus.Gauge), + capture: captureAddr, + changefeed: changefeed, + tokens: make(map[string]prometheus.Gauge), + cachedRegions: make(map[string]prometheus.Gauge), } } +// each changefeed on a capture maintains a sizedRegionRouter type sizedRegionRouter struct { buffer map[string][]singleRegionInfo output chan singleRegionInfo @@ -96,10 +102,16 @@ func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) { r.output <- sri } else { r.buffer[id] = append(r.buffer[id], sri) + if _, ok := r.metrics.cachedRegions[id]; !ok { + r.metrics.cachedRegions[id] = cachedRegionSize.WithLabelValues(id, r.metrics.changefeed, r.metrics.capture) + } + r.metrics.cachedRegions[id].Inc() } r.lock.Unlock() } +// Acquire implements LimitRegionRouter.Acquire +// param: id is TiKV store address func (r *sizedRegionRouter) Acquire(id string) { r.lock.Lock() defer r.lock.Unlock() @@ -110,6 +122,8 @@ func (r *sizedRegionRouter) Acquire(id string) { r.metrics.tokens[id].Inc() } +// Release implements LimitRegionRouter.Release +// param: id is TiKV store address func (r *sizedRegionRouter) Release(id string) { r.lock.Lock() defer r.lock.Unlock() @@ -131,7 +145,7 @@ func (r *sizedRegionRouter) Run(ctx context.Context) error { r.lock.Lock() for id, buf := range r.buffer { available := r.sizeLimit - r.tokens[id] - // the tokens used could be more then size limit, since we have + // the tokens used could be more than size limit, since we have // a sized channel as level1 cache if available <= 0 { continue @@ -156,6 +170,7 @@ func (r *sizedRegionRouter) Run(ctx context.Context) error { } } r.buffer[id] = r.buffer[id][available:] + r.metrics.cachedRegions[id].Sub(float64(available)) } r.lock.Unlock() } diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 47c62f561c2..59f700f02ac 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -5988,6 +5988,214 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of regions that have not connected to TiKV", + "fieldConfig": { + "defaults": { + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 81 + }, + "hiddenSeries": false, + "id": 251, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(ticdc_kvclient_cached_region{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}) by (capture, changefeed, store)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{changefeed}}-{{capture}}-{{store}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "KV client cached regions", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Estimate the remaining time for a changefeed initialization (on a specific capture)", + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 81 + }, + "hiddenSeries": false, + "id": 252, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "abs(sum(ticdc_kvclient_cached_region{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"} / deriv(ticdc_kvclient_cached_region{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}[1m])) by (capture, changefeed, store))", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{changefeed}}-{{capture}}-{{store}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Estimate remaining time for initialization", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": "", + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Events", @@ -9216,5 +9424,5 @@ "timezone": "browser", "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", - "version": 22 -} \ No newline at end of file + "version": 23 +}