diff --git a/Makefile b/Makefile index c6d56f1971e..633174bfcb1 100644 --- a/Makefile +++ b/Makefile @@ -78,14 +78,14 @@ build: cdc build-failpoint: $(FAILPOINT_ENABLE) - $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc/main.go $(FAILPOINT_DISABLE) cdc: - $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc/main.go kafka_consumer: - $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./kafka_consumer/main.go + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer/main.go install: go install ./... @@ -126,9 +126,9 @@ integration_test_build: check_failpoint_ctl $(FAILPOINT_ENABLE) $(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \ -coverpkg=github.com/pingcap/ticdc/... \ - -o bin/cdc.test github.com/pingcap/ticdc \ + -o bin/cdc.test github.com/pingcap/ticdc/cmd/cdc \ || { $(FAILPOINT_DISABLE); exit 1; } - $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go \ + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc/main.go \ || { $(FAILPOINT_DISABLE); exit 1; } $(FAILPOINT_DISABLE) @@ -193,6 +193,8 @@ data-flow-diagram: docs/data-flow.dot clean: go clean -i ./... rm -rf *.out + rm -f bin/cdc + rm -f bin/cdc_kafka_consumer tools/bin/gofumports: tools/check/go.mod cd tools/check; test -e ../bin/gofumports || \ diff --git a/cdc/capture.go b/cdc/capture.go index e0be0c7466d..b38a71352e4 100644 --- a/cdc/capture.go +++ b/cdc/capture.go @@ -169,6 +169,7 @@ func (c *Capture) Run(ctx context.Context) (err error) { // 1/3 of session ttl that `session.Done` can't be triggered even // the lease is already revoked. if cerror.ErrEtcdSessionDone.Equal(err) { + log.Warn("session is disconnected", zap.Error(err)) return cerror.ErrCaptureSuicide.GenWithStackByArgs() } lease, inErr := c.etcdClient.Client.TimeToLive(ctx, c.session.Lease()) diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 41d9e5fec8b..996d1f9e380 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -267,6 +267,7 @@ func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Rea switch { case cerror.ErrEtcdSessionDone.Equal(err), cerror.ErrLeaseExpired.Equal(err): + log.Warn("session is disconnected", zap.Error(err)) return cerror.ErrCaptureSuicide.GenWithStackByArgs() } lease, inErr := ctx.GlobalVars().EtcdClient.Client.TimeToLive(ctx, c.session.Lease()) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index ff3229058a4..958b13be270 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -249,7 +249,7 @@ func (c *changeFeed) updatePartition(tblInfo *timodel.TableInfo, startTs uint64) newPartitionIDs := make([]int64, 0, len(pi.Definitions)) for _, partition := range pi.Definitions { pid := partition.ID - _, ok := c.orphanTables[pid] + _, ok := oldIDs[pid] if !ok { // new partition. 
c.orphanTables[pid] = startTs diff --git a/cdc/changefeed_test.go b/cdc/changefeed_test.go index 8ab24cc6cda..24b801fbc95 100644 --- a/cdc/changefeed_test.go +++ b/cdc/changefeed_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/check" + timodel "github.com/pingcap/parser/model" "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/etcd" @@ -128,3 +129,39 @@ func (s *changefeedSuite) TestHandleMoveTableJobs(c *check.C) { c.Assert(changefeed.orphanTables, check.HasKey, model.TableID(1)) c.Assert(changefeed.moveTableJobs, check.HasLen, 0) } + +func (s *changefeedSuite) TestUpdatePartition(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + + cf := changeFeed{ + partitions: map[model.TableID][]int64{ + 51: {53, 55, 57}, + }, + orphanTables: make(map[model.TableID]model.Ts), + toCleanTables: make(map[model.TableID]model.Ts), + } + tblInfo := &timodel.TableInfo{ + ID: 51, + Partition: &timodel.PartitionInfo{ + Enable: true, + Definitions: []timodel.PartitionDefinition{ + {ID: 57}, {ID: 59}, {ID: 61}, + }, + }, + } + startTs := uint64(100) + + cf.updatePartition(tblInfo, startTs) + c.Assert(cf.orphanTables, check.DeepEquals, map[model.TableID]model.Ts{ + 59: startTs, + 61: startTs, + }) + c.Assert(cf.toCleanTables, check.DeepEquals, map[model.TableID]model.Ts{ + 53: startTs, + 55: startTs, + }) + c.Assert(cf.partitions, check.DeepEquals, map[model.TableID][]int64{ + 51: {57, 59, 61}, + }) +} diff --git a/cdc/kv/etcd_test.go b/cdc/kv/etcd_test.go index 10059635adc..569a7953c3f 100644 --- a/cdc/kv/etcd_test.go +++ b/cdc/kv/etcd_test.go @@ -225,6 +225,7 @@ func (s *etcdSuite) TestOpChangeFeedDetail(c *check.C) { ctx := context.Background() detail := &model.ChangeFeedInfo{ SinkURI: "root@tcp(127.0.0.1:3306)/mysql", + SortDir: "/old-version/sorter", } cfID := "test-op-cf" @@ -236,6 +237,7 @@ func (s *etcdSuite) TestOpChangeFeedDetail(c *check.C) { d, err := s.client.GetChangeFeedInfo(ctx, cfID) c.Assert(err, check.IsNil) c.Assert(d.SinkURI, check.Equals, detail.SinkURI) + c.Assert(d.SortDir, check.Equals, detail.SortDir) err = s.client.LeaseGuardDeleteChangeFeedInfo(ctx, cfID, sess.Lease()) c.Assert(err, check.IsNil) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 1f610fd4a2c..410d1496274 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -52,6 +52,26 @@ const ( StateFinished FeedState = "finished" ) +// ToInt return a int for each `FeedState`, only use this for metrics. +func (s FeedState) ToInt() int { + switch s { + case StateNormal: + return 0 + case StateError: + return 1 + case StateFailed: + return 2 + case StateStopped: + return 3 + case StateFinished: + return 4 + case StateRemoved: + return 5 + } + // -1 for unknown feed state + return -1 +} + const ( // errorHistoryGCInterval represents how long we keep error record in changefeed info errorHistoryGCInterval = time.Minute * 10 @@ -78,7 +98,9 @@ type ChangeFeedInfo struct { AdminJobType AdminJobType `json:"admin-job-type"` Engine SortEngine `json:"sort-engine"` // SortDir is deprecated - SortDir string `json:"-"` + // it cannot be set by user in changefeed level, any assignment to it should be ignored. 
+ // but can be fetched for backward compatibility + SortDir string `json:"sort-dir"` Config *config.ReplicaConfig `json:"config"` State FeedState `json:"state"` diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index c8954febcfe..fb82e75f715 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -120,6 +120,7 @@ func (s *configSuite) TestFillV1(c *check.C) { }, StartTs: 417136892416622595, Engine: "memory", + SortDir: ".", Config: &config.ReplicaConfig{ CaseSensitive: true, Filter: &config.FilterConfig{ diff --git a/cdc/owner.go b/cdc/owner.go index 0dd44f67c4f..11fa6883eff 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -52,7 +52,7 @@ type ownership struct { tickTime time.Duration } -func newOwnersip(tickTime time.Duration) ownership { +func newOwnership(tickTime time.Duration) ownership { minTickTime := 5 * time.Second if tickTime > minTickTime { log.Panic("ownership counter must be incearsed every 5 seconds") @@ -1112,7 +1112,6 @@ func (o *Owner) handleAdminJob(ctx context.Context) error { if err != nil { return errors.Trace(err) } - err = o.dispatchJob(ctx, job) if err != nil { return errors.Trace(err) @@ -1278,7 +1277,7 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error { defer feedChangeReceiver.Stop() o.watchFeedChange(ctx1) - ownership := newOwnersip(tickTime) + ownership := newOwnership(tickTime) loop: for { select { @@ -1580,7 +1579,7 @@ func (o *Owner) watchCapture(ctx context.Context) error { failpoint.Inject("sleep-before-watch-capture", nil) // When an owner just starts, changefeed information is not updated at once. - // Supposing a crased capture should be removed now, the owner will miss deleting + // Supposing a crashed capture should be removed now, the owner will miss deleting // task status and task position if changefeed information is not loaded. // If the task positions and status decode failed, remove them. 
if err := o.checkAndCleanTasksInfo(ctx); err != nil { diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go index 0b1ae01d765..4aeb513649b 100644 --- a/cdc/owner/metrics.go +++ b/cdc/owner/metrics.go @@ -44,6 +44,13 @@ var ( Name: "maintain_table_num", Help: "number of replicated tables maintained in owner", }, []string{"changefeed", "capture", "type"}) + changefeedStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "status", + Help: "The status of changefeeds", + }, []string{"changefeed"}) ) const ( @@ -59,4 +66,5 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(changefeedCheckpointTsLagGauge) registry.MustRegister(ownershipCounter) registry.MustRegister(ownerMaintainTableNumGauge) + registry.MustRegister(changefeedStatusGauge) } diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 18b2f51904a..ead1f8d907b 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -235,6 +235,7 @@ func (o *Owner) updateMetrics(state *model.GlobalReactorState) { o.lastTickTime = now ownerMaintainTableNumGauge.Reset() + changefeedStatusGauge.Reset() for changefeedID, changefeedState := range state.Changefeeds { for captureID, captureInfo := range state.Captures { taskStatus, exist := changefeedState.TaskStatuses[captureID] @@ -243,6 +244,9 @@ func (o *Owner) updateMetrics(state *model.GlobalReactorState) { } ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables))) ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip).Set(float64(len(taskStatus.Operation))) + if changefeedState.Info != nil { + changefeedStatusGauge.WithLabelValues(changefeedID).Set(float64(changefeedState.Info.State.ToInt())) + } } } } diff --git a/cdc/puller/sorter/backend_pool.go b/cdc/puller/sorter/backend_pool.go index 9b76941380f..05d49102086 100644 --- a/cdc/puller/sorter/backend_pool.go +++ b/cdc/puller/sorter/backend_pool.go @@ -158,7 +158,7 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { if err != nil { log.Warn("Cannot remove temporary file for sorting", zap.String("file", backEnd.fileName), zap.Error(err)) } else { - log.Info("Temporary file removed", zap.String("file", backEnd.fileName)) + log.Debug("Temporary file removed", zap.String("file", backEnd.fileName)) freedCount += 1 } if freedCount >= 16 { @@ -371,7 +371,7 @@ func (p *backEndPool) cleanUpStaleFiles() error { } for _, toRemoveFilePath := range files { - log.Info("Removing stale sorter temporary file", zap.String("file", toRemoveFilePath)) + log.Debug("Removing stale sorter temporary file", zap.String("file", toRemoveFilePath)) err := os.Remove(toRemoveFilePath) if err != nil { // In production, we do not want an error here to interfere with normal operation, diff --git a/cdc/puller/sorter/merger.go b/cdc/puller/sorter/merger.go index 5ff7c7b918a..223b9aaa4f1 100644 --- a/cdc/puller/sorter/merger.go +++ b/cdc/puller/sorter/merger.go @@ -55,7 +55,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch pendingSet := &sync.Map{} defer func() { - log.Info("Unified Sorter: merger exiting, cleaning up resources") + log.Debug("Unified Sorter: merger exiting, cleaning up resources") // cancel pending async IO operations. 
 onExit()
 cleanUpTask := func(task *flushTask) {
diff --git a/cdc/server.go b/cdc/server.go
index 40fe9bb9d0b..006b710da13 100644
--- a/cdc/server.go
+++ b/cdc/server.go
@@ -374,10 +374,8 @@ func (s *Server) initDataDir(ctx context.Context) error {
 		return errors.Trace(err)
 	}
 
-	if diskInfo.Avail < dataDirThreshold {
-		log.Warn(fmt.Sprintf("%s is set as data-dir (%dGB available), ticdc recommend disk for data-dir "+
-			"at least have %dGB available space", conf.DataDir, diskInfo.Avail, dataDirThreshold))
-	}
+	log.Info(fmt.Sprintf("%s is set as data-dir (%dGB available), ticdc recommend disk for data-dir "+
+		"at least have %dGB available space", conf.DataDir, diskInfo.Avail, dataDirThreshold))
 
 	return nil
 }
@@ -440,14 +438,25 @@ func (s *Server) setUpDataDir(ctx context.Context) error {
 // at the moment, only consider available disk space
 func findBestDataDir(candidates []string) (result string, ok bool) {
 	var low uint64 = 0
-	for _, dir := range candidates {
+
+	checker := func(dir string) (*util.DiskInfo, error) {
+		if err := os.MkdirAll(dir, 0o755); err != nil {
+			return nil, err
+		}
 		if err := util.IsDirReadWritable(dir); err != nil {
-			log.Warn("try to get disk info failed", zap.String("dir", dir), zap.Error(err))
-			continue
+			return nil, err
 		}
 		info, err := util.GetDiskInfo(dir)
 		if err != nil {
-			log.Warn("try to get disk info failed", zap.String("dir", dir), zap.Error(err))
+			return nil, err
+		}
+		return info, err
+	}
+
+	for _, dir := range candidates {
+		info, err := checker(dir)
+		if err != nil {
+			log.Warn("check the availability of dir", zap.String("dir", dir), zap.Error(err))
 			continue
 		}
 		if info.Avail > low {
diff --git a/main.go b/cmd/cdc/main.go
similarity index 100%
rename from main.go
rename to cmd/cdc/main.go
diff --git a/main_test.go b/cmd/cdc/main_test.go
similarity index 100%
rename from main_test.go
rename to cmd/cdc/main_test.go
diff --git a/kafka_consumer/main.go b/cmd/kafka-consumer/main.go
similarity index 100%
rename from kafka_consumer/main.go
rename to cmd/kafka-consumer/main.go
diff --git a/errors.toml b/errors.toml
index 222c9f4c2d2..592dc6fd6fb 100755
--- a/errors.toml
+++ b/errors.toml
@@ -636,6 +636,11 @@ error = '''
 pulsar send message failed
 '''
 
+["CDC:ErrReachMaxTry"]
+error = '''
+reach maximum try: %d
+'''
+
 ["CDC:ErrReactorFinished"]
 error = '''
 the reactor has done its job and should no longer be executed
diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json
index 305f8561890..879c9f75455 100644
--- a/metrics/grafana/ticdc.json
+++ b/metrics/grafana/ticdc.json
@@ -125,7 +125,7 @@
   "gnetId": null,
   "graphTooltip": 1,
   "id": null,
-  "iteration": 1620627250839,
+  "iteration": 1626152035486,
   "links": [],
   "panels": [
     {
@@ -1701,6 +1705,202 @@
       "alignLevel": null
     }
   },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_TEST-CLUSTER}",
+      "description": "The status of each changefeed.\n\n0: Normal\n\n1: Error\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 26
+      },
+      "id": 163,
+      "legend": {
+        "alignAsTable": true,
+        "avg": false,
+        "current": true,
+        "max": true,
+        "min": true,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 1,
+      "points": true,
+      "renderer": "flot",
+      "seriesOverrides": [],
+
"spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "ticdc_owner_status{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}", + "format": "time_series", + "instant": false, + "intervalFactor": 1, + "legendFormat": "{{changefeed}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "The status of changefeeds", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Percentiles of sink write duration of changefeeds", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 26 + }, + "hiddenSeries": false, + "id": 35, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "paceLength": 10, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.95, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "{{instance}}-p95", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "{{instance}}-p99", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "{{instance}}-p999", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Sink write duration percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "cards": { "cardPadding": 0, @@ -1722,7 +1922,7 @@ "h": 7, "w": 12, "x": 0, - "y": 26 + "y": 33 }, "heatmap": {}, "hideZeroBuckets": true, @@ -1786,17 +1986,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "Percentiles of sink write duration of changefeeds", + "description": "Percentiles of sink 
batch size", "fill": 1, "fillGradient": 0, "gridPos": { "h": 7, "w": 12, "x": 12, - "y": 26 + "y": 33 }, "hiddenSeries": false, - "id": 35, + "id": 36, "legend": { "alignAsTable": true, "avg": false, @@ -1826,24 +2026,25 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.95, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "expr": "histogram_quantile(0.90, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{instance}}-p95", + "legendFormat": "{{capture}}-p90", "refId": "A" }, { - "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{instance}}-p99", + "legendFormat": "{{capture}}-p99", "refId": "B" }, { - "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", "format": "time_series", + "hide": true, "intervalFactor": 1, - "legendFormat": "{{instance}}-p999", + "legendFormat": "{{capture}}-p999", "refId": "C" } ], @@ -1851,7 +2052,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Sink write duration percentile", + "title": "Sink write batch size percentile", "tooltip": { "shared": true, "sort": 0, @@ -1867,7 +2068,7 @@ }, "yaxes": [ { - "format": "s", + "format": "none", "label": null, "logBase": 2, "max": null, @@ -1901,7 +2102,7 @@ "h": 7, "w": 12, "x": 0, - "y": 33 + "y": 40 }, "hiddenSeries": false, "id": 34, @@ -1997,17 +2198,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "Percentiles of sink batch size", + "description": "Percentiles of asynchronous flush sink duration of changefeeds", "fill": 1, "fillGradient": 0, "gridPos": { "h": 7, "w": 12, "x": 12, - "y": 33 + "y": 40 }, "hiddenSeries": false, - "id": 36, + "id": 98, "legend": { "alignAsTable": true, "avg": false, @@ -2037,25 +2238,24 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", + "expr": "histogram_quantile(0.95, sum(rate(ticdc_processor_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{capture}}-p90", + "legendFormat": "{{instance}}-{{type}}-p95", "refId": "A" }, { - "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", + "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by 
(le,instance,type))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{capture}}-p99", + "legendFormat": "{{instance}}-{{type}}-p99", "refId": "B" }, { - "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", + "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", "format": "time_series", - "hide": true, "intervalFactor": 1, - "legendFormat": "{{capture}}-p999", + "legendFormat": "{{instance}}-{{type}}-p999", "refId": "C" } ], @@ -2063,7 +2263,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Sink write batch size percentile", + "title": "Flush sink duration percentile", "tooltip": { "shared": true, "sort": 0, @@ -2079,7 +2279,7 @@ }, "yaxes": [ { - "format": "none", + "format": "s", "label": null, "logBase": 2, "max": null, @@ -2121,7 +2321,7 @@ "h": 7, "w": 12, "x": 0, - "y": 40 + "y": 47 }, "heatmap": {}, "hideZeroBuckets": true, @@ -2183,25 +2383,27 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "Percentiles of asynchronous flush sink duration of changefeeds", + "description": "Distribution of MySQL worker loads", "fill": 1, "fillGradient": 0, "gridPos": { "h": 7, "w": 12, "x": 12, - "y": 40 + "y": 47 }, "hiddenSeries": false, - "id": 98, + "id": 95, "legend": { "alignAsTable": true, - "avg": false, + "avg": true, "current": true, - "max": false, + "max": true, "min": false, - "rightSide": true, + "rightSide": false, "show": true, + "sort": "current", + "sortDesc": true, "total": false, "values": true }, @@ -2212,46 +2414,77 @@ "options": { "dataLinks": [] }, - "paceLength": 10, "percentage": false, "pointradius": 2, "points": false, "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "stack": false, + "stack": true, "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.95, sum(rate(ticdc_processor_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", + "expr": "sum(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (capture,bucket)", "format": "time_series", + "hide": true, + "interval": "", "intervalFactor": 1, - "legendFormat": "{{instance}}-{{type}}-p95", + "legendFormat": "{{capture}}-{{bucket}}", "refId": "A" }, { - "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) >= 0)", "format": "time_series", + "hide": true, + "interval": "", "intervalFactor": 1, - "legendFormat": "{{instance}}-{{type}}-p99", + "legendFormat": "total worker", "refId": "B" }, { - "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 2)", "format": "time_series", + "interval": "", "intervalFactor": 1, - "legendFormat": "{{instance}}-{{type}}-p999", + "legendFormat": "0-2 row/s worker", 
"refId": "C" + }, + { + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 2 and rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 10)", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "2-10 row/s worker", + "refId": "D" + }, + { + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 10 and rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 100)", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "10-100 row/s worker", + "refId": "E" + }, + { + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 100)", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": ">100 row/s worker", + "refId": "F" } ], "thresholds": [], "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Flush sink duration percentile", + "title": "MySQL sink worker load", "tooltip": { "shared": true, - "sort": 0, + "sort": 2, "value_type": "individual" }, "type": "graph", @@ -2264,9 +2497,9 @@ }, "yaxes": [ { - "format": "s", + "format": "short", "label": null, - "logBase": 2, + "logBase": 1, "max": null, "min": null, "show": true @@ -2304,9 +2537,9 @@ "description": "The duration of detecting and waiting conflict of MySQL sink", "gridPos": { "h": 7, - "w": 8, + "w": 12, "x": 0, - "y": 47 + "y": 54 }, "heatmap": {}, "hideZeroBuckets": true, @@ -2374,9 +2607,9 @@ "fillGradient": 0, "gridPos": { "h": 7, - "w": 8, - "x": 8, - "y": 47 + "w": 12, + "x": 12, + "y": 54 }, "hiddenSeries": false, "id": 83, @@ -2473,147 +2706,6 @@ "alignLevel": null } }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "description": "Distribution of MySQL worker loads", - "fill": 1, - "fillGradient": 0, - "gridPos": { - "h": 7, - "w": 8, - "x": 16, - "y": 47 - }, - "hiddenSeries": false, - "id": 95, - "legend": { - "alignAsTable": true, - "avg": true, - "current": true, - "max": true, - "min": false, - "rightSide": false, - "show": true, - "sort": "current", - "sortDesc": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "options": { - "dataLinks": [] - }, - "percentage": false, - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": true, - "steppedLine": false, - "targets": [ - { - "expr": "sum(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (capture,bucket)", - "format": "time_series", - "hide": true, - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{capture}}-{{bucket}}", - "refId": "A" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) >= 0)", - "format": "time_series", - "hide": true, - "interval": "", - "intervalFactor": 1, - "legendFormat": "total worker", - "refId": "B" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 2)", - 
"format": "time_series", - "interval": "", - "intervalFactor": 1, - "legendFormat": "0-2 row/s worker", - "refId": "C" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 2 and rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 10)", - "format": "time_series", - "hide": false, - "interval": "", - "intervalFactor": 1, - "legendFormat": "2-10 row/s worker", - "refId": "D" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 10 and rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 100)", - "format": "time_series", - "hide": false, - "interval": "", - "intervalFactor": 1, - "legendFormat": "10-100 row/s worker", - "refId": "E" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 100)", - "format": "time_series", - "hide": false, - "interval": "", - "intervalFactor": 1, - "legendFormat": ">100 row/s worker", - "refId": "F" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "MySQL sink worker load", - "tooltip": { - "shared": true, - "sort": 2, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, @@ -2625,7 +2717,7 @@ "h": 8, "w": 12, "x": 0, - "y": 54 + "y": 61 }, "id": 149, "legend": { @@ -2706,7 +2798,7 @@ "h": 8, "w": 12, "x": 12, - "y": 54 + "y": 61 }, "id": 151, "links": [], @@ -7065,5 +7157,9 @@ "timezone": "browser", "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", +<<<<<<< HEAD "version": 16 +======= + "version": 18 +>>>>>>> 3be1df924 (owner, metrics: add changefeed status in metrics and grafana (#2267)) } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 5a4858eba8d..8d33a056a5a 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -230,4 +230,7 @@ var ( // miscellaneous internal errors ErrFlowControllerAborted = errors.Normalize("flow controller is aborted", errors.RFCCodeText("CDC:ErrFlowControllerAborted")) ErrFlowControllerEventLargerThanQuota = errors.Normalize("event is larger than the total memory quota, size: %d, quota: %d", errors.RFCCodeText("CDC:ErrFlowControllerEventLargerThanQuota")) + + // retry error + ErrReachMaxTry = errors.Normalize("reach maximum try: %d", errors.RFCCodeText("CDC:ErrReachMaxTry")) ) diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 9c519cbfe4d..820f79a53e2 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -89,7 +89,7 @@ func (p *Pipeline) AppendNode(ctx context.Context, name string, node Node) { func (p *Pipeline) driveRunner(ctx context.Context, previousRunner, runner runner) { defer func() { - log.Info("a pipeline node is exiting, stop the whole pipeline", zap.String("name", runner.getName())) + log.Debug("a pipeline node is exiting, stop the whole pipeline", zap.String("name", runner.getName())) 
p.close() blackhole(previousRunner) p.runnersWg.Done() diff --git a/pkg/retry/options.go b/pkg/retry/options.go new file mode 100644 index 00000000000..724195a3f0c --- /dev/null +++ b/pkg/retry/options.go @@ -0,0 +1,91 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "math" +) + +const ( + // defaultBackoffBaseInMs is the initial duration, in Millisecond + defaultBackoffBaseInMs = 10.0 + // defaultBackoffCapInMs is the max amount of duration, in Millisecond + defaultBackoffCapInMs = 100.0 + defaultMaxTries = 3 +) + +// Option ... +type Option func(*retryOptions) + +// IsRetryable checks the error is safe or worth to retry, eg. "context.Canceled" better not retry +type IsRetryable func(error) bool + +type retryOptions struct { + maxTries int64 + backoffBaseInMs float64 + backoffCapInMs float64 + isRetryable IsRetryable +} + +func newRetryOptions() *retryOptions { + return &retryOptions{ + maxTries: defaultMaxTries, + backoffBaseInMs: defaultBackoffBaseInMs, + backoffCapInMs: defaultBackoffCapInMs, + isRetryable: func(err error) bool { return true }, + } +} + +// WithBackoffBaseDelay configures the initial delay, if delayInMs <= 0 "defaultBackoffBaseInMs" will be used +func WithBackoffBaseDelay(delayInMs int64) Option { + return func(o *retryOptions) { + if delayInMs > 0 { + o.backoffBaseInMs = float64(delayInMs) + } + } +} + +// WithBackoffMaxDelay configures the maximum delay, if delayInMs <= 0 "defaultBackoffCapInMs" will be used +func WithBackoffMaxDelay(delayInMs int64) Option { + return func(o *retryOptions) { + if delayInMs > 0 { + o.backoffCapInMs = float64(delayInMs) + } + } +} + +// WithMaxTries configures maximum tries, if tries <= 0 "defaultMaxTries" will be used +func WithMaxTries(tries int64) Option { + return func(o *retryOptions) { + if tries > 0 { + o.maxTries = tries + } + } +} + +// WithInfiniteTries configures to retry forever (math.MaxInt64 times) till success or got canceled +func WithInfiniteTries() Option { + return func(o *retryOptions) { + o.maxTries = math.MaxInt64 + } +} + +// WithIsRetryableErr configures the error should retry or not, if not set, retry by default +func WithIsRetryableErr(f IsRetryable) Option { + return func(o *retryOptions) { + if f != nil { + o.isRetryable = f + } + } +} diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 1783a8ab182..61161cce195 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index 54b3691be9a..96a8e72cd3e 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -15,6 +15,7 @@ package retry
 
 import (
 	"context"
+	"math"
 	"testing"
 	"time"
 
@@ -39,6 +40,8 @@ func (s *runSuite) TestShouldRetryAtMostSpecifiedTimes(c *check.C) {
 	err := Run(500*time.Millisecond, 3, f)
 	c.Assert(err, check.ErrorMatches, "test")
 
+	// Note: tries = the first call plus maxRetries, so the extra attempt described below is expected.
+
 	// It's weird that backoff may retry one more time than maxTries.
 	// Because the steps in backoff.Retry is:
 	// 1. Call function
@@ -121,3 +124,135 @@ func (s *runSuite) TestInfiniteRetry(c *check.C) {
 	c.Assert(reportedElapsed, check.Greater, time.Second)
 	c.Assert(reportedElapsed, check.LessEqual, 3*time.Second)
 }
+
+func (s *runSuite) TestDoShouldRetryAtMostSpecifiedTimes(c *check.C) {
+	defer testleak.AfterTest(c)()
+	var callCount int
+	f := func() error {
+		callCount++
+		return errors.New("test")
+	}
+
+	err := Do(context.Background(), f, WithMaxTries(3))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 3)
+}
+
+func (s *runSuite) TestDoShouldStopOnSuccess(c *check.C) {
+	defer testleak.AfterTest(c)()
+	var callCount int
+	f := func() error {
+		callCount++
+		if callCount == 2 {
+			return nil
+		}
+		return errors.New("test")
+	}
+
+	err := Do(context.Background(), f, WithMaxTries(3))
+	c.Assert(err, check.IsNil)
+	c.Assert(callCount, check.Equals, 2)
+}
+
+func (s *runSuite) TestIsRetryable(c *check.C) {
+	defer testleak.AfterTest(c)()
+	var callCount int
+	f := func() error {
+		callCount++
+		return errors.Annotate(context.Canceled, "test")
+	}
+
+	err := Do(context.Background(), f, WithMaxTries(3), WithIsRetryableErr(func(err error) bool {
+		switch errors.Cause(err) {
+		case context.Canceled:
+			return false
+		}
+		return true
+	}))
+
+	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	c.Assert(callCount, check.Equals, 1)
+
+	callCount = 0
+	err = Do(context.Background(), f, WithMaxTries(3))
+
+	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	c.Assert(callCount, check.Equals, 3)
+}
+
+func (s *runSuite) TestDoCancelInfiniteRetry(c *check.C) {
+	defer testleak.AfterTest(c)()
+	callCount := 0
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20)
+	defer cancel()
+	f := func() error {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		callCount++
+		return errors.New("test")
+	}
+
+	err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
+	c.Assert(errors.Cause(err), check.Equals, context.DeadlineExceeded)
+	c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries: %d", callCount))
+	c.Assert(callCount, check.Less, math.MaxInt64)
+}
+
+func (s *runSuite) TestDoCancelAtBeginning(c *check.C) {
+	defer testleak.AfterTest(c)()
+	callCount := 0
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	f := func() error {
+		callCount++
+		return errors.New("test")
+	}
+
+	err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
+	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	c.Assert(callCount, check.Equals, 0, check.Commentf("tries:%d", callCount))
+}
+
+func (s *runSuite) TestDoCornerCases(c *check.C) {
+	defer testleak.AfterTest(c)()
+	var callCount int
+	f := func() error {
+		callCount++
+		return errors.New("test")
+	}
+
+	err := Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 2)
+
+	callCount = 0
+	err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 2)
+
+	callCount = 0
+	err = Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 2)
+
+	callCount = 0
+	err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 2)
+
+	var i int64
+	for i = -10; i < 10; i++ {
+		callCount = 0
+		err = Do(context.Background(), f, WithBackoffBaseDelay(i), WithBackoffMaxDelay(i), WithMaxTries(i))
+		c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+		c.Assert(err, check.ErrorMatches, ".*CDC:ErrReachMaxTry.*")
+		if i > 0 {
+			c.Assert(int64(callCount), check.Equals, i)
+		} else {
+			c.Assert(callCount, check.Equals, defaultMaxTries)
+		}
+	}
+}
diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go
new file mode 100644
index 00000000000..b4af380b582
--- /dev/null
+++ b/pkg/retry/retry_with_opt.go
@@ -0,0 +1,94 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retry
+
+import (
+	"context"
+	"math"
+	"math/rand"
+	"time"
+
+	"github.com/pingcap/errors"
+	cerror "github.com/pingcap/ticdc/pkg/errors"
+)
+
+// Operation is the action that needs to be retried
+type Operation func() error
+
+// Do executes the specified function at most maxTries times until it succeeds or is canceled
+func Do(ctx context.Context, operation Operation, opts ...Option) error {
+	retryOption := setOptions(opts...)
+ return run(ctx, operation, retryOption) +} + +func setOptions(opts ...Option) *retryOptions { + retryOption := newRetryOptions() + for _, opt := range opts { + opt(retryOption) + } + return retryOption +} + +func run(ctx context.Context, op Operation, retryOption *retryOptions) error { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + default: + } + + var t *time.Timer + try := 0 + backOff := time.Duration(0) + for { + err := op() + if err == nil { + return nil + } + + if !retryOption.isRetryable(err) { + return err + } + + try++ + if int64(try) >= retryOption.maxTries { + return cerror.ErrReachMaxTry.Wrap(err).GenWithStackByArgs(retryOption.maxTries) + } + + backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try)) + if t == nil { + t = time.NewTimer(backOff) + defer t.Stop() + } else { + t.Reset(backOff) + } + + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case <-t.C: + } + } +} + +// getBackoffInMs returns the duration to wait before next try +// See https://www.awsarchitectureblog.com/2015/03/backoff.html +func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration { + temp := int64(math.Min(backoffCapInMs, backoffBaseInMs*math.Exp2(try)) / 2) + if temp <= 0 { + temp = 1 + } + sleep := temp + rand.Int63n(temp) + backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep*3))+backoffBaseInMs) + return time.Duration(backOff) * time.Millisecond +} diff --git a/tools/check/check-tidy.sh b/tools/check/check-tidy.sh index a699e97dddc..cf0a7e6337f 100755 --- a/tools/check/check-tidy.sh +++ b/tools/check/check-tidy.sh @@ -3,8 +3,8 @@ set -euo pipefail GO111MODULE=on go mod tidy -if [ `git --no-pager diff | wc -c` -ne 0 ]; then +if [ "$(git --no-pager diff go.mod go.sum | wc -c)" -ne 0 ]; then echo "Please run \`go mod tidy\` to clean up" - git --no-pager diff + git --no-pager diff go.mod go.sum exit 1 fi
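Not part of the patch above — a minimal usage sketch of the retry.Do API introduced in pkg/retry/options.go and pkg/retry/retry_with_opt.go. The flaky operation and the option values are illustrative assumptions, not code from this change; only the functions shown in the diff (Do, WithMaxTries, WithBackoffBaseDelay, WithBackoffMaxDelay, WithIsRetryableErr) are used, and the import path follows the module path (github.com/pingcap/ticdc) already referenced in the Makefile.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/pingcap/ticdc/pkg/retry"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	attempt := 0
	// flaky stands in for any operation worth retrying (an etcd or MySQL
	// call, for example); it fails twice and then succeeds.
	flaky := func() error {
		attempt++
		if attempt < 3 {
			return errors.New("transient failure")
		}
		return nil
	}

	// At most 5 calls, with randomized exponential backoff between the base
	// delay (50 ms) and the cap (500 ms). Retrying stops early when the error
	// is reported as non-retryable or the context ends; once the tries are
	// exhausted the last error is wrapped in CDC:ErrReachMaxTry.
	err := retry.Do(ctx, flaky,
		retry.WithMaxTries(5),
		retry.WithBackoffBaseDelay(50),
		retry.WithBackoffMaxDelay(500),
		retry.WithIsRetryableErr(func(err error) bool {
			return !errors.Is(err, context.Canceled)
		}),
	)
	fmt.Println("attempts:", attempt, "err:", err) // attempts: 3 err: <nil>
}
```

Callers that should only stop on cancellation can swap WithMaxTries for WithInfiniteTries(), mirroring TestDoCancelInfiniteRetry in the diff.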