redo(ticdc): use uuid in redo meta filename (pingcap#8075)
CharlesCheung96 committed Feb 8, 2023
1 parent d418b6d commit e4100f9
Showing 16 changed files with 249 additions and 98 deletions.
5 changes: 3 additions & 2 deletions Makefile
@@ -109,8 +109,9 @@ cdc:
kafka_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer/main.go

install:
go install ./...
cdc_test_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
docker build --platform linux/amd64 -f deployments/ticdc/docker/test.Dockerfile -t cdc:test ./

unit_test: check_failpoint_ctl generate_mock generate-msgp-code generate-protobuf
mkdir -p "$(TEST_DIR)"
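Judging from the new target alone, running `make cdc_test_image` is intended to build a linux/amd64 test image tagged `cdc:test` from `deployments/ticdc/docker/test.Dockerfile`, failing fast if `docker` is not on the PATH.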
2 changes: 1 addition & 1 deletion cdc/owner/changefeed_test.go
@@ -490,7 +490,7 @@ func TestRemoveChangefeed(t *testing.T) {
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: config.MinFlushIntervalInMs,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
10 changes: 10 additions & 0 deletions cdc/redo/common/metric.go
@@ -75,6 +75,15 @@ var (
Help: "The latency distributions of flushLog called by redoManager",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
}, []string{"namespace", "changefeed"})

// RedoWorkerBusyRatio records the busy ratio of redo bgUpdateLog worker.
RedoWorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "worker_busy_ratio",
Help: "Busy ratio (X ms in 1s) for redo bgUpdateLog worker.",
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics in this file
@@ -85,4 +94,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(RedoFlushAllDurationHistogram)
registry.MustRegister(RedoWriteLogDurationHistogram)
registry.MustRegister(RedoFlushLogDurationHistogram)
registry.MustRegister(RedoWorkerBusyRatio)
}
39 changes: 33 additions & 6 deletions cdc/redo/manager.go
@@ -133,8 +133,10 @@ type ManagerImpl struct {
flushing int64
lastFlushTime time.Time

metricWriteLogDuration prometheus.Observer
metricFlushLogDuration prometheus.Observer
metricWriteLogDuration prometheus.Observer
metricFlushLogDuration prometheus.Observer
metricTotalRowsCount prometheus.Counter
metricRedoWorkerBusyRatio prometheus.Counter
}

// NewManager creates a new Manager
@@ -170,6 +172,10 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
m.metricTotalRowsCount = common.RedoTotalRowsCountGauge.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
m.metricRedoWorkerBusyRatio = common.RedoWorkerBusyRatio.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)

// TODO: better to wait background goroutines after the context is canceled.
if m.opts.EnableBgRunner {
@@ -193,7 +199,7 @@ func NewMockManager(ctx context.Context) (*ManagerImpl, error) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
Storage: "blackhole://",
FlushIntervalInMs: config.MinFlushIntervalInMs,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}

errCh := make(chan error, 1)
@@ -359,6 +365,10 @@ func (m *ManagerImpl) Cleanup(ctx context.Context) error {
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
common.RedoFlushLogDurationHistogram.
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
common.RedoTotalRowsCountGauge.
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
common.RedoWorkerBusyRatio.
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
return m.withLock(func(m *ManagerImpl) error { return m.writer.DeleteAllLogs(ctx) })
}

@@ -422,7 +432,13 @@ func (m *ManagerImpl) postFlushMeta(metaCheckpoint, metaResolved model.Ts) {
m.metaCheckpointTs.setFlushed(metaCheckpoint)
}

func (m *ManagerImpl) flushLog(ctx context.Context, handleErr func(err error)) {
func (m *ManagerImpl) flushLog(
ctx context.Context, handleErr func(err error), workTimeSlice *time.Duration,
) {
start := time.Now()
defer func() {
*workTimeSlice += time.Since(start)
}()
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
log.Debug("Fail to update flush flag, " +
"the previous flush operation hasn't finished yet")
@@ -473,7 +489,8 @@ func (m *ManagerImpl) bgUpdateLog(

log.Info("redo manager bgUpdateLog is running",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID))
zap.String("changefeed", m.changeFeedID.ID),
zap.Int64("flushIntervalInMs", flushIntervalInMs))

ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond)
defer func() {
@@ -499,14 +516,23 @@
}()

var err error
overseerTicker := time.NewTicker(time.Second * 5)
defer overseerTicker.Stop()
var workTimeSlice time.Duration
startToWork := time.Now()
for {
select {
case <-ctx.Done():
return
case err = <-logErrCh:
case now := <-overseerTicker.C:
busyRatio := int(workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000)
m.metricRedoWorkerBusyRatio.Add(float64(busyRatio))
startToWork = now
workTimeSlice = 0
case <-ticker.C:
// interpolate tick message to flush writer if needed
m.flushLog(ctx, handleErr)
m.flushLog(ctx, handleErr, &workTimeSlice)
case cache, ok := <-m.logBuffer.Out():
if !ok {
return // channel closed
@@ -520,6 +546,7 @@ }
}
err = m.writer.WriteLog(ctx, cache.tableID, logs)
m.metricWriteLogDuration.Observe(time.Since(start).Seconds())
m.metricTotalRowsCount.Add(float64(len(logs)))
case model.MessageTypeResolved:
m.onResolvedTsMsg(cache.tableID, cache.resolvedTs)
default:
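The busy-ratio accounting added above follows a small pattern: flushLog and the log-write branch accumulate their elapsed time into workTimeSlice, and a separate five-second overseer ticker converts the accumulated work time into "milliseconds busy per second" (0-1000) before resetting the window and adding the result to the RedoWorkerBusyRatio counter. A minimal sketch of the same pattern, with hypothetical names (runWorker, reportBusy) standing in for bgUpdateLog and the Prometheus counter:

package redoexample

import (
	"context"
	"time"
)

// runWorker sketches the accounting used by bgUpdateLog: every job adds its
// elapsed time to workTime, and every 5 seconds the worker reports how many
// milliseconds of each wall-clock second were spent working (0-1000).
func runWorker(ctx context.Context, jobs <-chan func(), reportBusy func(float64)) {
	overseer := time.NewTicker(5 * time.Second)
	defer overseer.Stop()

	var workTime time.Duration // plays the role of workTimeSlice
	windowStart := time.Now()
	for {
		select {
		case <-ctx.Done():
			return
		case job := <-jobs:
			begin := time.Now()
			job()
			workTime += time.Since(begin)
		case now := <-overseer.C:
			// Busy permille over the last window, as in bgUpdateLog.
			busyRatio := workTime.Seconds() / now.Sub(windowStart).Seconds() * 1000
			reportBusy(busyRatio)
			windowStart = now
			workTime = 0
		}
	}
}

Because the value is added to a counter rather than set on a gauge, dashboards are presumably expected to take a rate over it; a sustained rate near 200 per second (1000 per 5-second window) would mean the worker is busy essentially all of the time.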
9 changes: 5 additions & 4 deletions cdc/redo/manager_test.go
@@ -100,7 +100,7 @@ func TestLogManagerInProcessor(t *testing.T) {
defer logMgr.Cleanup(ctx)

checkResolvedTs := func(mgr LogManager, expectedRts uint64) {
time.Sleep(time.Duration(config.MinFlushIntervalInMs+200) * time.Millisecond)
time.Sleep(time.Duration(config.DefaultFlushIntervalInMs+200) * time.Millisecond)
resolvedTs := mgr.GetMinResolvedTs()
require.Equal(t, expectedRts, resolvedTs)
}
@@ -322,7 +322,7 @@ func TestManagerError(t *testing.T) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
Storage: "blackhole://",
FlushIntervalInMs: config.MinFlushIntervalInMs,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}

errCh := make(chan error, 1)
@@ -387,7 +387,7 @@ func TestReuseWritter(t *testing.T) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
Storage: "local://" + dir,
FlushIntervalInMs: config.MinFlushIntervalInMs,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}

errCh := make(chan error, 1)
@@ -410,7 +410,8 @@
time.Sleep(time.Duration(100) * time.Millisecond)

// The other redo manager shouldn't be influenced.
mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err })
var workTimeSlice time.Duration
mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err }, &workTimeSlice)
select {
case x := <-errCh:
log.Panic("shouldn't get an error", zap.Error(x))
4 changes: 2 additions & 2 deletions cdc/redo/reader/file.go
@@ -134,7 +134,7 @@ func selectDownLoadFile(
return nil
})
if err != nil {
return nil, cerror.WrapError(cerror.ErrS3StorageAPI, err)
return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}

return files, nil
@@ -154,7 +154,7 @@ func downLoadToLocal(
eg.Go(func() error {
data, err := extStorage.ReadFile(eCtx, f)
if err != nil {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}

err = os.MkdirAll(dir, redo.DefaultDirMode)
6 changes: 3 additions & 3 deletions cdc/redo/writer/file.go
@@ -366,7 +366,7 @@ func (w *Writer) close() error {
if err != nil {
w.file.Close()
w.file = nil
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}
}

@@ -490,7 +490,7 @@ func (w *Writer) GC(checkPointTs uint64) error {
errs = multierr.Append(errs, err)
}
if errs != nil {
errs = cerror.WrapError(cerror.ErrS3StorageAPI, errs)
errs = cerror.WrapError(cerror.ErrExternalStorageAPI, errs)
log.Warn("delete redo log in s3 fail", zap.Error(errs))
}
}()
@@ -610,7 +610,7 @@ func (w *Writer) writeToS3(ctx context.Context, name string) error {
// Key in s3: aws.String(rs.options.Prefix + name), prefix should be changefeed name
err = w.storage.WriteFile(ctx, filepath.Base(name), fileData)
if err != nil {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}

// in case the page cache piling up triggered the OS memory reclaming which may cause
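The hunks in reader/file.go and writer/file.go are a mechanical rename from cerror.ErrS3StorageAPI to the broader cerror.ErrExternalStorageAPI wherever redo files touch external storage. A hedged sketch of the resulting call pattern; writeRedoFile is a hypothetical helper, and the import paths are assumed to match what tiflow used at the time:

package redoexample

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/storage"
	cerror "github.com/pingcap/tiflow/pkg/errors"
)

// writeRedoFile illustrates the wrapping pattern: any failure from the
// external-storage client is surfaced as ErrExternalStorageAPI rather than
// the S3-specific error used before this commit.
func writeRedoFile(ctx context.Context, extStorage storage.ExternalStorage, name string, data []byte) error {
	if err := extStorage.WriteFile(ctx, name, data); err != nil {
		return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
	}
	return nil
}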
(Diffs for the remaining changed files are not shown here.)
