From f37133e8ed41f61513b11388987cda4e33e5c7b7 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 10 Dec 2021 15:23:57 +0800 Subject: [PATCH] etcd_worker: batch etcd patch (#3277) (#3390) --- cdc/capture.go | 2 +- cdc/capture/capture.go | 3 +- cdc/metrics.go | 2 + cdc/model/reactor_state.go | 1 + errors.toml | 10 + metrics/grafana/ticdc.json | 1116 ++++++++++++++++- pkg/errors/errors.go | 8 +- pkg/orchestrator/batch.go | 91 ++ pkg/orchestrator/batch_test.go | 75 ++ pkg/orchestrator/etcd_worker.go | 79 +- pkg/orchestrator/etcd_worker_bank_test.go | 2 +- pkg/orchestrator/etcd_worker_test.go | 16 +- pkg/orchestrator/interfaces.go | 6 +- pkg/orchestrator/metrics.go | 52 + .../cdc_state_checker/cdc_monitor.go | 2 +- 15 files changed, 1418 insertions(+), 47 deletions(-) create mode 100644 pkg/orchestrator/batch.go create mode 100644 pkg/orchestrator/batch_test.go create mode 100644 pkg/orchestrator/metrics.go diff --git a/cdc/capture.go b/cdc/capture.go index 0015c387bb1..eaf419b3b55 100644 --- a/cdc/capture.go +++ b/cdc/capture.go @@ -162,7 +162,7 @@ func (c *Capture) Run(ctx context.Context) (err error) { return errors.Trace(err) } log.Info("start to listen processor task...") - if err := etcdWorker.Run(ctx, c.session, 200*time.Millisecond); err != nil { + if err := etcdWorker.Run(ctx, c.session, 200*time.Millisecond, c.info.AdvertiseAddr); err != nil { // We check ttl of lease instead of check `session.Done`, because // `session.Done` is only notified when etcd client establish a // new keepalive request, there could be a time window as long as diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index dd5c8b27f74..fb4dc8618d5 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -280,7 +280,8 @@ func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Rea if err != nil { return errors.Trace(err) } - if err := etcdWorker.Run(ctx, c.session, timerInterval); err != nil { + captureAddr := c.info.AdvertiseAddr + if err := 
etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil { // We check ttl of lease instead of check `session.Done`, because // `session.Done` is only notified when etcd client establish a // new keepalive request, there could be a time window as long as diff --git a/cdc/metrics.go b/cdc/metrics.go index 2fde08e12d2..ec83b5eab35 100644 --- a/cdc/metrics.go +++ b/cdc/metrics.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/ticdc/cdc/puller/sorter" "github.com/pingcap/ticdc/cdc/sink" "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/orchestrator" "github.com/prometheus/client_golang/prometheus" ) @@ -36,6 +37,7 @@ func init() { puller.InitMetrics(registry) sink.InitMetrics(registry) entry.InitMetrics(registry) + orchestrator.InitMetrics(registry) sorter.InitMetrics(registry) if config.NewReplicaImpl { processor.InitMetrics(registry) diff --git a/cdc/model/reactor_state.go b/cdc/model/reactor_state.go index b4e4b8668a3..195e552d98d 100644 --- a/cdc/model/reactor_state.go +++ b/cdc/model/reactor_state.go @@ -100,6 +100,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro } // GetPatches implements the ReactorState interface +// Every []DataPatch slice in [][]DataPatch slice is the patches of a ChangefeedReactorState func (s *GlobalReactorState) GetPatches() [][]orchestrator.DataPatch { pendingPatches := s.pendingPatches for _, changefeedState := range s.Changefeeds { diff --git a/errors.toml b/errors.toml index ee46264edba..bfdcc062a57 100755 --- a/errors.toml +++ b/errors.toml @@ -196,6 +196,16 @@ error = ''' the etcd txn should be aborted and retried immediately ''' +["CDC:ErrEtcdTxnOpsExceed"] +error = ''' +patch ops:%d of a single changefeed exceed etcd txn max ops:%d +''' + +["CDC:ErrEtcdTxnSizeExceed"] +error = ''' +patch size:%d of a single changefeed exceed etcd txn max size:%d +''' + ["CDC:ErrEventFeedAborted"] error = ''' single event feed aborted diff --git a/metrics/grafana/ticdc.json 
b/metrics/grafana/ticdc.json index 5ca6a1fd173..f0e0d123f4f 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -929,6 +929,1119 @@ "title": "Server", "type": "row" }, + { + "collapsed": true, + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 266, + "panels": [ + { + "cards": { + "cardPadding": 1, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "min": null, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 2 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 262, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "interval": "1", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "show": true, + "showHistogram": false + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 2 + }, + 
"hiddenSeries": false, + "id": 264, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2612", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2613", + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": 0, + "cardRound": 0 + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "max": null, + "min": 1, + "mode": "spectrum" + }, 
+ "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 10 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 256, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": 1, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 1, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 10 + }, + "hiddenSeries": false, + "id": 258, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, +
"steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "format": "time_series", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-p99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:1612", + "format": "s", + "label": null, + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:1613", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": null, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolatePurples", + "exponent": 0.5, + "min": 0, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 18 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 254, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, +
"expr": "sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker txn size ", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "decbytes", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 18 + }, + "hiddenSeries": false, + "id": 260, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", 
capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-p99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker txn size percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2055", + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2056", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "title": "EtcdWorker", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 266, + "panels": [ + { + "cards": { + "cardPadding": 1, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "min": null, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 2 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 262, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "interval": "1", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "show": true, + "showHistogram": 
false + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 2 + }, + "hiddenSeries": false, + "id": 264, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": 
null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2612", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2613", + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": 0, + "cardRound": 0 + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "max": null, + "min": 1, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 10 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 256, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": 1, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 1, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", +
"description": "", + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 10 + }, + "hiddenSeries": false, + "id": 258, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "format": "time_series", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-p99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:1612", + "format": "s", + "label": null, + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:1613", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + }
+ }, + { + "cards": { + "cardPadding": null, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolatePurples", + "exponent": 0.5, + "min": 0, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 18 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 254, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker txn size ", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "decbytes", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 18 + }, + "hiddenSeries": false, + "id": 260, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + 
"pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-p99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker txn size percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2055", + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2056", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "title": "EtcdWorker", + "type": "row" + }, { "collapsed": true, "gridPos": { @@ -9682,4 +10795,5 @@ "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", "version": 25 -} \ No newline at end of file +} + diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 906b122b342..bce326cdd66 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -193,9 +193,11 @@ var ( // ErrEtcdSessionDone is used by etcd worker to signal a session done ErrEtcdSessionDone = errors.Normalize("the etcd session is done", 
errors.RFCCodeText("CDC:ErrEtcdSessionDone")) // ErrReactorFinished is used by reactor to signal a **normal** exit. - ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished")) - ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout")) - ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired")) + ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished")) + ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout")) + ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired")) + ErrEtcdTxnSizeExceed = errors.Normalize("patch size:%d of a single changefeed exceed etcd txn max size:%d", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed")) + ErrEtcdTxnOpsExceed = errors.Normalize("patch ops:%d of a single changefeed exceed etcd txn max ops:%d", errors.RFCCodeText("CDC:ErrEtcdTxnOpsExceed")) // pipeline errors ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline")) diff --git a/pkg/orchestrator/batch.go b/pkg/orchestrator/batch.go new file mode 100644 index 00000000000..fec17080a3c --- /dev/null +++ b/pkg/orchestrator/batch.go @@ -0,0 +1,91 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package orchestrator
+
+import (
+	"github.com/pingcap/errors"
+	cerrors "github.com/pingcap/ticdc/pkg/errors"
+	"github.com/pingcap/ticdc/pkg/orchestrator/util"
+)
+
+const (
+	// 1.25 MiB
+	// Ref: https://etcd.io/docs/v3.3/dev-guide/limit/
+	etcdTxnMaxSize = 1024 * (1024 + 256)
+	// Ref: https://etcd.io/docs/v3.3/op-guide/configuration/#--max-txn-ops
+	etcdTxnMaxOps = 128
+)
+
+// getBatchChangedState applies the leading patch groups to state and merges their changed keys into one batch.
+// It returns the batch, the number of groups merged (>= 1 when patchGroups is non-empty and the error is nil),
+// the estimated batch size in bytes, and an error. Merging stops before etcdTxnMaxSize or etcdTxnMaxOps is reached.
+// NOTE(review): a group skipped at the limit appears to have been patched into state already by getChangedState;
+// confirm that re-applying its patches on the next call is safe.
+func getBatchChangedState(state map[util.EtcdKey][]byte, patchGroups [][]DataPatch) (map[util.EtcdKey][]byte, int, int, error) {
+	num := 0
+	totalSize := 0
+	// store changedState of multiple changefeed
+	batchChangedState := make(map[util.EtcdKey][]byte)
+	for i, patches := range patchGroups {
+		changedState, changedSize, err := getChangedState(state, patches)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		// A changefeed that reaches a limit on its own can never fit into a batch, so reject it
+		// with `>=`, matching the batch check below. With `>` a group sitting exactly on a limit
+		// was skipped with num == 0 and the caller, which advances by num, never made progress.
+		if i == 0 {
+			if changedSize >= etcdTxnMaxSize {
+				return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs(changedSize, etcdTxnMaxSize)
+			}
+			if len(changedState) >= etcdTxnMaxOps {
+				return nil, 0, 0, cerrors.ErrEtcdTxnOpsExceed.GenWithStackByArgs(len(changedState), etcdTxnMaxOps)
+			}
+		}
+
+		// Stop before the batch reaches etcdTxnMaxSize or etcdTxnMaxOps;
+		// the groups that were not merged are left for the caller's next round.
+		if totalSize+changedSize >= etcdTxnMaxSize ||
+			len(batchChangedState)+len(changedState) >= etcdTxnMaxOps {
+			break
+		}
+		for k, v := range changedState {
+			batchChangedState[k] = v
+		}
+		num++
+		totalSize += changedSize
+	}
+	return batchChangedState, num, totalSize, nil
+}
+
+func getChangedState(state map[util.EtcdKey][]byte, patches []DataPatch) (map[util.EtcdKey][]byte, int, error) {
+	changedSet := make(map[util.EtcdKey]struct{})
+	changeState := make(map[util.EtcdKey][]byte)
+	changedSize := 0
+	for _, patch := range patches {
+		err := patch.Patch(state, changedSet)
+		if err != nil {
+			if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) {
+				continue
+			}
+			return nil, 0, errors.Trace(err)
+		}
+	}
+	for k := range changedSet {
+		v := state[k]
+		changedSize += len(k.String())*2 + len(v)
+		changeState[k] = v
+	}
+	return changeState, changedSize, nil
+}
diff --git a/pkg/orchestrator/batch_test.go b/pkg/orchestrator/batch_test.go
new file mode 100644
index 00000000000..9a7267a6f9d
--- /dev/null
+++ b/pkg/orchestrator/batch_test.go
@@ -0,0 +1,75 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package orchestrator + +import ( + "fmt" + "testing" + + "github.com/pingcap/ticdc/pkg/orchestrator/util" + "github.com/stretchr/testify/require" +) + +func TestGetBatchChangeState(t *testing.T) { + t.Parallel() + patchGroupSize := 1000 + patchGroup := make([][]DataPatch, patchGroupSize) + for i := 0; i < patchGroupSize; i++ { + i := i + patches := []DataPatch{&SingleDataPatch{ + Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = []byte(fmt.Sprintf("abc%d", i)) + return newValue, true, nil + }, + }} + patchGroup[i] = patches + } + rawState := make(map[util.EtcdKey][]byte) + changedState, n, size, err := getBatchChangedState(rawState, patchGroup) + require.Nil(t, err) + require.LessOrEqual(t, n, len(patchGroup)) + require.LessOrEqual(t, size, etcdTxnMaxSize) + require.LessOrEqual(t, len(changedState), etcdTxnMaxOps) + require.Equal(t, []byte(fmt.Sprintf("abc%d", 0)), changedState[util.NewEtcdKey("/key0")]) + + // test single patch exceed txn max size + largeSizePatches := []DataPatch{&SingleDataPatch{ + Key: util.NewEtcdKey("largePatch"), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = make([]byte, etcdTxnMaxSize) + return newValue, true, nil + }, + }} + patchGroup = [][]DataPatch{largeSizePatches} + _, _, _, err = getBatchChangedState(rawState, patchGroup) + require.NotNil(t, err) + require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max size") + + // test single patch exceed txn max ops + manyOpsPatches := make([]DataPatch, 0) + for i := 0; i <= etcdTxnMaxOps*2; i++ { + manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{ + Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = []byte(fmt.Sprintf("abc%d", i)) + return newValue, true, nil + }, + }) + } + patchGroup = [][]DataPatch{manyOpsPatches} + _, _, _, err = getBatchChangedState(rawState, 
patchGroup) + require.NotNil(t, err) + require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max ops") +} diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 102fe84db11..acccfbdb21e 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -24,6 +24,7 @@ import ( cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/orchestrator/util" + "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" "go.etcd.io/etcd/mvcc/mvccpb" @@ -32,6 +33,11 @@ import ( "golang.org/x/time/rate" ) +const ( + etcdRequestProgressDuration = 2 * time.Second + deletionCounterKey = "/meta/ticdc-delete-etcd-key-count" +) + // EtcdWorker handles all interactions with Etcd type EtcdWorker struct { client *etcd.Client @@ -58,6 +64,14 @@ type EtcdWorker struct { // a `compare-and-swap` semantics, which is essential for implementing // snapshot isolation for Reactor ticks. deleteCounter int64 + metrics *etcdWorkerMetrics +} + +type etcdWorkerMetrics struct { + // kv events related metrics + metricEtcdTxnSize prometheus.Observer + metricEtcdTxnDuration prometheus.Observer + metricEtcdWorkerTickDuration prometheus.Observer } type etcdUpdate struct { @@ -85,18 +99,23 @@ func NewEtcdWorker(client *etcd.Client, prefix string, reactor Reactor, initStat }, nil } -const ( - etcdRequestProgressDuration = 2 * time.Second - deletionCounterKey = "/meta/ticdc-delete-etcd-key-count" -) +func (worker *EtcdWorker) initMetrics(captureAddr string) { + metrics := &etcdWorkerMetrics{} + metrics.metricEtcdTxnSize = etcdTxnSize.WithLabelValues(captureAddr) + metrics.metricEtcdTxnDuration = etcdTxnExecDuration.WithLabelValues(captureAddr) + metrics.metricEtcdWorkerTickDuration = etcdWorkerTickDuration.WithLabelValues(captureAddr) + worker.metrics = metrics +} // Run starts the EtcdWorker event loop. 
// A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event. // If the specified etcd session is Done, this Run function will exit with cerrors.ErrEtcdSessionDone. // And the specified etcd session is nil-safety. -func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration) error { +func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error { defer worker.cleanUp() + worker.initMetrics(captureAddr) + err := worker.syncRawState(ctx) if err != nil { return errors.Trace(err) @@ -127,7 +146,6 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, tickRate := time.Second / timerInterval rl := rate.NewLimiter(rate.Limit(tickRate), 1) for { - var response clientv3.WatchResponse select { case <-ctx.Done(): return ctx.Err() @@ -140,19 +158,17 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, log.Warn("failed to request progress for etcd watcher", zap.Error(err)) } } - case response = <-watchCh: + case response := <-watchCh: // In this select case, we receive new events from Etcd, and call handleEvent if appropriate. if err := response.Err(); err != nil { return errors.Trace(err) } lastReceivedEventTime = time.Now() - // Check whether the response is stale. if worker.revision >= response.Header.GetRevision() { continue } worker.revision = response.Header.GetRevision() - // ProgressNotify implies no new events. if response.IsProgressNotify() { continue @@ -162,6 +178,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, // handleEvent will apply the event to our internal `rawState`. 
worker.handleEvent(ctx, event) } + } if len(pendingPatches) > 0 { @@ -196,8 +213,14 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if !rl.Allow() { continue } + startTime := time.Now() // it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick nextState, err := worker.reactor.Tick(ctx, worker.state) + costTime := time.Since(startTime).Seconds() + if costTime > time.Second.Seconds()*1 { + log.Warn("etcdWorker ticks reactor cost time more than 1 second") + } + worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime) if err != nil { if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) { return errors.Trace(err) @@ -284,33 +307,27 @@ func (worker *EtcdWorker) cloneRawState() map[util.EtcdKey][]byte { } func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]DataPatch) ([][]DataPatch, error) { + state := worker.cloneRawState() for len(patchGroups) > 0 { - patches := patchGroups[0] - err := worker.applyPatches(ctx, patches) + changeSate, n, size, err := getBatchChangedState(state, patchGroups) + if err != nil { + return patchGroups, err + } + err = worker.commitChangedState(ctx, changeSate, size) if err != nil { return patchGroups, err } - patchGroups = patchGroups[1:] + patchGroups = patchGroups[n:] } return patchGroups, nil } -func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) error { - state := worker.cloneRawState() - changedSet := make(map[util.EtcdKey]struct{}) - for _, patch := range patches { - err := patch.Patch(state, changedSet) - if err != nil { - if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { - continue - } - return errors.Trace(err) - } - } - cmps := make([]clientv3.Cmp, 0, len(changedSet)) - ops := make([]clientv3.Op, 0, len(changedSet)) +func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState map[util.EtcdKey][]byte, size int) error { + cmps := make([]clientv3.Cmp, 0, len(changedState)) + ops 
:= make([]clientv3.Op, 0, len(changedState)) hasDelete := false - for key := range changedSet { + + for key, value := range changedState { // make sure someone else has not updated the key after the last snapshot var cmp clientv3.Cmp if entry, ok := worker.rawState[key]; ok { @@ -322,7 +339,6 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) } cmps = append(cmps, cmp) - value := state[key] var op clientv3.Op if value != nil { op = clientv3.OpPut(key.String(), string(value)) @@ -344,7 +360,14 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) panic("unreachable") } + worker.metrics.metricEtcdTxnSize.Observe(float64(size)) + startTime := time.Now() resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit() + costTime := time.Since(startTime).Seconds() + if costTime > time.Second.Seconds()*1 { + log.Warn("etcdWorker commit etcd txn cost time more than 1 second") + } + worker.metrics.metricEtcdTxnDuration.Observe(costTime) if err != nil { return errors.Trace(err) } diff --git a/pkg/orchestrator/etcd_worker_bank_test.go b/pkg/orchestrator/etcd_worker_bank_test.go index 6a7f3daec32..8785115ec31 100644 --- a/pkg/orchestrator/etcd_worker_bank_test.go +++ b/pkg/orchestrator/etcd_worker_bank_test.go @@ -150,7 +150,7 @@ func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) { accountNumber: totalAccountNumber, }, &bankReactorState{c: c, index: i, account: make([]int, totalAccountNumber)}) c.Assert(err, check.IsNil) - err = worker.Run(ctx, nil, 100*time.Millisecond) + err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1") if err == nil || err.Error() == "etcdserver: request timed out" { continue } diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index ad0711f386d..65bfc26a8d8 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -267,7 +267,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { return 
errors.Trace(err) } - return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond)) + return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")) }) } @@ -352,7 +352,7 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) { c.Assert(err, check.IsNil) errg := &errgroup.Group{} errg.Go(func() error { - return reactor.Run(ctx, nil, 10*time.Millisecond) + return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") }) time.Sleep(500 * time.Millisecond) @@ -437,7 +437,7 @@ func (s *etcdWorkerSuite) TestFinished(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -506,7 +506,7 @@ func (s *etcdWorkerSuite) TestCover(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -585,7 +585,7 @@ func (s *etcdWorkerSuite) TestEmptyTxn(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -652,7 +652,7 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -733,7 +733,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { wg.Add(1) go func() { defer wg.Done() - err := 
worker1.Run(ctx, nil, time.Millisecond*100) + err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1") c.Assert(err, check.IsNil) }() @@ -748,7 +748,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { }) c.Assert(err, check.IsNil) - err = worker2.Run(ctx, nil, time.Millisecond*100) + err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1") c.Assert(err, check.IsNil) modifyReactor.waitOnCh <- struct{}{} diff --git a/pkg/orchestrator/interfaces.go b/pkg/orchestrator/interfaces.go index 6bf0eaf7b6e..cf29860a471 100644 --- a/pkg/orchestrator/interfaces.go +++ b/pkg/orchestrator/interfaces.go @@ -68,10 +68,10 @@ func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map return nil } -// MultiDatePatch represents an update to many keys -type MultiDatePatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error +// MultiDataPatch represents an update to many keys +type MultiDataPatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error // Patch implements the DataPatch interface -func (m MultiDatePatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error { +func (m MultiDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error { return m(valueMap, changedSet) } diff --git a/pkg/orchestrator/metrics.go b/pkg/orchestrator/metrics.go new file mode 100644 index 00000000000..efbb242871a --- /dev/null +++ b/pkg/orchestrator/metrics.go @@ -0,0 +1,52 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package orchestrator + +import "github.com/prometheus/client_golang/prometheus" + +var ( + etcdTxnSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "etcd_worker", + Name: "etcd_txn_size_bytes", + Help: "Bucketed histogram of a etcd txn size.", + Buckets: prometheus.ExponentialBuckets(1, 2, 18), + }, []string{"capture"}) + + etcdTxnExecDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "etcd_worker", + Name: "etcd_txn_exec_duration", + Help: "Bucketed histogram of processing time (s) of a etcd txn.", + Buckets: prometheus.ExponentialBuckets(0.002 /* 2 ms */, 2, 18), + }, []string{"capture"}) + + etcdWorkerTickDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "etcd_worker", + Name: "tick_reactor_duration", + Help: "Bucketed histogram of etcdWorker tick reactor time (s).", + Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18), + }, []string{"capture"}) +) + +// InitMetrics registers all metrics in this file +func InitMetrics(registry *prometheus.Registry) { + registry.MustRegister(etcdTxnSize) + registry.MustRegister(etcdTxnExecDuration) + registry.MustRegister(etcdWorkerTickDuration) +} diff --git a/testing_utils/cdc_state_checker/cdc_monitor.go b/testing_utils/cdc_state_checker/cdc_monitor.go index ee536bd0328..30179b29de0 100644 --- a/testing_utils/cdc_state_checker/cdc_monitor.go +++ b/testing_utils/cdc_state_checker/cdc_monitor.go @@ -92,7 +92,7 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti func (m *cdcMonitor) run(ctx 
context.Context) error { log.Debug("start running cdcMonitor") - err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond) + err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1") log.Error("etcdWorker exited: test-case-failed", zap.Error(err)) log.Info("CDC state", zap.Reflect("state", m.reactor.state)) return err