Skip to content

Commit

Permalink
owner(ticdc): add owner resolved ts and resolved lag metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Dec 25, 2021
1 parent 3aef729 commit d08a274
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 10 deletions.
20 changes: 17 additions & 3 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type changefeed struct {

metricsChangefeedCheckpointTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func() DDLSink
Expand Down Expand Up @@ -292,6 +294,8 @@ LOOP:
// init metrics
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.WithLabelValues(c.id)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.WithLabelValues(c.id)

// create scheduler
c.scheduler, err = c.newScheduler(ctx, checkpointTs)
Expand Down Expand Up @@ -323,10 +327,17 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
}
c.wg.Wait()
c.scheduler.Close(ctx)

changefeedCheckpointTsGauge.DeleteLabelValues(c.id)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id)
c.metricsChangefeedCheckpointTsGauge = nil
c.metricsChangefeedCheckpointTsLagGauge = nil

changefeedResolvedTsGauge.DeleteLabelValues(c.id)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil

c.initialized = false
}

Expand Down Expand Up @@ -524,10 +535,13 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode
}
return status, changed, nil
})
phyTs := oracle.ExtractPhysical(checkpointTs)
phyCkpTs := oracle.ExtractPhysical(checkpointTs)
c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
phyRTs := oracle.ExtractPhysical(resolvedTs)
c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)
}

func (c *changefeed) Close(ctx cdcContext.Context) {
Expand Down
18 changes: 17 additions & 1 deletion cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ var (
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"changefeed"})
changefeedResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts",
Help: "resolved ts of changefeeds",
}, []string{"changefeed"})
changefeedResolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts_lag",
Help: "resolved ts lag of changefeeds in seconds",
}, []string{"changefeed"})
ownershipCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Expand Down Expand Up @@ -63,7 +77,9 @@ const (
// InitMetrics registers all metrics used in owner
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedCheckpointTsGauge)
registry.MustRegister(changefeedResolvedTsGauge)
registry.MustRegister(changefeedCheckpointTsLagGauge)
registry.MustRegister(changefeedResolvedTsLagGauge)
registry.MustRegister(ownershipCounter)
registry.MustRegister(ownerMaintainTableNumGauge)
registry.MustRegister(changefeedStatusGauge)
Expand Down
29 changes: 23 additions & 6 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -2224,21 +2224,30 @@
"steppedLine": false,
"targets": [
{
"expr": "max(ticdc_owner_checkpoint_ts{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed) > 0",
"expr": "max(pd_cluster_tso{tidb_cluster=\"$tidb_cluster\"})",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}",
"legendFormat": "approximate current time (s)",
"refId": "A"
},
{
"expr": "max(pd_cluster_tso{tidb_cluster=\"$tidb_cluster\"})",
"expr": "max(ticdc_owner_checkpoint_ts{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed) > 0",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 1,
"legendFormat": "approximate current time (s)",
"legendFormat": "{{changefeed}}",
"refId": "B"
},
{
"expr": "max(ticdc_owner_resolved_ts{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed) > 0",
"format": "time_series",
"hide": true,
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-resolvedts",
"refId": "C"
}
],
"thresholds": [],
Expand Down Expand Up @@ -2529,13 +2538,21 @@
"legendFormat": "{{changefeed}}-{{capture}}",
"refId": "B"
},
{
"expr": "max(ticdc_owner_resolved_ts_lag{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-resolvedts",
"refId": "C"
},
{
"expr": "sum(ticdc_processor_resolved_ts_lag{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}) by (capture,changefeed)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-{{capture}}-resolvedts",
"refId": "C"
"refId": "D"
}
],
"thresholds": [],
Expand Down

0 comments on commit d08a274

Please sign in to comment.