diff --git a/Makefile b/Makefile index 33b0ee26469..3ffecfbe4ac 100644 --- a/Makefile +++ b/Makefile @@ -109,8 +109,9 @@ cdc: kafka_consumer: $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer/main.go -install: - go install ./... +cdc_test_image: + @which docker || (echo "docker not found in ${PATH}"; exit 1) + docker build --platform linux/amd64 -f deployments/ticdc/docker/test.Dockerfile -t cdc:test ./ unit_test: check_failpoint_ctl generate_mock generate-msgp-code generate-protobuf mkdir -p "$(TEST_DIR)" diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index e3e2abadacf..ee36526e33b 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -490,7 +490,7 @@ func TestRemoveChangefeed(t *testing.T) { info.Config.Consistent = &config.ConsistentConfig{ Level: "eventual", Storage: filepath.Join("nfs://", dir), - FlushIntervalInMs: config.MinFlushIntervalInMs, + FlushIntervalInMs: config.DefaultFlushIntervalInMs, } ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: ctx.ChangefeedVars().ID, diff --git a/cdc/redo/common/metric.go b/cdc/redo/common/metric.go index b2e4cd8e61c..74011bab63b 100644 --- a/cdc/redo/common/metric.go +++ b/cdc/redo/common/metric.go @@ -75,6 +75,15 @@ var ( Help: "The latency distributions of flushLog called by redoManager", Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), }, []string{"namespace", "changefeed"}) + + // RedoWorkerBusyRatio records the busy ratio of redo bgUpdateLog worker. + RedoWorkerBusyRatio = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "worker_busy_ratio", + Help: "Busy ratio (X ms in 1s) for redo bgUpdateLog worker.", + }, []string{"namespace", "changefeed"}) ) // InitMetrics registers all metrics in this file @@ -85,4 +94,5 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(RedoFlushAllDurationHistogram) registry.MustRegister(RedoWriteLogDurationHistogram) registry.MustRegister(RedoFlushLogDurationHistogram) + registry.MustRegister(RedoWorkerBusyRatio) } diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 2d378891413..fd545283849 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -133,8 +133,10 @@ type ManagerImpl struct { flushing int64 lastFlushTime time.Time - metricWriteLogDuration prometheus.Observer - metricFlushLogDuration prometheus.Observer + metricWriteLogDuration prometheus.Observer + metricFlushLogDuration prometheus.Observer + metricTotalRowsCount prometheus.Counter + metricRedoWorkerBusyRatio prometheus.Counter } // NewManager creates a new Manager @@ -170,6 +172,10 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + m.metricTotalRowsCount = common.RedoTotalRowsCountGauge. + WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + m.metricRedoWorkerBusyRatio = common.RedoWorkerBusyRatio. + WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) // TODO: better to wait background goroutines after the context is canceled. if m.opts.EnableBgRunner { @@ -193,7 +199,7 @@ func NewMockManager(ctx context.Context) (*ManagerImpl, error) { cfg := &config.ConsistentConfig{ Level: string(redo.ConsistentLevelEventual), Storage: "blackhole://", - FlushIntervalInMs: config.MinFlushIntervalInMs, + FlushIntervalInMs: config.DefaultFlushIntervalInMs, } errCh := make(chan error, 1) @@ -359,6 +365,10 @@ func (m *ManagerImpl) Cleanup(ctx context.Context) error { DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) common.RedoFlushLogDurationHistogram. DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + common.RedoTotalRowsCountGauge. + DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + common.RedoWorkerBusyRatio. + DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) return m.withLock(func(m *ManagerImpl) error { return m.writer.DeleteAllLogs(ctx) }) } @@ -422,7 +432,13 @@ func (m *ManagerImpl) postFlushMeta(metaCheckpoint, metaResolved model.Ts) { m.metaCheckpointTs.setFlushed(metaCheckpoint) } -func (m *ManagerImpl) flushLog(ctx context.Context, handleErr func(err error)) { +func (m *ManagerImpl) flushLog( + ctx context.Context, handleErr func(err error), workTimeSlice *time.Duration, +) { + start := time.Now() + defer func() { + *workTimeSlice += time.Since(start) + }() if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) { log.Debug("Fail to update flush flag, " + "the previous flush operation hasn't finished yet") @@ -473,7 +489,8 @@ func (m *ManagerImpl) bgUpdateLog( log.Info("redo manager bgUpdateLog is running", zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID)) + zap.String("changefeed", m.changeFeedID.ID), + zap.Int64("flushIntervalInMs", flushIntervalInMs)) ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond) defer func() { @@ -499,14 +516,23 @@ func (m *ManagerImpl) bgUpdateLog( }() var err error + overseerTicker := time.NewTicker(time.Second * 5) + defer overseerTicker.Stop() + var workTimeSlice time.Duration + startToWork := time.Now() for { select { case <-ctx.Done(): return case err = <-logErrCh: + case now := <-overseerTicker.C: + busyRatio := int(workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000) + m.metricRedoWorkerBusyRatio.Add(float64(busyRatio)) + startToWork = now + workTimeSlice = 0 case <-ticker.C: // interpolate tick message to flush writer if needed - m.flushLog(ctx, handleErr) + m.flushLog(ctx, handleErr, &workTimeSlice) case cache, ok := <-m.logBuffer.Out(): if !ok { return // channel closed @@ -520,6 +546,7 @@ func (m *ManagerImpl) bgUpdateLog( } err = m.writer.WriteLog(ctx, cache.tableID, logs) m.metricWriteLogDuration.Observe(time.Since(start).Seconds()) + m.metricTotalRowsCount.Add(float64(len(logs))) case model.MessageTypeResolved: m.onResolvedTsMsg(cache.tableID, cache.resolvedTs) default: diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index daf19763a8b..383e3095fec 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -100,7 +100,7 @@ func TestLogManagerInProcessor(t *testing.T) { defer logMgr.Cleanup(ctx) checkResolvedTs := func(mgr LogManager, expectedRts uint64) { - time.Sleep(time.Duration(config.MinFlushIntervalInMs+200) * time.Millisecond) + time.Sleep(time.Duration(config.DefaultFlushIntervalInMs+200) * time.Millisecond) resolvedTs := mgr.GetMinResolvedTs() require.Equal(t, expectedRts, resolvedTs) } @@ -322,7 +322,7 @@ func TestManagerError(t *testing.T) { cfg := &config.ConsistentConfig{ Level: string(redo.ConsistentLevelEventual), Storage: "blackhole://", - FlushIntervalInMs: config.MinFlushIntervalInMs, + FlushIntervalInMs: config.DefaultFlushIntervalInMs, } errCh := make(chan error, 1) @@ -387,7 +387,7 @@ func TestReuseWritter(t *testing.T) { cfg := &config.ConsistentConfig{ Level: string(redo.ConsistentLevelEventual), Storage: "local://" + dir, - FlushIntervalInMs: config.MinFlushIntervalInMs, + FlushIntervalInMs: config.DefaultFlushIntervalInMs, } errCh := make(chan error, 1) @@ -410,7 +410,8 @@ func TestReuseWritter(t *testing.T) { time.Sleep(time.Duration(100) * time.Millisecond) // The another redo manager shouldn't be influenced. - mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err }) + var workTimeSlice time.Duration + mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err }, &workTimeSlice) select { case x := <-errCh: log.Panic("shouldn't get an error", zap.Error(x)) diff --git a/cdc/redo/reader/file.go b/cdc/redo/reader/file.go index 2d5a5ec4737..514819e9c2e 100644 --- a/cdc/redo/reader/file.go +++ b/cdc/redo/reader/file.go @@ -134,7 +134,7 @@ func selectDownLoadFile( return nil }) if err != nil { - return nil, cerror.WrapError(cerror.ErrS3StorageAPI, err) + return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err) } return files, nil @@ -154,7 +154,7 @@ func downLoadToLocal( eg.Go(func() error { data, err := extStorage.ReadFile(eCtx, f) if err != nil { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } err = os.MkdirAll(dir, redo.DefaultDirMode) diff --git a/cdc/redo/writer/file.go b/cdc/redo/writer/file.go index f349db16a22..063a8b3e3e0 100644 --- a/cdc/redo/writer/file.go +++ b/cdc/redo/writer/file.go @@ -366,7 +366,7 @@ func (w *Writer) close() error { if err != nil { w.file.Close() w.file = nil - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } } @@ -490,7 +490,7 @@ func (w *Writer) GC(checkPointTs uint64) error { errs = multierr.Append(errs, err) } if errs != nil { - errs = cerror.WrapError(cerror.ErrS3StorageAPI, errs) + errs = cerror.WrapError(cerror.ErrExternalStorageAPI, errs) log.Warn("delete redo log in s3 fail", zap.Error(errs)) } }() @@ -610,7 +610,7 @@ func (w *Writer) writeToS3(ctx context.Context, name string) error { // Key in s3: aws.String(rs.options.Prefix + name), prefix should be changefeed name err = w.storage.WriteFile(ctx, filepath.Base(name), fileData) if err != nil { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } // in case the page cache piling up triggered the OS memory reclaming which may cause diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 7a4329d94ab..9e238fdac98 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/s3" @@ -32,7 +33,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" - "github.com/prometheus/client_golang/prometheus" + "github.com/pingcap/tiflow/pkg/uuid" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -139,9 +140,9 @@ type logWriter struct { // the redo log files when changefeed is created or deleted. extStorage storage.ExternalStorage - meta *common.LogMeta - - metricTotalRowsCount prometheus.Gauge + meta *common.LogMeta + preMetaFile string + uuidGenerator uuid.Generator } func newLogWriter( @@ -153,6 +154,16 @@ func newLogWriter( lw = &logWriter{cfg: cfg} + writerOp := &writerOptions{} + for _, opt := range opts { + opt(writerOp) + } + if writerOp.getUUIDGenerator != nil { + lw.uuidGenerator = writerOp.getUUIDGenerator() + } else { + lw.uuidGenerator = uuid.NewGenerator() + } + if lw.cfg.EmitRowEvents { writerCfg := &FileWriterConfig{ FileType: redo.RedoRowLogFileType, @@ -205,15 +216,13 @@ func newLogWriter( } } - lw.metricTotalRowsCount = common.RedoTotalRowsCountGauge. - WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID) return } func (l *logWriter) preCleanUpS3(ctx context.Context) error { ret, err := l.extStorage.FileExists(ctx, l.getDeletedChangefeedMarker()) if err != nil { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } if !ret { return nil @@ -236,7 +245,7 @@ func (l *logWriter) preCleanUpS3(ctx context.Context) error { } err = l.extStorage.DeleteFile(ctx, l.getDeletedChangefeedMarker()) if !isNotExistInS3(err) { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } return nil @@ -296,7 +305,7 @@ func (l *logWriter) WriteLog( return nil } - for i, r := range rows { + for _, r := range rows { if r == nil || r.Row == nil { continue } @@ -313,13 +322,11 @@ func (l *logWriter) WriteLog( l.rowWriter.AdvanceTs(r.Row.CommitTs) _, err = l.rowWriter.Write(data) if err != nil { - l.metricTotalRowsCount.Add(float64(i)) return err } redoLogPool.Put(rl) } - l.metricTotalRowsCount.Add(float64(len(rows))) return nil } @@ -451,7 +458,7 @@ func (l *logWriter) getDeletedChangefeedMarker() string { } func (l *logWriter) writeDeletedMarkerToS3(ctx context.Context) error { - return cerror.WrapError(cerror.ErrS3StorageAPI, + return cerror.WrapError(cerror.ErrExternalStorageAPI, l.extStorage.WriteFile(ctx, l.getDeletedChangefeedMarker(), []byte("D"))) } @@ -464,7 +471,7 @@ func (l *logWriter) deleteFilesInS3(ctx context.Context, files []string) error { if err != nil { // if fail then retry, may end up with notExit err, ignore the error if !isNotExistInS3(err) { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } } return nil @@ -474,6 +481,7 @@ func (l *logWriter) deleteFilesInS3(ctx context.Context, files []string) error { } func isNotExistInS3(err error) bool { + // TODO: support other storage if err != nil { if aerr, ok := errors.Cause(err).(awserr.Error); ok { // nolint:errorlint switch aerr.Code() { @@ -492,7 +500,7 @@ var getAllFilesInS3 = func(ctx context.Context, l *logWriter) ([]string, error) return nil }) if err != nil { - return nil, cerror.WrapError(cerror.ErrS3StorageAPI, err) + return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err) } return files, nil @@ -537,16 +545,6 @@ func (l *logWriter) isStopped() bool { return rowStopped || ddlStopped } -func (l *logWriter) getMetafileName() string { - if model.DefaultNamespace == l.cfg.ChangeFeedID.Namespace { - return fmt.Sprintf("%s_%s_%s%s", l.cfg.CaptureID, l.cfg.ChangeFeedID.ID, - redo.RedoMetaFileType, redo.MetaEXT) - } - return fmt.Sprintf("%s_%s_%s_%s%s", l.cfg.CaptureID, - l.cfg.ChangeFeedID.Namespace, l.cfg.ChangeFeedID.ID, - redo.RedoMetaFileType, redo.MetaEXT) -} - func (l *logWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, error) { // NOTE: both checkpoint and resolved can regress if a cdc instance restarts. hasChange := false @@ -591,62 +589,67 @@ func (l *logWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error { return nil } - err = os.MkdirAll(l.cfg.Dir, redo.DefaultDirMode) - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't make dir for new redo logfile")) + if !l.cfg.UseExternalStorage { + return l.flushMetaToLocal(data) } - // we will create a temp metadata file and then atomically rename it. - tmpFileName := l.filePath() + redo.MetaTmpEXT - tmpFile, err := openTruncFile(tmpFileName) - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, err) - } - defer tmpFile.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout) + defer cancel() + return l.flushMetaToS3(ctx, data) +} - _, err = tmpFile.Write(data) - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, err) - } - err = tmpFile.Sync() - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, err) +func (l *logWriter) flushMetaToLocal(data []byte) error { + if err := os.MkdirAll(l.cfg.Dir, redo.DefaultDirMode); err != nil { + e := errors.Annotate(err, "can't make dir for new redo logfile") + return cerror.WrapError(cerror.ErrRedoFileOp, e) } - err = os.Rename(tmpFileName, l.filePath()) + metaFile, err := openTruncFile(l.filePath()) if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, err) } - - dirFile, err := os.Open(l.cfg.Dir) + _, err = metaFile.Write(data) if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, err) } - defer dirFile.Close() - // sync the dir to guarantee the renamed file is persisted to disk. - err = dirFile.Sync() + err = metaFile.Sync() if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, err) } - if !l.cfg.UseExternalStorage { - return nil + if l.preMetaFile != "" { + if err := os.Remove(l.preMetaFile); err != nil && !os.IsNotExist(err) { + return cerror.WrapError(cerror.ErrRedoFileOp, err) + } } + l.preMetaFile = metaFile.Name() - ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout) - defer cancel() - return l.writeMetaToS3(ctx) + return metaFile.Close() } -func (l *logWriter) writeMetaToS3(ctx context.Context) error { - name := l.filePath() - fileData, err := os.ReadFile(name) - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, err) +func (l *logWriter) flushMetaToS3(ctx context.Context, data []byte) error { + start := time.Now() + metaFile := l.getMetafileName() + if err := l.extStorage.WriteFile(ctx, metaFile, data); err != nil { + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } - return cerror.WrapError(cerror.ErrS3StorageAPI, - l.extStorage.WriteFile(ctx, l.getMetafileName(), fileData)) + if l.preMetaFile != "" { + if err := l.extStorage.DeleteFile(ctx, l.preMetaFile); err != nil && !isNotExistInS3(err) { + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) + } + } + l.preMetaFile = metaFile + log.Debug("flush meta to s3", + zap.String("metaFile", metaFile), + zap.Any("cost", time.Since(start).Milliseconds())) + return nil +} + +func (l *logWriter) getMetafileName() string { + return fmt.Sprintf(redo.RedoMetaFileFormat, l.cfg.CaptureID, + l.cfg.ChangeFeedID.Namespace, l.cfg.ChangeFeedID.ID, + redo.RedoMetaFileType, l.uuidGenerator.NewString(), redo.MetaEXT) } func (l *logWriter) filePath() string { diff --git a/cdc/redo/writer/writer_test.go b/cdc/redo/writer/writer_test.go index c3668362385..3263ab9552c 100644 --- a/cdc/redo/writer/writer_test.go +++ b/cdc/redo/writer/writer_test.go @@ -135,8 +135,6 @@ func TestLogWriterWriteLog(t *testing.T) { rowWriter: mockWriter, ddlWriter: mockWriter, meta: &common.LogMeta{}, - metricTotalRowsCount: common.RedoTotalRowsCountGauge. - WithLabelValues("default", ""), } if tt.name == "context cancel" { ctx, cancel := context.WithCancel(context.Background()) @@ -320,7 +318,7 @@ func TestLogWriterFlushLog(t *testing.T) { mockStorage := mockstorage.NewMockExternalStorage(controller) if tt.isRunning && tt.name != "context cancel" { mockStorage.EXPECT().WriteFile(gomock.Any(), - "cp_test-cf_meta.meta", + "cp_default_test-cf_meta_uid.meta", gomock.Any()).Return(nil).Times(1) } mockWriter := &mockFileWriter{} @@ -339,11 +337,12 @@ func TestLogWriterFlushLog(t *testing.T) { UseExternalStorage: true, } writer := logWriter{ - cfg: cfg, - rowWriter: mockWriter, - ddlWriter: mockWriter, - meta: &common.LogMeta{}, - extStorage: mockStorage, + cfg: cfg, + uuidGenerator: uuid.NewConstGenerator("uid"), + rowWriter: mockWriter, + ddlWriter: mockWriter, + meta: &common.LogMeta{}, + extStorage: mockStorage, } if tt.name == "context cancel" { @@ -427,7 +426,7 @@ func TestNewLogWriter(t *testing.T) { CaptureID: "cp", MaxLogSize: 10, } - l, err := newLogWriter(ctx, cfg) + l, err := newLogWriter(ctx, cfg, WithUUIDGenerator(func() uuid.Generator { return uuidGen })) require.Nil(t, err) err = l.Close() require.Nil(t, err) @@ -441,7 +440,7 @@ func TestNewLogWriter(t *testing.T) { _, err = f.Write(data) require.Nil(t, err) - l, err = newLogWriter(ctx, cfg) + l, err = newLogWriter(ctx, cfg, WithUUIDGenerator(func() uuid.Generator { return uuidGen })) require.Nil(t, err) err = l.Close() require.Nil(t, err) diff --git a/deployments/ticdc/docker/test.Dockerfile b/deployments/ticdc/docker/test.Dockerfile new file mode 100644 index 00000000000..7c1c07ce717 --- /dev/null +++ b/deployments/ticdc/docker/test.Dockerfile @@ -0,0 +1,6 @@ +FROM alpine:3.15 +RUN apk add --no-cache tzdata bash curl socat +COPY ./bin/cdc /cdc +EXPOSE 8300 +CMD [ "/cdc" ] + diff --git a/errors.toml b/errors.toml index fdc267049c1..2a031dd6784 100755 --- a/errors.toml +++ b/errors.toml @@ -883,7 +883,7 @@ failed to seek to the beginning of request body ["CDC:ErrS3StorageAPI"] error = ''' -s3 storage api +external storage api ''' ["CDC:ErrScanLockFailed"] diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index af9918aca97..9f7e877acd5 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -17426,6 +17426,104 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Redo bgUpdateLog worker busy ratio", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 106 + }, + "hiddenSeries": false, + "id": 723, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": false, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)", + "interval": "", + "legendFormat": "{{changefeed}}-{{instance}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Worker Busy Ratio", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "percent", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Redo", diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index 4e4c94ca93d..9dced1585e4 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -22,9 +22,10 @@ import ( ) const ( - // MinFlushIntervalInMs is the minimum value of flush interval, which is set - // to two seconds to reduce the frequency of accessing external storage. - MinFlushIntervalInMs = 2000 + // DefaultFlushIntervalInMs is the default flush interval for redo log. + DefaultFlushIntervalInMs = 2000 + // minFlushIntervalInMs is the minimum flush interval for redo log. + minFlushIntervalInMs = 50 ) // ConsistentConfig represents replication consistency config for a changefeed. @@ -41,10 +42,14 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { return nil } - if c.FlushIntervalInMs < MinFlushIntervalInMs { + if c.FlushIntervalInMs == 0 { + c.FlushIntervalInMs = DefaultFlushIntervalInMs + } + + if c.FlushIntervalInMs < minFlushIntervalInMs { return cerror.ErrInvalidReplicaConfig.FastGenByArgs( fmt.Sprintf("The consistent.flush-interval:%d must be equal or greater than %d", - c.FlushIntervalInMs, MinFlushIntervalInMs)) + c.FlushIntervalInMs, minFlushIntervalInMs)) } uri, err := storage.ParseRawURL(c.Storage) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index aca7d9dbb23..3bbcdef90cc 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -45,7 +45,7 @@ var defaultReplicaConfig = &ReplicaConfig{ Consistent: &ConsistentConfig{ Level: "none", MaxLogSize: 64, - FlushIntervalInMs: MinFlushIntervalInMs, + FlushIntervalInMs: DefaultFlushIntervalInMs, Storage: "", }, } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 283f3b947fb..29ddc5182ef 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -287,8 +287,8 @@ var ( "rawData size %d exceeds maximum file size %d", errors.RFCCodeText("CDC:ErrFileSizeExceed"), ) - ErrS3StorageAPI = errors.Normalize( - "s3 storage api", + ErrExternalStorageAPI = errors.Normalize( + "external storage api", errors.RFCCodeText("CDC:ErrS3StorageAPI"), ) ErrFailToCreateExternalStorage = errors.Normalize( diff --git a/pkg/redo/config.go b/pkg/redo/config.go index 8016b12f495..c57f3672f35 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -39,8 +39,6 @@ const ( LogEXT = ".log" // MetaEXT is the meta file ext of meta file after safely wrote to disk MetaEXT = ".meta" - // MetaTmpEXT is the meta file ext of meta file before safely wrote to disk - MetaTmpEXT = ".mtmp" // SortLogEXT is the sorted log file ext of log file after safely wrote to disk SortLogEXT = ".sort" @@ -202,6 +200,9 @@ const ( // RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information // layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s" + // RedoMetaFileFormat is the format of redo meta file, which contains namespace information. + // layout: captureID_namespace_changefeedID_fileType_uuid.fileExtName + RedoMetaFileFormat = "%s_%s_%s_%s_%s%s" ) // logFormat2ParseFormat converts redo log file name format to the space separated