redo(ticdc): use uuid in redo meta filename (#8075) (#8130)
close #8028, close #8074
ti-chi-bot authored Feb 9, 2023
1 parent b5419b6 commit 1ce0a34
Showing 19 changed files with 280 additions and 113 deletions.
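The meta-filename change named in the commit title is not visible in the hunks rendered on this page; presumably the point is to embed a freshly generated UUID in each redo meta filename so that meta files written by different runs can never collide. A minimal, hypothetical Go sketch of that idea follows (the helper name and the filename layout are invented for illustration and are not the actual TiCDC format). The hunks shown below cover the accompanying changes: a new redo worker busy-ratio metric, the rename of MinFlushIntervalInMs to DefaultFlushIntervalInMs, a cdc_test_image Makefile target, and the switch from ErrS3StorageAPI to ErrExternalStorageAPI.

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// metaFileName is a hypothetical helper illustrating the idea in the commit
// title: embed a freshly generated UUID in the redo meta filename so meta
// files written by different runs never share a name. The
// "<captureID>_<changefeedID>_meta_<uuid>.meta" layout is invented for this
// sketch and is not the actual TiCDC format.
func metaFileName(captureID, changefeedID string) string {
	return fmt.Sprintf("%s_%s_meta_%s.meta", captureID, changefeedID, uuid.NewString())
}

func main() {
	// Two calls for the same capture/changefeed yield distinct filenames.
	fmt.Println(metaFileName("capture-1", "changefeed-1"))
	fmt.Println(metaFileName("capture-1", "changefeed-1"))
}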
4 changes: 4 additions & 0 deletions Makefile
@@ -144,6 +144,10 @@ kafka_consumer:
storage_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go

cdc_test_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
docker build --platform linux/amd64 -f deployments/ticdc/docker/test.Dockerfile -t cdc:test ./

install:
go install ./...

2 changes: 1 addition & 1 deletion cdc/api/v2/model.go
@@ -341,7 +341,7 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: config.MinFlushIntervalInMs,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
Storage: "",
},
}
2 changes: 1 addition & 1 deletion cdc/owner/changefeed_test.go
@@ -487,7 +487,7 @@ func TestRemoveChangefeed(t *testing.T) {
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: config.MinFlushIntervalInMs,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
10 changes: 10 additions & 0 deletions cdc/redo/common/metric.go
@@ -75,6 +75,15 @@ var (
Help: "The latency distributions of flushLog called by redoManager",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
}, []string{"namespace", "changefeed"})

// RedoWorkerBusyRatio records the busy ratio of redo bgUpdateLog worker.
RedoWorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "worker_busy_ratio",
Help: "Busy ratio (X ms in 1s) for redo bgUpdateLog worker.",
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics in this file
@@ -85,4 +94,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(RedoFlushAllDurationHistogram)
registry.MustRegister(RedoWriteLogDurationHistogram)
registry.MustRegister(RedoFlushLogDurationHistogram)
registry.MustRegister(RedoWorkerBusyRatio)
}
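RedoWorkerBusyRatio is a counter rather than a gauge: as the manager.go hunks below show, the bgUpdateLog loop periodically adds its measured "busy milliseconds per second" to it, so a dashboard would derive a busy percentage from the counter's rate. A small standalone sketch of registering and bumping a counter of the same shape (the "ticdc"/"redo" namespace and subsystem strings stand in for the package-level constants and are hard-coded only to keep the example self-contained):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// A stand-in for common.RedoWorkerBusyRatio with the same label set.
var redoWorkerBusyRatio = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "ticdc",
		Subsystem: "redo",
		Name:      "worker_busy_ratio",
		Help:      "Busy ratio (X ms in 1s) for redo bgUpdateLog worker.",
	}, []string{"namespace", "changefeed"})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(redoWorkerBusyRatio)

	busy := redoWorkerBusyRatio.WithLabelValues("default", "changefeed-1")
	busy.Add(250) // the worker was busy roughly 250ms out of each second

	fmt.Println(testutil.ToFloat64(busy)) // 250
}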
62 changes: 51 additions & 11 deletions cdc/redo/manager.go
@@ -137,8 +137,10 @@ type ManagerImpl struct {
flushing int64
lastFlushTime time.Time

metricWriteLogDuration prometheus.Observer
metricFlushLogDuration prometheus.Observer
metricWriteLogDuration prometheus.Observer
metricFlushLogDuration prometheus.Observer
metricTotalRowsCount prometheus.Counter
metricRedoWorkerBusyRatio prometheus.Counter
}

// NewManager creates a new Manager
@@ -174,6 +176,10 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
m.metricTotalRowsCount = common.RedoTotalRowsCountGauge.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
m.metricRedoWorkerBusyRatio = common.RedoWorkerBusyRatio.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)

// TODO: better to wait background goroutines after the context is canceled.
if m.opts.EnableBgRunner {
@@ -197,7 +203,7 @@ func NewMockManager(ctx context.Context) (*ManagerImpl, error) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
Storage: "blackhole://",
FlushIntervalInMs: config.MinFlushIntervalInMs,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}

errCh := make(chan error, 1)
@@ -365,6 +371,10 @@ func (m *ManagerImpl) Cleanup(ctx context.Context) error {
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
common.RedoFlushLogDurationHistogram.
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
common.RedoTotalRowsCountGauge.
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
common.RedoWorkerBusyRatio.
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
return m.withLock(func(m *ManagerImpl) error { return m.writer.DeleteAllLogs(ctx) })
}

@@ -428,7 +438,13 @@ func (m *ManagerImpl) postFlushMeta(metaCheckpoint, metaResolved model.Ts) {
m.metaCheckpointTs.setFlushed(metaCheckpoint)
}

func (m *ManagerImpl) flushLog(ctx context.Context, handleErr func(err error)) {
func (m *ManagerImpl) flushLog(
ctx context.Context, handleErr func(err error), workTimeSlice *time.Duration,
) {
start := time.Now()
defer func() {
*workTimeSlice += time.Since(start)
}()
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
log.Debug("Fail to update flush flag, " +
"the previous flush operation hasn't finished yet")
@@ -481,7 +497,8 @@ func (m *ManagerImpl) bgUpdateLog(

log.Info("redo manager bgUpdateLog is running",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID))
zap.String("changefeed", m.changeFeedID.ID),
zap.Int64("flushIntervalInMs", flushIntervalInMs))

ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond)
defer func() {
@@ -511,7 +528,11 @@ func (m *ManagerImpl) bgUpdateLog(
rtsMap := make(map[model.TableID]model.Ts)
releaseMemoryCbs := make([]func(), 0, 1024)

emitBatch := func() {
emitBatch := func(workTimeSlice *time.Duration) {
start := time.Now()
defer func() {
*workTimeSlice += time.Since(start)
}()
if len(logs) > 0 {
start := time.Now()
err = m.writer.WriteLog(ctx, logs)
@@ -522,6 +543,7 @@
zap.Int("rows", len(logs)),
zap.Error(err),
zap.Duration("writeLogElapse", writeLogElapse))
m.metricTotalRowsCount.Add(float64(len(logs)))
m.metricWriteLogDuration.Observe(writeLogElapse.Seconds())

for _, releaseMemory := range releaseMemoryCbs {
@@ -552,18 +574,23 @@
}
}

overseerTicker := time.NewTicker(time.Second * 5)
defer overseerTicker.Stop()
var workTimeSlice time.Duration
startToWork := time.Now()
for {
if len(logs) > 0 || len(rtsMap) > 0 {
select {
case <-ctx.Done():
return
case <-ticker.C:
emitBatch()
m.flushLog(ctx, handleErr)
emitBatch(&workTimeSlice)
m.flushLog(ctx, handleErr, &workTimeSlice)
case cache, ok := <-m.logBuffer.Out():
if !ok {
return // channel closed
}
startToHandleEvent := time.Now()
switch cache.eventType {
case model.MessageTypeRow:
for _, row := range cache.rows {
@@ -579,21 +606,28 @@
default:
log.Panic("redo manager receives unknown event type")
}
workTimeSlice += time.Since(startToHandleEvent)
case now := <-overseerTicker.C:
busyRatio := int(workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000)
m.metricRedoWorkerBusyRatio.Add(float64(busyRatio))
startToWork = now
workTimeSlice = 0
case err = <-logErrCh:
default:
emitBatch()
emitBatch(&workTimeSlice)
}
} else {
select {
case <-ctx.Done():
return
case <-ticker.C:
emitBatch()
m.flushLog(ctx, handleErr)
emitBatch(&workTimeSlice)
m.flushLog(ctx, handleErr, &workTimeSlice)
case cache, ok := <-m.logBuffer.Out():
if !ok {
return // channel closed
}
startToHandleEvent := time.Now()
switch cache.eventType {
case model.MessageTypeRow:
for _, row := range cache.rows {
@@ -609,6 +643,12 @@
default:
log.Panic("redo manager receives unknown event type")
}
workTimeSlice += time.Since(startToHandleEvent)
case now := <-overseerTicker.C:
busyRatio := int(workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000)
m.metricRedoWorkerBusyRatio.Add(float64(busyRatio))
startToWork = now
workTimeSlice = 0
case err = <-logErrCh:
}
}
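The busy-ratio accounting added to bgUpdateLog above boils down to a simple pattern: time every unit of work into an accumulator (workTimeSlice), and let a separate overseer ticker turn the accumulated time into "busy milliseconds per wall-clock second" before resetting the window. Below is a condensed, standalone sketch of that pattern, assuming a placeholder jobs channel and report callback in place of the real event sources and Prometheus counter (the real manager's overseer ticks every 5 seconds):

package main

import (
	"fmt"
	"time"
)

// busyRatioLoop condenses the accounting the hunk above adds to bgUpdateLog:
// each job's elapsed time is accumulated into workTimeSlice, and the overseer
// ticker converts the accumulated work into "busy milliseconds per wall-clock
// second", then resets the window.
func busyRatioLoop(jobs <-chan func(), interval time.Duration, report func(busyMsPerSec float64)) {
	overseer := time.NewTicker(interval)
	defer overseer.Stop()

	var workTimeSlice time.Duration
	startToWork := time.Now()

	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				return // channel closed
			}
			start := time.Now()
			job()
			workTimeSlice += time.Since(start)
		case now := <-overseer.C:
			busyMsPerSec := workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000
			report(busyMsPerSec)
			startToWork = now
			workTimeSlice = 0
		}
	}
}

func main() {
	jobs := make(chan func())
	go busyRatioLoop(jobs, time.Second, func(v float64) {
		fmt.Printf("busy: %.0f ms in 1s\n", v)
	})
	for i := 0; i < 3; i++ {
		jobs <- func() { time.Sleep(100 * time.Millisecond) } // ~300ms of work
	}
	time.Sleep(1500 * time.Millisecond) // let one overseer tick fire
	close(jobs)
}

Keeping the measurement inside the single worker goroutine avoids any locking around the accumulator, which matches how the hunk threads *workTimeSlice through emitBatch and flushLog.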
9 changes: 5 additions & 4 deletions cdc/redo/manager_test.go
@@ -100,7 +100,7 @@ func TestLogManagerInProcessor(t *testing.T) {
defer logMgr.Cleanup(ctx)

checkResolvedTs := func(mgr LogManager, expectedRts uint64) {
time.Sleep(time.Duration(config.MinFlushIntervalInMs+200) * time.Millisecond)
time.Sleep(time.Duration(config.DefaultFlushIntervalInMs+200) * time.Millisecond)
resolvedTs := mgr.GetMinResolvedTs()
require.Equal(t, expectedRts, resolvedTs)
}
@@ -322,7 +322,7 @@ func TestManagerError(t *testing.T) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
Storage: "blackhole://",
FlushIntervalInMs: config.MinFlushIntervalInMs,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}

errCh := make(chan error, 1)
@@ -387,7 +387,7 @@ func TestReuseWritter(t *testing.T) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
Storage: "local://" + dir,
FlushIntervalInMs: config.MinFlushIntervalInMs,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}

errCh := make(chan error, 1)
@@ -410,7 +410,8 @@
time.Sleep(time.Duration(100) * time.Millisecond)

// The another redo manager shouldn't be influenced.
mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err })
var workTimeSlice time.Duration
mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err }, &workTimeSlice)
select {
case x := <-errCh:
log.Panic("shouldn't get an error", zap.Error(x))
4 changes: 2 additions & 2 deletions cdc/redo/reader/file.go
@@ -133,7 +133,7 @@ func selectDownLoadFile(
return nil
})
if err != nil {
return nil, cerror.WrapError(cerror.ErrS3StorageAPI, err)
return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}

return files, nil
@@ -153,7 +153,7 @@ func downLoadToLocal(
eg.Go(func() error {
data, err := extStorage.ReadFile(eCtx, f)
if err != nil {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}

err = os.MkdirAll(dir, redo.DefaultDirMode)
6 changes: 3 additions & 3 deletions cdc/redo/writer/file.go
@@ -365,7 +365,7 @@ func (w *Writer) close() error {
if err != nil {
w.file.Close()
w.file = nil
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}
}

@@ -489,7 +489,7 @@ func (w *Writer) GC(checkPointTs uint64) error {
errs = multierr.Append(errs, err)
}
if errs != nil {
errs = cerror.WrapError(cerror.ErrS3StorageAPI, errs)
errs = cerror.WrapError(cerror.ErrExternalStorageAPI, errs)
log.Warn("delete redo log in s3 fail", zap.Error(errs))
}
}()
@@ -616,7 +616,7 @@ func (w *Writer) writeToS3(ctx context.Context, name string) error {
// Key in s3: aws.String(rs.options.Prefix + name), prefix should be changefeed name
err = w.storage.WriteFile(ctx, filepath.Base(name), fileData)
if err != nil {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}

// in case the page cache piling up triggered the OS memory reclaming which may cause