From 1ce0a3415b91e3ac78b77d4ef5d2207324d2adbf Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 9 Feb 2023 18:04:00 +0800 Subject: [PATCH] redo(ticdc): use uuid in redo meta filename (#8075) (#8130) close pingcap/tiflow#8028, close pingcap/tiflow#8074 --- Makefile | 4 + cdc/api/v2/model.go | 2 +- cdc/owner/changefeed_test.go | 2 +- cdc/redo/common/metric.go | 10 ++ cdc/redo/manager.go | 62 +++++++-- cdc/redo/manager_test.go | 9 +- cdc/redo/reader/file.go | 4 +- cdc/redo/writer/file.go | 6 +- cdc/redo/writer/writer.go | 121 +++++++++--------- cdc/redo/writer/writer_test.go | 19 ++- cdc/sinkv2/eventsink/txn/worker.go | 6 +- deployments/ticdc/docker/test.Dockerfile | 6 + .../internal/s3/file_manager.go | 16 +-- errors.toml | 2 +- metrics/grafana/ticdc.json | 98 ++++++++++++++ pkg/config/consistent.go | 15 ++- pkg/config/replica_config.go | 2 +- pkg/errors/cdc_errors.go | 4 +- pkg/redo/config.go | 5 +- 19 files changed, 280 insertions(+), 113 deletions(-) create mode 100644 deployments/ticdc/docker/test.Dockerfile diff --git a/Makefile b/Makefile index 85aa8b288f3..aa0c7de75ef 100644 --- a/Makefile +++ b/Makefile @@ -144,6 +144,10 @@ kafka_consumer: storage_consumer: $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go +cdc_test_image: + @which docker || (echo "docker not found in ${PATH}"; exit 1) + docker build --platform linux/amd64 -f deployments/ticdc/docker/test.Dockerfile -t cdc:test ./ + install: go install ./... diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 49e910fc526..9a469f1ee70 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -341,7 +341,7 @@ func GetDefaultReplicaConfig() *ReplicaConfig { Consistent: &ConsistentConfig{ Level: "none", MaxLogSize: 64, - FlushIntervalInMs: config.MinFlushIntervalInMs, + FlushIntervalInMs: config.DefaultFlushIntervalInMs, Storage: "", }, } diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index aaeec0f79f6..d71b5f0cc1c 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -487,7 +487,7 @@ func TestRemoveChangefeed(t *testing.T) { info.Config.Consistent = &config.ConsistentConfig{ Level: "eventual", Storage: filepath.Join("nfs://", dir), - FlushIntervalInMs: config.MinFlushIntervalInMs, + FlushIntervalInMs: config.DefaultFlushIntervalInMs, } ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: ctx.ChangefeedVars().ID, diff --git a/cdc/redo/common/metric.go b/cdc/redo/common/metric.go index b2e4cd8e61c..74011bab63b 100644 --- a/cdc/redo/common/metric.go +++ b/cdc/redo/common/metric.go @@ -75,6 +75,15 @@ var ( Help: "The latency distributions of flushLog called by redoManager", Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13), }, []string{"namespace", "changefeed"}) + + // RedoWorkerBusyRatio records the busy ratio of redo bgUpdateLog worker. + RedoWorkerBusyRatio = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "worker_busy_ratio", + Help: "Busy ratio (X ms in 1s) for redo bgUpdateLog worker.", + }, []string{"namespace", "changefeed"}) ) // InitMetrics registers all metrics in this file @@ -85,4 +94,5 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(RedoFlushAllDurationHistogram) registry.MustRegister(RedoWriteLogDurationHistogram) registry.MustRegister(RedoFlushLogDurationHistogram) + registry.MustRegister(RedoWorkerBusyRatio) } diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index bf39b6bf800..98d7aeecffe 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -137,8 +137,10 @@ type ManagerImpl struct { flushing int64 lastFlushTime time.Time - metricWriteLogDuration prometheus.Observer - metricFlushLogDuration prometheus.Observer + metricWriteLogDuration prometheus.Observer + metricFlushLogDuration prometheus.Observer + metricTotalRowsCount prometheus.Counter + metricRedoWorkerBusyRatio prometheus.Counter } // NewManager creates a new Manager @@ -174,6 +176,10 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + m.metricTotalRowsCount = common.RedoTotalRowsCountGauge. + WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + m.metricRedoWorkerBusyRatio = common.RedoWorkerBusyRatio. + WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) // TODO: better to wait background goroutines after the context is canceled. if m.opts.EnableBgRunner { @@ -197,7 +203,7 @@ func NewMockManager(ctx context.Context) (*ManagerImpl, error) { cfg := &config.ConsistentConfig{ Level: string(redo.ConsistentLevelEventual), Storage: "blackhole://", - FlushIntervalInMs: config.MinFlushIntervalInMs, + FlushIntervalInMs: config.DefaultFlushIntervalInMs, } errCh := make(chan error, 1) @@ -365,6 +371,10 @@ func (m *ManagerImpl) Cleanup(ctx context.Context) error { DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) common.RedoFlushLogDurationHistogram. DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + common.RedoTotalRowsCountGauge. + DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + common.RedoWorkerBusyRatio. + DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) return m.withLock(func(m *ManagerImpl) error { return m.writer.DeleteAllLogs(ctx) }) } @@ -428,7 +438,13 @@ func (m *ManagerImpl) postFlushMeta(metaCheckpoint, metaResolved model.Ts) { m.metaCheckpointTs.setFlushed(metaCheckpoint) } -func (m *ManagerImpl) flushLog(ctx context.Context, handleErr func(err error)) { +func (m *ManagerImpl) flushLog( + ctx context.Context, handleErr func(err error), workTimeSlice *time.Duration, +) { + start := time.Now() + defer func() { + *workTimeSlice += time.Since(start) + }() if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) { log.Debug("Fail to update flush flag, " + "the previous flush operation hasn't finished yet") @@ -481,7 +497,8 @@ func (m *ManagerImpl) bgUpdateLog( log.Info("redo manager bgUpdateLog is running", zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID)) + zap.String("changefeed", m.changeFeedID.ID), + zap.Int64("flushIntervalInMs", flushIntervalInMs)) ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond) defer func() { @@ -511,7 +528,11 @@ func (m *ManagerImpl) bgUpdateLog( rtsMap := make(map[model.TableID]model.Ts) releaseMemoryCbs := make([]func(), 0, 1024) - emitBatch := func() { + emitBatch := func(workTimeSlice *time.Duration) { + start := time.Now() + defer func() { + *workTimeSlice += time.Since(start) + }() if len(logs) > 0 { start := time.Now() err = m.writer.WriteLog(ctx, logs) @@ -522,6 +543,7 @@ func (m *ManagerImpl) bgUpdateLog( zap.Int("rows", len(logs)), zap.Error(err), zap.Duration("writeLogElapse", writeLogElapse)) + m.metricTotalRowsCount.Add(float64(len(logs))) m.metricWriteLogDuration.Observe(writeLogElapse.Seconds()) for _, releaseMemory := range releaseMemoryCbs { @@ -552,18 +574,23 @@ func (m *ManagerImpl) bgUpdateLog( } } + overseerTicker := time.NewTicker(time.Second * 5) + defer overseerTicker.Stop() + var workTimeSlice time.Duration + startToWork := time.Now() for { if len(logs) > 0 || len(rtsMap) > 0 { select { case <-ctx.Done(): return case <-ticker.C: - emitBatch() - m.flushLog(ctx, handleErr) + emitBatch(&workTimeSlice) + m.flushLog(ctx, handleErr, &workTimeSlice) case cache, ok := <-m.logBuffer.Out(): if !ok { return // channel closed } + startToHandleEvent := time.Now() switch cache.eventType { case model.MessageTypeRow: for _, row := range cache.rows { @@ -579,21 +606,28 @@ func (m *ManagerImpl) bgUpdateLog( default: log.Panic("redo manager receives unknown event type") } + workTimeSlice += time.Since(startToHandleEvent) + case now := <-overseerTicker.C: + busyRatio := int(workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000) + m.metricRedoWorkerBusyRatio.Add(float64(busyRatio)) + startToWork = now + workTimeSlice = 0 case err = <-logErrCh: default: - emitBatch() + emitBatch(&workTimeSlice) } } else { select { case <-ctx.Done(): return case <-ticker.C: - emitBatch() - m.flushLog(ctx, handleErr) + emitBatch(&workTimeSlice) + m.flushLog(ctx, handleErr, &workTimeSlice) case cache, ok := <-m.logBuffer.Out(): if !ok { return // channel closed } + startToHandleEvent := time.Now() switch cache.eventType { case model.MessageTypeRow: for _, row := range cache.rows { @@ -609,6 +643,12 @@ func (m *ManagerImpl) bgUpdateLog( default: log.Panic("redo manager receives unknown event type") } + workTimeSlice += time.Since(startToHandleEvent) + case now := <-overseerTicker.C: + busyRatio := int(workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000) + m.metricRedoWorkerBusyRatio.Add(float64(busyRatio)) + startToWork = now + workTimeSlice = 0 case err = <-logErrCh: } } diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index ad3d19deec6..0c0c218c59c 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -100,7 +100,7 @@ func TestLogManagerInProcessor(t *testing.T) { defer logMgr.Cleanup(ctx) checkResolvedTs := func(mgr LogManager, expectedRts uint64) { - time.Sleep(time.Duration(config.MinFlushIntervalInMs+200) * time.Millisecond) + time.Sleep(time.Duration(config.DefaultFlushIntervalInMs+200) * time.Millisecond) resolvedTs := mgr.GetMinResolvedTs() require.Equal(t, expectedRts, resolvedTs) } @@ -322,7 +322,7 @@ func TestManagerError(t *testing.T) { cfg := &config.ConsistentConfig{ Level: string(redo.ConsistentLevelEventual), Storage: "blackhole://", - FlushIntervalInMs: config.MinFlushIntervalInMs, + FlushIntervalInMs: config.DefaultFlushIntervalInMs, } errCh := make(chan error, 1) @@ -387,7 +387,7 @@ func TestReuseWritter(t *testing.T) { cfg := &config.ConsistentConfig{ Level: string(redo.ConsistentLevelEventual), Storage: "local://" + dir, - FlushIntervalInMs: config.MinFlushIntervalInMs, + FlushIntervalInMs: config.DefaultFlushIntervalInMs, } errCh := make(chan error, 1) @@ -410,7 +410,8 @@ func TestReuseWritter(t *testing.T) { time.Sleep(time.Duration(100) * time.Millisecond) // The another redo manager shouldn't be influenced. - mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err }) + var workTimeSlice time.Duration + mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err }, &workTimeSlice) select { case x := <-errCh: log.Panic("shouldn't get an error", zap.Error(x)) diff --git a/cdc/redo/reader/file.go b/cdc/redo/reader/file.go index fa184129bd4..491130cd00e 100644 --- a/cdc/redo/reader/file.go +++ b/cdc/redo/reader/file.go @@ -133,7 +133,7 @@ func selectDownLoadFile( return nil }) if err != nil { - return nil, cerror.WrapError(cerror.ErrS3StorageAPI, err) + return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err) } return files, nil @@ -153,7 +153,7 @@ func downLoadToLocal( eg.Go(func() error { data, err := extStorage.ReadFile(eCtx, f) if err != nil { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } err = os.MkdirAll(dir, redo.DefaultDirMode) diff --git a/cdc/redo/writer/file.go b/cdc/redo/writer/file.go index c2b74318d48..78dd18b9b94 100644 --- a/cdc/redo/writer/file.go +++ b/cdc/redo/writer/file.go @@ -365,7 +365,7 @@ func (w *Writer) close() error { if err != nil { w.file.Close() w.file = nil - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } } @@ -489,7 +489,7 @@ func (w *Writer) GC(checkPointTs uint64) error { errs = multierr.Append(errs, err) } if errs != nil { - errs = cerror.WrapError(cerror.ErrS3StorageAPI, errs) + errs = cerror.WrapError(cerror.ErrExternalStorageAPI, errs) log.Warn("delete redo log in s3 fail", zap.Error(errs)) } }() @@ -616,7 +616,7 @@ func (w *Writer) writeToS3(ctx context.Context, name string) error { // Key in s3: aws.String(rs.options.Prefix + name), prefix should be changefeed name err = w.storage.WriteFile(ctx, filepath.Base(name), fileData) if err != nil { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } // in case the page cache piling up triggered the OS memory reclaming which may cause diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 28fa5fc29f1..9e2edbb8656 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -19,6 +19,7 @@ import ( "net/url" "os" "path/filepath" + "time" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/s3" @@ -31,7 +32,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" - "github.com/prometheus/client_golang/prometheus" + "github.com/pingcap/tiflow/pkg/uuid" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -132,9 +133,9 @@ type logWriter struct { // the redo log files when changefeed is created or deleted. extStorage storage.ExternalStorage - meta *common.LogMeta - - metricTotalRowsCount prometheus.Gauge + meta *common.LogMeta + preMetaFile string + uuidGenerator uuid.Generator } func newLogWriter( @@ -146,6 +147,16 @@ func newLogWriter( lw = &logWriter{cfg: cfg} + writerOp := &writerOptions{} + for _, opt := range opts { + opt(writerOp) + } + if writerOp.getUUIDGenerator != nil { + lw.uuidGenerator = writerOp.getUUIDGenerator() + } else { + lw.uuidGenerator = uuid.NewGenerator() + } + if lw.cfg.EmitRowEvents { writerCfg := &FileWriterConfig{ FileType: redo.RedoRowLogFileType, @@ -198,15 +209,13 @@ func newLogWriter( } } - lw.metricTotalRowsCount = common.RedoTotalRowsCountGauge. - WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID) return } func (l *logWriter) preCleanUpS3(ctx context.Context) error { ret, err := l.extStorage.FileExists(ctx, l.getDeletedChangefeedMarker()) if err != nil { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } if !ret { return nil @@ -229,7 +238,7 @@ func (l *logWriter) preCleanUpS3(ctx context.Context) error { } err = l.extStorage.DeleteFile(ctx, l.getDeletedChangefeedMarker()) if !isNotExistInS3(err) { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } return nil @@ -287,7 +296,7 @@ func (l *logWriter) WriteLog(ctx context.Context, rows []*model.RedoRowChangedEv return nil } - for i, r := range rows { + for _, r := range rows { if r == nil || r.Row == nil { continue } @@ -301,11 +310,9 @@ func (l *logWriter) WriteLog(ctx context.Context, rows []*model.RedoRowChangedEv l.rowWriter.AdvanceTs(r.Row.CommitTs) _, err = l.rowWriter.Write(data) if err != nil { - l.metricTotalRowsCount.Add(float64(i)) return err } } - l.metricTotalRowsCount.Add(float64(len(rows))) return nil } @@ -432,7 +439,7 @@ func (l *logWriter) getDeletedChangefeedMarker() string { } func (l *logWriter) writeDeletedMarkerToS3(ctx context.Context) error { - return cerror.WrapError(cerror.ErrS3StorageAPI, + return cerror.WrapError(cerror.ErrExternalStorageAPI, l.extStorage.WriteFile(ctx, l.getDeletedChangefeedMarker(), []byte("D"))) } @@ -445,7 +452,7 @@ func (l *logWriter) deleteFilesInS3(ctx context.Context, files []string) error { if err != nil { // if fail then retry, may end up with notExit err, ignore the error if !isNotExistInS3(err) { - return cerror.WrapError(cerror.ErrS3StorageAPI, err) + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } } return nil @@ -455,6 +462,7 @@ func (l *logWriter) deleteFilesInS3(ctx context.Context, files []string) error { } func isNotExistInS3(err error) bool { + // TODO: support other storage if err != nil { if aerr, ok := errors.Cause(err).(awserr.Error); ok { // nolint:errorlint switch aerr.Code() { @@ -473,7 +481,7 @@ var getAllFilesInS3 = func(ctx context.Context, l *logWriter) ([]string, error) return nil }) if err != nil { - return nil, cerror.WrapError(cerror.ErrS3StorageAPI, err) + return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err) } return files, nil @@ -518,16 +526,6 @@ func (l *logWriter) isStopped() bool { return rowStopped || ddlStopped } -func (l *logWriter) getMetafileName() string { - if model.DefaultNamespace == l.cfg.ChangeFeedID.Namespace { - return fmt.Sprintf("%s_%s_%s%s", l.cfg.CaptureID, l.cfg.ChangeFeedID.ID, - redo.RedoMetaFileType, redo.MetaEXT) - } - return fmt.Sprintf("%s_%s_%s_%s%s", l.cfg.CaptureID, - l.cfg.ChangeFeedID.Namespace, l.cfg.ChangeFeedID.ID, - redo.RedoMetaFileType, redo.MetaEXT) -} - func (l *logWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, error) { // NOTE: both checkpoint and resolved can regress if a cdc instance restarts. hasChange := false @@ -572,62 +570,67 @@ func (l *logWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error { return nil } - err = os.MkdirAll(l.cfg.Dir, redo.DefaultDirMode) - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't make dir for new redo logfile")) + if !l.cfg.UseExternalStorage { + return l.flushMetaToLocal(data) } - // we will create a temp metadata file and then atomically rename it. - tmpFileName := l.filePath() + redo.MetaTmpEXT - tmpFile, err := openTruncFile(tmpFileName) - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, err) - } - defer tmpFile.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout) + defer cancel() + return l.flushMetaToS3(ctx, data) +} - _, err = tmpFile.Write(data) - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, err) - } - err = tmpFile.Sync() - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, err) +func (l *logWriter) flushMetaToLocal(data []byte) error { + if err := os.MkdirAll(l.cfg.Dir, redo.DefaultDirMode); err != nil { + e := errors.Annotate(err, "can't make dir for new redo logfile") + return cerror.WrapError(cerror.ErrRedoFileOp, e) } - err = os.Rename(tmpFileName, l.filePath()) + metaFile, err := openTruncFile(l.filePath()) if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, err) } - - dirFile, err := os.Open(l.cfg.Dir) + _, err = metaFile.Write(data) if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, err) } - defer dirFile.Close() - // sync the dir to guarantee the renamed file is persisted to disk. - err = dirFile.Sync() + err = metaFile.Sync() if err != nil { return cerror.WrapError(cerror.ErrRedoFileOp, err) } - if !l.cfg.UseExternalStorage { - return nil + if l.preMetaFile != "" { + if err := os.Remove(l.preMetaFile); err != nil && !os.IsNotExist(err) { + return cerror.WrapError(cerror.ErrRedoFileOp, err) + } } + l.preMetaFile = metaFile.Name() - ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout) - defer cancel() - return l.writeMetaToS3(ctx) + return metaFile.Close() } -func (l *logWriter) writeMetaToS3(ctx context.Context) error { - name := l.filePath() - fileData, err := os.ReadFile(name) - if err != nil { - return cerror.WrapError(cerror.ErrRedoFileOp, err) +func (l *logWriter) flushMetaToS3(ctx context.Context, data []byte) error { + start := time.Now() + metaFile := l.getMetafileName() + if err := l.extStorage.WriteFile(ctx, metaFile, data); err != nil { + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) } - return cerror.WrapError(cerror.ErrS3StorageAPI, - l.extStorage.WriteFile(ctx, l.getMetafileName(), fileData)) + if l.preMetaFile != "" { + if err := l.extStorage.DeleteFile(ctx, l.preMetaFile); err != nil && !isNotExistInS3(err) { + return cerror.WrapError(cerror.ErrExternalStorageAPI, err) + } + } + l.preMetaFile = metaFile + log.Debug("flush meta to s3", + zap.String("metaFile", metaFile), + zap.Any("cost", time.Since(start).Milliseconds())) + return nil +} + +func (l *logWriter) getMetafileName() string { + return fmt.Sprintf(redo.RedoMetaFileFormat, l.cfg.CaptureID, + l.cfg.ChangeFeedID.Namespace, l.cfg.ChangeFeedID.ID, + redo.RedoMetaFileType, l.uuidGenerator.NewString(), redo.MetaEXT) } func (l *logWriter) filePath() string { diff --git a/cdc/redo/writer/writer_test.go b/cdc/redo/writer/writer_test.go index 329a9c39f8d..b6a6684ddb1 100644 --- a/cdc/redo/writer/writer_test.go +++ b/cdc/redo/writer/writer_test.go @@ -129,8 +129,6 @@ func TestLogWriterWriteLog(t *testing.T) { rowWriter: mockWriter, ddlWriter: mockWriter, meta: &common.LogMeta{}, - metricTotalRowsCount: common.RedoTotalRowsCountGauge. - WithLabelValues("default", ""), } if tt.name == "context cancel" { ctx, cancel := context.WithCancel(context.Background()) @@ -314,7 +312,7 @@ func TestLogWriterFlushLog(t *testing.T) { mockStorage := mockstorage.NewMockExternalStorage(controller) if tt.isRunning && tt.name != "context cancel" { mockStorage.EXPECT().WriteFile(gomock.Any(), - "cp_test-cf_meta.meta", + "cp_default_test-cf_meta_uid.meta", gomock.Any()).Return(nil).Times(1) } mockWriter := &mockFileWriter{} @@ -333,11 +331,12 @@ func TestLogWriterFlushLog(t *testing.T) { UseExternalStorage: true, } writer := logWriter{ - cfg: cfg, - rowWriter: mockWriter, - ddlWriter: mockWriter, - meta: &common.LogMeta{}, - extStorage: mockStorage, + cfg: cfg, + uuidGenerator: uuid.NewConstGenerator("uid"), + rowWriter: mockWriter, + ddlWriter: mockWriter, + meta: &common.LogMeta{}, + extStorage: mockStorage, } if tt.name == "context cancel" { @@ -421,7 +420,7 @@ func TestNewLogWriter(t *testing.T) { CaptureID: "cp", MaxLogSize: 10, } - l, err := newLogWriter(ctx, cfg) + l, err := newLogWriter(ctx, cfg, WithUUIDGenerator(func() uuid.Generator { return uuidGen })) require.Nil(t, err) err = l.Close() require.Nil(t, err) @@ -435,7 +434,7 @@ func TestNewLogWriter(t *testing.T) { _, err = f.Write(data) require.Nil(t, err) - l, err = newLogWriter(ctx, cfg) + l, err = newLogWriter(ctx, cfg, WithUUIDGenerator(func() uuid.Generator { return uuidGen })) require.Nil(t, err) err = l.Close() require.Nil(t, err) diff --git a/cdc/sinkv2/eventsink/txn/worker.go b/cdc/sinkv2/eventsink/txn/worker.go index 99c0eb99b54..b7a5397303f 100644 --- a/cdc/sinkv2/eventsink/txn/worker.go +++ b/cdc/sinkv2/eventsink/txn/worker.go @@ -137,8 +137,8 @@ func (w *worker) runBackgroundLoop() { defer ticker.Stop() var flushTimeSlice, totalTimeSlice time.Duration - overseerTimer := time.NewTicker(time.Second) - defer overseerTimer.Stop() + overseerTicker := time.NewTicker(time.Second) + defer overseerTicker.Stop() startToWork := time.Now() Loop: for { @@ -163,7 +163,7 @@ func (w *worker) runBackgroundLoop() { if w.doFlush(&flushTimeSlice) { break Loop } - case now := <-overseerTimer.C: + case now := <-overseerTicker.C: totalTimeSlice = now.Sub(startToWork) busyRatio := int(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000) w.metricTxnWorkerBusyRatio.Add(float64(busyRatio) / float64(w.workerCount)) diff --git a/deployments/ticdc/docker/test.Dockerfile b/deployments/ticdc/docker/test.Dockerfile new file mode 100644 index 00000000000..7c1c07ce717 --- /dev/null +++ b/deployments/ticdc/docker/test.Dockerfile @@ -0,0 +1,6 @@ +FROM alpine:3.15 +RUN apk add --no-cache tzdata bash curl socat +COPY ./bin/cdc /cdc +EXPOSE 8300 +CMD [ "/cdc" ] + diff --git a/engine/pkg/externalresource/internal/s3/file_manager.go b/engine/pkg/externalresource/internal/s3/file_manager.go index 8a07153af55..89a0647d757 100644 --- a/engine/pkg/externalresource/internal/s3/file_manager.go +++ b/engine/pkg/externalresource/internal/s3/file_manager.go @@ -119,7 +119,7 @@ func (m *FileManager) GetPersistedResource( ok, err := storage.FileExists(ctx, placeholderFileName) if err != nil { - return nil, errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("check placeholder file") + return nil, errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("check placeholder file") } if !ok { return nil, errors.ErrResourceFilesNotFound.GenWithStackByArgs() @@ -293,7 +293,7 @@ func (m *FileManager) removeFilesIf( return nil }) if err != nil { - return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("RemoveTemporaryFiles") + return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("RemoveTemporaryFiles") } log.Info("Removing resources", @@ -303,7 +303,7 @@ func (m *FileManager) removeFilesIf( for _, path := range toRemoveFiles { if err := storage.DeleteFile(ctx, path); err != nil { - return errors.ErrS3StorageAPI.Wrap(err) + return errors.ErrExternalStorageAPI.Wrap(err) } } return nil @@ -312,25 +312,25 @@ func (m *FileManager) removeFilesIf( func createPlaceholderFile(ctx context.Context, storage brStorage.ExternalStorage) error { exists, err := storage.FileExists(ctx, placeholderFileName) if err != nil { - return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("checking placeholder file") + return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("checking placeholder file") } if exists { // This should not happen in production. Unless the caller of the FileManager has a bug. - return errors.ErrS3StorageAPI.GenWithStackByArgs("resource already exists") + return errors.ErrExternalStorageAPI.GenWithStackByArgs("resource already exists") } writer, err := storage.Create(ctx, placeholderFileName) if err != nil { - return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("creating placeholder file") + return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("creating placeholder file") } _, err = writer.Write(ctx, []byte("placeholder")) if err != nil { - return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("writing placeholder file") + return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("writing placeholder file") } if err := writer.Close(ctx); err != nil { - return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("closing placeholder file") + return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("closing placeholder file") } return nil } diff --git a/errors.toml b/errors.toml index 252852d595b..b83d3f13b98 100755 --- a/errors.toml +++ b/errors.toml @@ -998,7 +998,7 @@ failed to seek to the beginning of request body ["CDC:ErrS3StorageAPI"] error = ''' -s3 storage api +external storage api ''' ["CDC:ErrScanLockFailed"] diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index c3d8c911cde..42fc7e0630c 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -20770,6 +20770,104 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Redo bgUpdateLog worker busy ratio", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 106 + }, + "hiddenSeries": false, + "id": 723, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": false, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)", + "interval": "", + "legendFormat": "{{changefeed}}-{{instance}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Worker Busy Ratio", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "percent", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Redo", diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index 4e4c94ca93d..9dced1585e4 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -22,9 +22,10 @@ import ( ) const ( - // MinFlushIntervalInMs is the minimum value of flush interval, which is set - // to two seconds to reduce the frequency of accessing external storage. - MinFlushIntervalInMs = 2000 + // DefaultFlushIntervalInMs is the default flush interval for redo log. + DefaultFlushIntervalInMs = 2000 + // minFlushIntervalInMs is the minimum flush interval for redo log. + minFlushIntervalInMs = 50 ) // ConsistentConfig represents replication consistency config for a changefeed. @@ -41,10 +42,14 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { return nil } - if c.FlushIntervalInMs < MinFlushIntervalInMs { + if c.FlushIntervalInMs == 0 { + c.FlushIntervalInMs = DefaultFlushIntervalInMs + } + + if c.FlushIntervalInMs < minFlushIntervalInMs { return cerror.ErrInvalidReplicaConfig.FastGenByArgs( fmt.Sprintf("The consistent.flush-interval:%d must be equal or greater than %d", - c.FlushIntervalInMs, MinFlushIntervalInMs)) + c.FlushIntervalInMs, minFlushIntervalInMs)) } uri, err := storage.ParseRawURL(c.Storage) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 9c34562b15b..d70c478f52f 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -62,7 +62,7 @@ var defaultReplicaConfig = &ReplicaConfig{ Consistent: &ConsistentConfig{ Level: "none", MaxLogSize: 64, - FlushIntervalInMs: MinFlushIntervalInMs, + FlushIntervalInMs: DefaultFlushIntervalInMs, Storage: "", }, } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index bd5fd9eac46..fb9b2f63b68 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -299,8 +299,8 @@ var ( "rawData size %d exceeds maximum file size %d", errors.RFCCodeText("CDC:ErrFileSizeExceed"), ) - ErrS3StorageAPI = errors.Normalize( - "s3 storage api", + ErrExternalStorageAPI = errors.Normalize( + "external storage api", errors.RFCCodeText("CDC:ErrS3StorageAPI"), ) ErrStorageInitialize = errors.Normalize( diff --git a/pkg/redo/config.go b/pkg/redo/config.go index 8016b12f495..c57f3672f35 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -39,8 +39,6 @@ const ( LogEXT = ".log" // MetaEXT is the meta file ext of meta file after safely wrote to disk MetaEXT = ".meta" - // MetaTmpEXT is the meta file ext of meta file before safely wrote to disk - MetaTmpEXT = ".mtmp" // SortLogEXT is the sorted log file ext of log file after safely wrote to disk SortLogEXT = ".sort" @@ -202,6 +200,9 @@ const ( // RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information // layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s" + // RedoMetaFileFormat is the format of redo meta file, which contains namespace information. + // layout: captureID_namespace_changefeedID_fileType_uuid.fileExtName + RedoMetaFileFormat = "%s_%s_%s_%s_%s%s" ) // logFormat2ParseFormat converts redo log file name format to the space separated