diff --git a/Makefile b/Makefile
index 85aa8b288f3..aa0c7de75ef 100644
--- a/Makefile
+++ b/Makefile
@@ -144,6 +144,10 @@ kafka_consumer:
storage_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go
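+# Build the cdc:test image (linux/amd64) from the test Dockerfile; requires docker.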
+cdc_test_image:
+ @which docker || (echo "docker not found in ${PATH}"; exit 1)
+ docker build --platform linux/amd64 -f deployments/ticdc/docker/test.Dockerfile -t cdc:test ./
+
install:
go install ./...
diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go
index 49e910fc526..9a469f1ee70 100644
--- a/cdc/api/v2/model.go
+++ b/cdc/api/v2/model.go
@@ -341,7 +341,7 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
- FlushIntervalInMs: config.MinFlushIntervalInMs,
+ FlushIntervalInMs: config.DefaultFlushIntervalInMs,
Storage: "",
},
}
diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go
index aaeec0f79f6..d71b5f0cc1c 100644
--- a/cdc/owner/changefeed_test.go
+++ b/cdc/owner/changefeed_test.go
@@ -487,7 +487,7 @@ func TestRemoveChangefeed(t *testing.T) {
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
- FlushIntervalInMs: config.MinFlushIntervalInMs,
+ FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index edcd62e4cdd..a9dcb19afca 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -878,10 +878,6 @@ func (m *SinkManager) Close() error {
}
m.sinkMemQuota.Close()
m.redoMemQuota.Close()
- err := m.sinkFactory.Close()
- if err != nil {
- return errors.Trace(err)
- }
m.tableSinks.Range(func(key, value interface{}) bool {
sink := value.(*tableSinkWrapper)
sink.close(m.ctx)
@@ -890,11 +886,18 @@ func (m *SinkManager) Close() error {
}
return true
})
+ m.wg.Wait()
log.Info("All table sinks closed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))
- m.wg.Wait()
+ // TODO: add a unit test to cover this.
+ // Make sure all sink workers have exited before closing the sink factory.
+ // Otherwise, writing data to an already-closed sink would panic.
+ err := m.sinkFactory.Close()
+ if err != nil {
+ return errors.Trace(err)
+ }
log.Info("Closed sink manager",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
diff --git a/cdc/redo/common/metric.go b/cdc/redo/common/metric.go
index b2e4cd8e61c..74011bab63b 100644
--- a/cdc/redo/common/metric.go
+++ b/cdc/redo/common/metric.go
@@ -75,6 +75,15 @@ var (
Help: "The latency distributions of flushLog called by redoManager",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
}, []string{"namespace", "changefeed"})
+
+ // RedoWorkerBusyRatio records the busy ratio of redo bgUpdateLog worker.
+ RedoWorkerBusyRatio = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "worker_busy_ratio",
+ Help: "Busy ratio (X ms in 1s) for redo bgUpdateLog worker.",
+ }, []string{"namespace", "changefeed"})
)
// InitMetrics registers all metrics in this file
@@ -85,4 +94,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(RedoFlushAllDurationHistogram)
registry.MustRegister(RedoWriteLogDurationHistogram)
registry.MustRegister(RedoFlushLogDurationHistogram)
+ registry.MustRegister(RedoWorkerBusyRatio)
}
diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go
index bf39b6bf800..98d7aeecffe 100644
--- a/cdc/redo/manager.go
+++ b/cdc/redo/manager.go
@@ -137,8 +137,10 @@ type ManagerImpl struct {
flushing int64
lastFlushTime time.Time
- metricWriteLogDuration prometheus.Observer
- metricFlushLogDuration prometheus.Observer
+ metricWriteLogDuration prometheus.Observer
+ metricFlushLogDuration prometheus.Observer
+ metricTotalRowsCount prometheus.Counter
+ metricRedoWorkerBusyRatio prometheus.Counter
}
// NewManager creates a new Manager
@@ -174,6 +176,10 @@ func NewManager(ctx context.Context, cfg *config.ConsistentConfig, opts *Manager
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
+ m.metricTotalRowsCount = common.RedoTotalRowsCountGauge.
+ WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
+ m.metricRedoWorkerBusyRatio = common.RedoWorkerBusyRatio.
+ WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
// TODO: better to wait background goroutines after the context is canceled.
if m.opts.EnableBgRunner {
@@ -197,7 +203,7 @@ func NewMockManager(ctx context.Context) (*ManagerImpl, error) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
Storage: "blackhole://",
- FlushIntervalInMs: config.MinFlushIntervalInMs,
+ FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}
errCh := make(chan error, 1)
@@ -365,6 +371,10 @@ func (m *ManagerImpl) Cleanup(ctx context.Context) error {
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
common.RedoFlushLogDurationHistogram.
DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
+ common.RedoTotalRowsCountGauge.
+ DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
+ common.RedoWorkerBusyRatio.
+ DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
return m.withLock(func(m *ManagerImpl) error { return m.writer.DeleteAllLogs(ctx) })
}
@@ -428,7 +438,13 @@ func (m *ManagerImpl) postFlushMeta(metaCheckpoint, metaResolved model.Ts) {
m.metaCheckpointTs.setFlushed(metaCheckpoint)
}
-func (m *ManagerImpl) flushLog(ctx context.Context, handleErr func(err error)) {
+func (m *ManagerImpl) flushLog(
+ ctx context.Context, handleErr func(err error), workTimeSlice *time.Duration,
+) {
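+ // Account the time spent in this call towards workTimeSlice so the
+ // caller can report the worker's busy ratio.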
+ start := time.Now()
+ defer func() {
+ *workTimeSlice += time.Since(start)
+ }()
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
log.Debug("Fail to update flush flag, " +
"the previous flush operation hasn't finished yet")
@@ -481,7 +497,8 @@ func (m *ManagerImpl) bgUpdateLog(
log.Info("redo manager bgUpdateLog is running",
zap.String("namespace", m.changeFeedID.Namespace),
- zap.String("changefeed", m.changeFeedID.ID))
+ zap.String("changefeed", m.changeFeedID.ID),
+ zap.Int64("flushIntervalInMs", flushIntervalInMs))
ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond)
defer func() {
@@ -511,7 +528,11 @@ func (m *ManagerImpl) bgUpdateLog(
rtsMap := make(map[model.TableID]model.Ts)
releaseMemoryCbs := make([]func(), 0, 1024)
- emitBatch := func() {
+ emitBatch := func(workTimeSlice *time.Duration) {
+ start := time.Now()
+ defer func() {
+ *workTimeSlice += time.Since(start)
+ }()
if len(logs) > 0 {
start := time.Now()
err = m.writer.WriteLog(ctx, logs)
@@ -522,6 +543,7 @@ func (m *ManagerImpl) bgUpdateLog(
zap.Int("rows", len(logs)),
zap.Error(err),
zap.Duration("writeLogElapse", writeLogElapse))
+ m.metricTotalRowsCount.Add(float64(len(logs)))
m.metricWriteLogDuration.Observe(writeLogElapse.Seconds())
for _, releaseMemory := range releaseMemoryCbs {
@@ -552,18 +574,23 @@ func (m *ManagerImpl) bgUpdateLog(
}
}
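+ // Sample the worker's busy ratio every 5 seconds.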
+ overseerTicker := time.NewTicker(time.Second * 5)
+ defer overseerTicker.Stop()
+ var workTimeSlice time.Duration
+ startToWork := time.Now()
for {
if len(logs) > 0 || len(rtsMap) > 0 {
select {
case <-ctx.Done():
return
case <-ticker.C:
- emitBatch()
- m.flushLog(ctx, handleErr)
+ emitBatch(&workTimeSlice)
+ m.flushLog(ctx, handleErr, &workTimeSlice)
case cache, ok := <-m.logBuffer.Out():
if !ok {
return // channel closed
}
+ startToHandleEvent := time.Now()
switch cache.eventType {
case model.MessageTypeRow:
for _, row := range cache.rows {
@@ -579,21 +606,28 @@ func (m *ManagerImpl) bgUpdateLog(
default:
log.Panic("redo manager receives unknown event type")
}
+ workTimeSlice += time.Since(startToHandleEvent)
+ case now := <-overseerTicker.C:
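+ // Convert the busy fraction of the elapsed window into X ms per 1s.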
+ busyRatio := int(workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000)
+ m.metricRedoWorkerBusyRatio.Add(float64(busyRatio))
+ startToWork = now
+ workTimeSlice = 0
case err = <-logErrCh:
default:
- emitBatch()
+ emitBatch(&workTimeSlice)
}
} else {
select {
case <-ctx.Done():
return
case <-ticker.C:
- emitBatch()
- m.flushLog(ctx, handleErr)
+ emitBatch(&workTimeSlice)
+ m.flushLog(ctx, handleErr, &workTimeSlice)
case cache, ok := <-m.logBuffer.Out():
if !ok {
return // channel closed
}
+ startToHandleEvent := time.Now()
switch cache.eventType {
case model.MessageTypeRow:
for _, row := range cache.rows {
@@ -609,6 +643,12 @@ func (m *ManagerImpl) bgUpdateLog(
default:
log.Panic("redo manager receives unknown event type")
}
+ workTimeSlice += time.Since(startToHandleEvent)
+ case now := <-overseerTicker.C:
+ busyRatio := int(workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000)
+ m.metricRedoWorkerBusyRatio.Add(float64(busyRatio))
+ startToWork = now
+ workTimeSlice = 0
case err = <-logErrCh:
}
}
diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go
index ad3d19deec6..0c0c218c59c 100644
--- a/cdc/redo/manager_test.go
+++ b/cdc/redo/manager_test.go
@@ -100,7 +100,7 @@ func TestLogManagerInProcessor(t *testing.T) {
defer logMgr.Cleanup(ctx)
checkResolvedTs := func(mgr LogManager, expectedRts uint64) {
- time.Sleep(time.Duration(config.MinFlushIntervalInMs+200) * time.Millisecond)
+ time.Sleep(time.Duration(config.DefaultFlushIntervalInMs+200) * time.Millisecond)
resolvedTs := mgr.GetMinResolvedTs()
require.Equal(t, expectedRts, resolvedTs)
}
@@ -322,7 +322,7 @@ func TestManagerError(t *testing.T) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
Storage: "blackhole://",
- FlushIntervalInMs: config.MinFlushIntervalInMs,
+ FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}
errCh := make(chan error, 1)
@@ -387,7 +387,7 @@ func TestReuseWritter(t *testing.T) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
Storage: "local://" + dir,
- FlushIntervalInMs: config.MinFlushIntervalInMs,
+ FlushIntervalInMs: config.DefaultFlushIntervalInMs,
}
errCh := make(chan error, 1)
@@ -410,7 +410,8 @@ func TestReuseWritter(t *testing.T) {
time.Sleep(time.Duration(100) * time.Millisecond)
// The other redo manager shouldn't be affected.
- mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err })
+ var workTimeSlice time.Duration
+ mgrs[1].flushLog(ctxs[1], func(err error) { opts.ErrCh <- err }, &workTimeSlice)
select {
case x := <-errCh:
log.Panic("shouldn't get an error", zap.Error(x))
diff --git a/cdc/redo/reader/file.go b/cdc/redo/reader/file.go
index fa184129bd4..491130cd00e 100644
--- a/cdc/redo/reader/file.go
+++ b/cdc/redo/reader/file.go
@@ -133,7 +133,7 @@ func selectDownLoadFile(
return nil
})
if err != nil {
- return nil, cerror.WrapError(cerror.ErrS3StorageAPI, err)
+ return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}
return files, nil
@@ -153,7 +153,7 @@ func downLoadToLocal(
eg.Go(func() error {
data, err := extStorage.ReadFile(eCtx, f)
if err != nil {
- return cerror.WrapError(cerror.ErrS3StorageAPI, err)
+ return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}
err = os.MkdirAll(dir, redo.DefaultDirMode)
diff --git a/cdc/redo/writer/file.go b/cdc/redo/writer/file.go
index c2b74318d48..a486127085a 100644
--- a/cdc/redo/writer/file.go
+++ b/cdc/redo/writer/file.go
@@ -45,8 +45,7 @@ const (
// pageBytes is the alignment for flushing records to the backing Writer.
// It should be a multiple of the minimum sector size so that log can safely
// distinguish between torn writes and ordinary data corruption.
- pageBytes = 8 * redo.MinSectorSize
- defaultS3Timeout = 15 * time.Second
+ pageBytes = 8 * redo.MinSectorSize
)
var (
@@ -298,7 +297,9 @@ func (w *Writer) Close() error {
common.RedoWriteBytesGauge.
DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID)
- return w.close()
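+ // Bound Close with redo.CloseTimeout so flushing the last file to
+ // external storage cannot block shutdown indefinitely.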
+ ctx, cancel := context.WithTimeout(context.Background(), redo.CloseTimeout)
+ defer cancel()
+ return w.close(ctx)
}
// IsRunning implement IsRunning interface
@@ -310,7 +311,7 @@ func (w *Writer) isGCRunning() bool {
return w.gcRunning.Load()
}
-func (w *Writer) close() error {
+func (w *Writer) close(ctx context.Context) error {
if w.file == nil {
return nil
}
@@ -358,14 +359,11 @@ func (w *Writer) close() error {
// We only write content to S3 before closing the local file.
// By this way, we no longer need renaming object in S3.
if w.cfg.UseExternalStorage {
- ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout)
- defer cancel()
-
err = w.writeToS3(ctx, w.ongoingFilePath)
if err != nil {
w.file.Close()
w.file = nil
- return cerror.WrapError(cerror.ErrS3StorageAPI, err)
+ return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}
}
@@ -448,7 +446,9 @@ func (w *Writer) newPageWriter() error {
}
func (w *Writer) rotate() error {
- if err := w.close(); err != nil {
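+ // Closing the current file may upload it to external storage, so bound
+ // it with redo.DefaultTimeout.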
+ ctx, cancel := context.WithTimeout(context.Background(), redo.DefaultTimeout)
+ defer cancel()
+ if err := w.close(ctx); err != nil {
return err
}
return w.openNew()
@@ -489,7 +489,7 @@ func (w *Writer) GC(checkPointTs uint64) error {
errs = multierr.Append(errs, err)
}
if errs != nil {
- errs = cerror.WrapError(cerror.ErrS3StorageAPI, errs)
+ errs = cerror.WrapError(cerror.ErrExternalStorageAPI, errs)
log.Warn("delete redo log in s3 fail", zap.Error(errs))
}
}()
@@ -616,7 +616,7 @@ func (w *Writer) writeToS3(ctx context.Context, name string) error {
// Key in s3: aws.String(rs.options.Prefix + name), prefix should be changefeed name
err = w.storage.WriteFile(ctx, filepath.Base(name), fileData)
if err != nil {
- return cerror.WrapError(cerror.ErrS3StorageAPI, err)
+ return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
}
// in case the page cache piling up triggered the OS memory reclaming which may cause
diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go
index 28fa5fc29f1..73afac63e28 100644
--- a/cdc/redo/writer/writer.go
+++ b/cdc/redo/writer/writer.go
@@ -19,19 +19,18 @@ import (
"net/url"
"os"
"path/filepath"
+ "time"
- "github.com/aws/aws-sdk-go/aws/awserr"
- "github.com/aws/aws-sdk-go/service/s3"
- "github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/pkg/config"
- cerror "github.com/pingcap/tiflow/pkg/errors"
+ "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
- "github.com/prometheus/client_golang/prometheus"
+ "github.com/pingcap/tiflow/pkg/util"
+ "github.com/pingcap/tiflow/pkg/uuid"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -76,7 +75,7 @@ func NewRedoLogWriter(
scheme := uri.Scheme
if !redo.IsValidConsistentStorage(scheme) {
- return nil, cerror.ErrConsistentStorage.GenWithStackByArgs(scheme)
+ return nil, errors.ErrConsistentStorage.GenWithStackByArgs(scheme)
}
if redo.IsBlackholeStorage(scheme) {
return NewBlackHoleWriter(), nil
@@ -132,20 +131,31 @@ type logWriter struct {
// the redo log files when changefeed is created or deleted.
extStorage storage.ExternalStorage
- meta *common.LogMeta
-
- metricTotalRowsCount prometheus.Gauge
+ meta *common.LogMeta
+ preMetaFile string
+ uuidGenerator uuid.Generator
}
func newLogWriter(
ctx context.Context, cfg *logWriterConfig, opts ...Option,
) (lw *logWriter, err error) {
if cfg == nil {
- return nil, cerror.WrapError(cerror.ErrRedoConfigInvalid, errors.New("LogWriterConfig can not be nil"))
+ err := errors.New("LogWriterConfig can not be nil")
+ return nil, errors.WrapError(errors.ErrRedoConfigInvalid, err)
}
lw = &logWriter{cfg: cfg}
+ writerOp := &writerOptions{}
+ for _, opt := range opts {
+ opt(writerOp)
+ }
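+ // Prefer an injected UUID generator (used by tests for deterministic
+ // file names); otherwise use a random one.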
+ if writerOp.getUUIDGenerator != nil {
+ lw.uuidGenerator = writerOp.getUUIDGenerator()
+ } else {
+ lw.uuidGenerator = uuid.NewGenerator()
+ }
+
if lw.cfg.EmitRowEvents {
writerCfg := &FileWriterConfig{
FileType: redo.RedoRowLogFileType,
@@ -198,15 +208,13 @@ func newLogWriter(
}
}
- lw.metricTotalRowsCount = common.RedoTotalRowsCountGauge.
- WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID)
return
}
func (l *logWriter) preCleanUpS3(ctx context.Context) error {
ret, err := l.extStorage.FileExists(ctx, l.getDeletedChangefeedMarker())
if err != nil {
- return cerror.WrapError(cerror.ErrS3StorageAPI, err)
+ return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
if !ret {
return nil
@@ -228,8 +236,8 @@ func (l *logWriter) preCleanUpS3(ctx context.Context) error {
return err
}
err = l.extStorage.DeleteFile(ctx, l.getDeletedChangefeedMarker())
- if !isNotExistInS3(err) {
- return cerror.WrapError(cerror.ErrS3StorageAPI, err)
+ if !util.IsNotExistInExtStorage(err) {
+ return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
return nil
@@ -249,12 +257,13 @@ func (l *logWriter) initMeta(ctx context.Context) error {
if os.IsNotExist(err) {
return nil
}
- return cerror.WrapError(cerror.ErrRedoMetaInitialize, errors.Annotate(err, "read meta file fail"))
+ err = errors.Annotate(err, "read meta file fail")
+ return errors.WrapError(errors.ErrRedoMetaInitialize, err)
}
_, err = l.meta.UnmarshalMsg(data)
if err != nil {
- return cerror.WrapError(cerror.ErrRedoMetaInitialize, err)
+ return errors.WrapError(errors.ErrRedoMetaInitialize, err)
}
return nil
@@ -281,13 +290,13 @@ func (l *logWriter) WriteLog(ctx context.Context, rows []*model.RedoRowChangedEv
}
if l.isStopped() {
- return cerror.ErrRedoWriterStopped.GenWithStackByArgs()
+ return errors.ErrRedoWriterStopped.GenWithStackByArgs()
}
if len(rows) == 0 {
return nil
}
- for i, r := range rows {
+ for _, r := range rows {
if r == nil || r.Row == nil {
continue
}
@@ -295,17 +304,15 @@ func (l *logWriter) WriteLog(ctx context.Context, rows []*model.RedoRowChangedEv
rl := &model.RedoLog{RedoRow: r, Type: model.RedoLogTypeRow}
data, err := rl.MarshalMsg(nil)
if err != nil {
- return cerror.WrapError(cerror.ErrMarshalFailed, err)
+ return errors.WrapError(errors.ErrMarshalFailed, err)
}
l.rowWriter.AdvanceTs(r.Row.CommitTs)
_, err = l.rowWriter.Write(data)
if err != nil {
- l.metricTotalRowsCount.Add(float64(i))
return err
}
}
- l.metricTotalRowsCount.Add(float64(len(rows)))
return nil
}
@@ -318,7 +325,7 @@ func (l *logWriter) SendDDL(ctx context.Context, ddl *model.RedoDDLEvent) error
}
if l.isStopped() {
- return cerror.ErrRedoWriterStopped.GenWithStackByArgs()
+ return errors.ErrRedoWriterStopped.GenWithStackByArgs()
}
if ddl == nil || ddl.DDL == nil {
return nil
@@ -327,7 +334,7 @@ func (l *logWriter) SendDDL(ctx context.Context, ddl *model.RedoDDLEvent) error
rl := &model.RedoLog{RedoDDL: ddl, Type: model.RedoLogTypeDDL}
data, err := rl.MarshalMsg(nil)
if err != nil {
- return cerror.WrapError(cerror.ErrMarshalFailed, err)
+ return errors.WrapError(errors.ErrMarshalFailed, err)
}
l.ddlWriter.AdvanceTs(ddl.DDL.CommitTs)
@@ -344,10 +351,10 @@ func (l *logWriter) FlushLog(ctx context.Context, checkpointTs, resolvedTs model
}
if l.isStopped() {
- return cerror.ErrRedoWriterStopped.GenWithStackByArgs()
+ return errors.ErrRedoWriterStopped.GenWithStackByArgs()
}
- return l.flush(checkpointTs, resolvedTs)
+ return l.flush(ctx, checkpointTs, resolvedTs)
}
// GetMeta implement GetMeta api
@@ -370,7 +377,7 @@ func (l *logWriter) DeleteAllLogs(ctx context.Context) (err error) {
log.Warn("read removed log dir fail", zap.Error(err))
return nil
}
- return cerror.WrapError(cerror.ErrRedoFileOp,
+ return errors.WrapError(errors.ErrRedoFileOp,
errors.Annotatef(err, "can't read log file directory: %s", l.cfg.Dir))
}
@@ -386,7 +393,7 @@ func (l *logWriter) DeleteAllLogs(ctx context.Context) (err error) {
log.Warn("removed log dir fail", zap.Error(err))
return nil
}
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
+ return errors.WrapError(errors.ErrRedoFileOp, err)
}
} else {
for _, file := range filteredFiles {
@@ -395,7 +402,7 @@ func (l *logWriter) DeleteAllLogs(ctx context.Context) (err error) {
log.Warn("removed log dir fail", zap.Error(err))
return nil
}
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
+ return errors.WrapError(errors.ErrRedoFileOp, err)
}
}
}
@@ -432,7 +439,7 @@ func (l *logWriter) getDeletedChangefeedMarker() string {
}
func (l *logWriter) writeDeletedMarkerToS3(ctx context.Context) error {
- return cerror.WrapError(cerror.ErrS3StorageAPI,
+ return errors.WrapError(errors.ErrExternalStorageAPI,
l.extStorage.WriteFile(ctx, l.getDeletedChangefeedMarker(), []byte("D")))
}
@@ -444,8 +451,8 @@ func (l *logWriter) deleteFilesInS3(ctx context.Context, files []string) error {
err := l.extStorage.DeleteFile(eCtx, name)
if err != nil {
// if fail then retry, may end up with notExit err, ignore the error
- if !isNotExistInS3(err) {
- return cerror.WrapError(cerror.ErrS3StorageAPI, err)
+ if !util.IsNotExistInExtStorage(err) {
+ return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
}
return nil
@@ -454,18 +461,6 @@ func (l *logWriter) deleteFilesInS3(ctx context.Context, files []string) error {
return eg.Wait()
}
-func isNotExistInS3(err error) bool {
- if err != nil {
- if aerr, ok := errors.Cause(err).(awserr.Error); ok { // nolint:errorlint
- switch aerr.Code() {
- case s3.ErrCodeNoSuchKey:
- return true
- }
- }
- }
- return false
-}
-
var getAllFilesInS3 = func(ctx context.Context, l *logWriter) ([]string, error) {
files := []string{}
err := l.extStorage.WalkDir(ctx, &storage.WalkOption{}, func(path string, _ int64) error {
@@ -473,7 +468,7 @@ var getAllFilesInS3 = func(ctx context.Context, l *logWriter) ([]string, error)
return nil
})
if err != nil {
- return nil, cerror.WrapError(cerror.ErrS3StorageAPI, err)
+ return nil, errors.WrapError(errors.ErrExternalStorageAPI, err)
}
return files, nil
@@ -494,7 +489,7 @@ func (l *logWriter) Close() (err error) {
}
// flush flushes all the buffered data to the disk.
-func (l *logWriter) flush(checkpointTs, resolvedTs model.Ts) (err error) {
+func (l *logWriter) flush(ctx context.Context, checkpointTs, resolvedTs model.Ts) (err error) {
if l.cfg.EmitDDLEvents {
err = multierr.Append(err, l.ddlWriter.Flush())
}
@@ -502,7 +497,7 @@ func (l *logWriter) flush(checkpointTs, resolvedTs model.Ts) (err error) {
err = multierr.Append(err, l.rowWriter.Flush())
}
if l.cfg.EmitMeta {
- err = multierr.Append(err, l.flushLogMeta(checkpointTs, resolvedTs))
+ err = multierr.Append(err, l.flushLogMeta(ctx, checkpointTs, resolvedTs))
}
return
}
@@ -518,16 +513,6 @@ func (l *logWriter) isStopped() bool {
return rowStopped || ddlStopped
}
-func (l *logWriter) getMetafileName() string {
- if model.DefaultNamespace == l.cfg.ChangeFeedID.Namespace {
- return fmt.Sprintf("%s_%s_%s%s", l.cfg.CaptureID, l.cfg.ChangeFeedID.ID,
- redo.RedoMetaFileType, redo.MetaEXT)
- }
- return fmt.Sprintf("%s_%s_%s_%s%s", l.cfg.CaptureID,
- l.cfg.ChangeFeedID.Namespace, l.cfg.ChangeFeedID.ID,
- redo.RedoMetaFileType, redo.MetaEXT)
-}
-
func (l *logWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, error) {
// NOTE: both checkpoint and resolved can regress if a cdc instance restarts.
hasChange := false
@@ -558,12 +543,12 @@ func (l *logWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, er
data, err := l.meta.MarshalMsg(nil)
if err != nil {
- err = cerror.WrapError(cerror.ErrMarshalFailed, err)
+ err = errors.WrapError(errors.ErrMarshalFailed, err)
}
return data, err
}
-func (l *logWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error {
+func (l *logWriter) flushLogMeta(ctx context.Context, checkpointTs, resolvedTs uint64) error {
data, err := l.maybeUpdateMeta(checkpointTs, resolvedTs)
if err != nil {
return err
@@ -572,62 +557,69 @@ func (l *logWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error {
return nil
}
- err = os.MkdirAll(l.cfg.Dir, redo.DefaultDirMode)
- if err != nil {
- return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't make dir for new redo logfile"))
- }
-
- // we will create a temp metadata file and then atomically rename it.
- tmpFileName := l.filePath() + redo.MetaTmpEXT
- tmpFile, err := openTruncFile(tmpFileName)
- if err != nil {
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
+ if !l.cfg.UseExternalStorage {
+ return l.flushMetaToLocal(data)
}
- defer tmpFile.Close()
+ return l.flushMetaToS3(ctx, data)
+}
- _, err = tmpFile.Write(data)
- if err != nil {
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
- }
- err = tmpFile.Sync()
- if err != nil {
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
+func (l *logWriter) flushMetaToLocal(data []byte) error {
+ if err := os.MkdirAll(l.cfg.Dir, redo.DefaultDirMode); err != nil {
+ e := errors.Annotate(err, "can't make dir for new redo logfile")
+ return errors.WrapError(errors.ErrRedoFileOp, e)
}
- err = os.Rename(tmpFileName, l.filePath())
+ metaFile, err := openTruncFile(l.filePath())
if err != nil {
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
+ return errors.WrapError(errors.ErrRedoFileOp, err)
}
-
- dirFile, err := os.Open(l.cfg.Dir)
+ _, err = metaFile.Write(data)
if err != nil {
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
+ return errors.WrapError(errors.ErrRedoFileOp, err)
}
- defer dirFile.Close()
- // sync the dir to guarantee the renamed file is persisted to disk.
- err = dirFile.Sync()
+ err = metaFile.Sync()
if err != nil {
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
+ return errors.WrapError(errors.ErrRedoFileOp, err)
}
- if !l.cfg.UseExternalStorage {
- return nil
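+ // The new meta file is synced, so it is now safe to remove the previous
+ // one; a crash in between still leaves one complete meta file on disk.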
+ if l.preMetaFile != "" {
+ if err := os.Remove(l.preMetaFile); err != nil && !os.IsNotExist(err) {
+ return errors.WrapError(errors.ErrRedoFileOp, err)
+ }
}
+ l.preMetaFile = metaFile.Name()
- ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout)
- defer cancel()
- return l.writeMetaToS3(ctx)
+ return metaFile.Close()
}
-func (l *logWriter) writeMetaToS3(ctx context.Context) error {
- name := l.filePath()
- fileData, err := os.ReadFile(name)
- if err != nil {
- return cerror.WrapError(cerror.ErrRedoFileOp, err)
+func (l *logWriter) flushMetaToS3(ctx context.Context, data []byte) error {
+ start := time.Now()
+ metaFile := l.getMetafileName()
+ if err := l.extStorage.WriteFile(ctx, metaFile, data); err != nil {
+ return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
- return cerror.WrapError(cerror.ErrS3StorageAPI,
- l.extStorage.WriteFile(ctx, l.getMetafileName(), fileData))
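+ // Delete the superseded meta file only after the new one has been
+ // written successfully.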
+ if l.preMetaFile != "" {
+ if l.preMetaFile == metaFile {
+ // This should only happen when a constant UUID generator is used in tests.
+ return nil
+ }
+ err := l.extStorage.DeleteFile(ctx, l.preMetaFile)
+ if err != nil && !util.IsNotExistInExtStorage(err) {
+ return errors.WrapError(errors.ErrExternalStorageAPI, err)
+ }
+ }
+ l.preMetaFile = metaFile
+ log.Debug("flush meta to s3",
+ zap.String("metaFile", metaFile),
+ zap.Any("cost", time.Since(start).Milliseconds()))
+ return nil
+}
+
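+// getMetafileName returns a unique meta file name by embedding a fresh
+// UUID, which lets the writer replace rename-based updates with
+// write-new-then-delete-old.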
+func (l *logWriter) getMetafileName() string {
+ return fmt.Sprintf(redo.RedoMetaFileFormat, l.cfg.CaptureID,
+ l.cfg.ChangeFeedID.Namespace, l.cfg.ChangeFeedID.ID,
+ redo.RedoMetaFileType, l.uuidGenerator.NewString(), redo.MetaEXT)
}
func (l *logWriter) filePath() string {
diff --git a/cdc/redo/writer/writer_test.go b/cdc/redo/writer/writer_test.go
index 329a9c39f8d..b6a6684ddb1 100644
--- a/cdc/redo/writer/writer_test.go
+++ b/cdc/redo/writer/writer_test.go
@@ -129,8 +129,6 @@ func TestLogWriterWriteLog(t *testing.T) {
rowWriter: mockWriter,
ddlWriter: mockWriter,
meta: &common.LogMeta{},
- metricTotalRowsCount: common.RedoTotalRowsCountGauge.
- WithLabelValues("default", ""),
}
if tt.name == "context cancel" {
ctx, cancel := context.WithCancel(context.Background())
@@ -314,7 +312,7 @@ func TestLogWriterFlushLog(t *testing.T) {
mockStorage := mockstorage.NewMockExternalStorage(controller)
if tt.isRunning && tt.name != "context cancel" {
mockStorage.EXPECT().WriteFile(gomock.Any(),
- "cp_test-cf_meta.meta",
+ "cp_default_test-cf_meta_uid.meta",
gomock.Any()).Return(nil).Times(1)
}
mockWriter := &mockFileWriter{}
@@ -333,11 +331,12 @@ func TestLogWriterFlushLog(t *testing.T) {
UseExternalStorage: true,
}
writer := logWriter{
- cfg: cfg,
- rowWriter: mockWriter,
- ddlWriter: mockWriter,
- meta: &common.LogMeta{},
- extStorage: mockStorage,
+ cfg: cfg,
+ uuidGenerator: uuid.NewConstGenerator("uid"),
+ rowWriter: mockWriter,
+ ddlWriter: mockWriter,
+ meta: &common.LogMeta{},
+ extStorage: mockStorage,
}
if tt.name == "context cancel" {
@@ -421,7 +420,7 @@ func TestNewLogWriter(t *testing.T) {
CaptureID: "cp",
MaxLogSize: 10,
}
- l, err := newLogWriter(ctx, cfg)
+ l, err := newLogWriter(ctx, cfg, WithUUIDGenerator(func() uuid.Generator { return uuidGen }))
require.Nil(t, err)
err = l.Close()
require.Nil(t, err)
@@ -435,7 +434,7 @@ func TestNewLogWriter(t *testing.T) {
_, err = f.Write(data)
require.Nil(t, err)
- l, err = newLogWriter(ctx, cfg)
+ l, err = newLogWriter(ctx, cfg, WithUUIDGenerator(func() uuid.Generator { return uuidGen }))
require.Nil(t, err)
err = l.Close()
require.Nil(t, err)
diff --git a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
index 086b9be9b60..9c31634d29a 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
@@ -16,6 +16,7 @@ import (
"context"
"math"
"net/url"
+ "sync"
"sync/atomic"
"github.com/pingcap/errors"
@@ -32,6 +33,7 @@ import (
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
putil "github.com/pingcap/tiflow/pkg/util"
+ "golang.org/x/sync/errgroup"
)
const (
@@ -66,6 +68,7 @@ type eventFragment struct {
// dmlSink is the cloud storage sink.
// It will send the events to cloud storage systems.
type dmlSink struct {
+ changefeedID model.ChangeFeedID
// msgCh is a channel to hold eventFragment.
msgCh chan eventFragment
// encodingWorkers defines a group of workers for encoding events.
@@ -78,6 +81,7 @@ type dmlSink struct {
statistics *metrics.Statistics
// last sequence number
lastSeqNum uint64
+ wg sync.WaitGroup
}
// NewCloudStorageSink creates a cloud storage sink.
@@ -119,24 +123,49 @@ func NewCloudStorageSink(ctx context.Context,
return nil, cerror.WrapError(cerror.ErrCloudStorageInvalidConfig, err)
}
- changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
+ s.changefeedID = contextutil.ChangefeedIDFromCtx(ctx)
s.msgCh = make(chan eventFragment, defaultChannelSize)
s.defragmenter = newDefragmenter(ctx)
orderedCh := s.defragmenter.orderedOut()
s.statistics = metrics.NewStatistics(ctx, sink.TxnSink)
- s.writer = newDMLWriter(ctx, changefeedID, storage, cfg, ext, s.statistics, orderedCh, errCh)
-
+ s.writer = newDMLWriter(s.changefeedID, storage, cfg, ext, s.statistics, orderedCh, errCh)
+ s.encodingWorkers = make([]*encodingWorker, 0, defaultEncodingConcurrency)
// create a group of encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
encoder := encoderBuilder.Build()
- w := newEncodingWorker(i+1, changefeedID, encoder, s.msgCh, s.defragmenter, errCh)
- w.run(ctx)
+ w := newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, s.defragmenter)
s.encodingWorkers = append(s.encodingWorkers, w)
}
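+ // Run the writer and encoding workers in the background; any error other
+ // than context cancellation is reported through errCh.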
+ s.wg.Add(1)
+ go func() {
+ defer s.wg.Done()
+ if err := s.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
+ errCh <- err
+ }
+ }()
+
return s, nil
}
+func (s *dmlSink) run(ctx context.Context) error {
+ eg, ctx := errgroup.WithContext(ctx)
+ // run dml writer
+ eg.Go(func() error {
+ return s.writer.run(ctx)
+ })
+
+ // run the encoding workers.
+ for i := 0; i < defaultEncodingConcurrency; i++ {
+ worker := s.encodingWorkers[i]
+ eg.Go(func() error {
+ return worker.run(ctx)
+ })
+ }
+
+ return eg.Wait()
+}
+
// WriteEvents write events to cloud storage sink.
func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.SingleTableTxn]) error {
var tbl versionedTable
@@ -175,5 +204,6 @@ func (s *dmlSink) Close() error {
if s.statistics != nil {
s.statistics.Close()
}
+ s.wg.Wait()
return nil
}
diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
index 80928c1206a..1802fefeab6 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
@@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"
+ "github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
@@ -33,6 +34,7 @@ import (
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
+ "golang.org/x/sync/errgroup"
)
// dmlWorker denotes a worker responsible for writing messages to cloud storage.
@@ -50,9 +52,7 @@ type dmlWorker struct {
fileIndex map[versionedTable]*indexWithDate
// fileSize maintains a mapping of <table, file size>.
fileSize map[versionedTable]uint64
- wg sync.WaitGroup
isClosed uint64
- errCh chan<- error
extension string
statistics *metrics.Statistics
clock clock.Clock
@@ -94,7 +94,6 @@ func newDMLWorker(
config *cloudstorage.Config,
extension string,
statistics *metrics.Statistics,
- errCh chan<- error,
) *dmlWorker {
d := &dmlWorker{
id: id,
@@ -106,7 +105,6 @@ func newDMLWorker(
fileIndex: make(map[versionedTable]*indexWithDate),
fileSize: make(map[versionedTable]uint64),
extension: extension,
- errCh: errCh,
statistics: statistics,
clock: clock.New(),
bufferPool: sync.Pool{
@@ -122,65 +120,80 @@ func newDMLWorker(
}
// run creates a set of background goroutines.
-func (d *dmlWorker) run(ctx context.Context, ch *chann.Chann[eventFragment]) {
- d.backgroundFlushMsgs(ctx)
- d.backgroundDispatchTasks(ctx, ch)
+func (d *dmlWorker) run(ctx context.Context, ch *chann.Chann[eventFragment]) error {
+ log.Debug("dml worker started", zap.Int("workerID", d.id),
+ zap.String("namespace", d.changeFeedID.Namespace),
+ zap.String("changefeed", d.changeFeedID.ID))
+
+ eg, ctx := errgroup.WithContext(ctx)
+ eg.Go(func() error {
+ return d.flushMessages(ctx)
+ })
+
+ eg.Go(func() error {
+ return d.dispatchFlushTasks(ctx, ch)
+ })
+
+ return eg.Wait()
}
-// backgroundFlushMsgs flush messages from active tables to cloud storage.
+// flushMessages flush messages from active tables to cloud storage.
// active means that a table has events since last flushing.
-func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) {
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
-
- for {
- select {
- case <-ctx.Done():
- return
- case task := <-d.flushNotifyCh:
- if atomic.LoadUint64(&d.isClosed) == 1 {
- return
+func (d *dmlWorker) flushMessages(ctx context.Context) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return errors.Trace(ctx.Err())
+ case task := <-d.flushNotifyCh:
+ if atomic.LoadUint64(&d.isClosed) == 1 {
+ return nil
+ }
+ for _, tbl := range task.targetTables {
+ table := versionedTable{
+ TableName: tbl.tableName,
+ version: tbl.tableInfo.Version,
+ }
+ d.tableEvents.mu.Lock()
+ events := make([]eventFragment, len(d.tableEvents.fragments[table]))
+ copy(events, d.tableEvents.fragments[table])
+ d.tableEvents.fragments[table] = nil
+ d.tableEvents.mu.Unlock()
+ if len(events) == 0 {
+ continue
}
- for _, tbl := range task.targetTables {
- table := versionedTable{
- TableName: tbl.tableName,
- version: tbl.tableInfo.Version,
- }
- d.tableEvents.mu.Lock()
- events := make([]eventFragment, len(d.tableEvents.fragments[table]))
- copy(events, d.tableEvents.fragments[table])
- d.tableEvents.fragments[table] = nil
- d.tableEvents.mu.Unlock()
- if len(events) == 0 {
- continue
- }
-
- // generate scheme.json file before generating the first data file if necessary
- err := d.writeSchemaFile(ctx, table, tbl.tableInfo)
- if err != nil {
- d.errCh <- err
- return
- }
- path := d.generateDataFilePath(table)
- err = d.writeDataFile(ctx, path, events)
- if err != nil {
- d.errCh <- err
- return
- }
+ // generate schema.json file before generating the first data file if necessary
+ err := d.writeSchemaFile(ctx, table, tbl.tableInfo)
+ if err != nil {
+ log.Error("failed to write schema file to external storage",
+ zap.Int("workerID", d.id),
+ zap.String("namespace", d.changeFeedID.Namespace),
+ zap.String("changefeed", d.changeFeedID.ID),
+ zap.Error(err))
+ return errors.Trace(err)
+ }
- log.Debug("write file to storage success", zap.Int("workerID", d.id),
+ path := d.generateDataFilePath(table)
+ err = d.writeDataFile(ctx, path, events)
+ if err != nil {
+ log.Error("failed to write data file to external storage",
+ zap.Int("workerID", d.id),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID),
- zap.String("schema", table.Schema),
- zap.String("table", table.Table),
- zap.String("path", path),
- )
+ zap.Error(err))
+ return errors.Trace(err)
}
+
+ log.Debug("write file to storage success", zap.Int("workerID", d.id),
+ zap.String("namespace", d.changeFeedID.Namespace),
+ zap.String("changefeed", d.changeFeedID.ID),
+ zap.String("schema", table.Schema),
+ zap.String("table", table.Table),
+ zap.String("path", path),
+ )
}
}
- }()
+ }
}
// In order to avoid spending so much time looking up the directory and getting the last write point
@@ -257,97 +270,90 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, events []eve
return nil
}
-// backgroundDispatchTasks dispatches flush tasks in two conditions:
+// dispatchFlushTasks dispatches flush tasks in two conditions:
// 1. the flush interval exceeds the upper limit.
// 2. the file size exceeds the upper limit.
-func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann[eventFragment]) {
+func (d *dmlWorker) dispatchFlushTasks(ctx context.Context, ch *chann.Chann[eventFragment]) error {
tableSet := make(map[wrappedTable]struct{})
ticker := time.NewTicker(d.config.FlushInterval)
- d.wg.Add(1)
- go func() {
- log.Debug("dml worker started", zap.Int("workerID", d.id),
- zap.String("namespace", d.changeFeedID.Namespace),
- zap.String("changefeed", d.changeFeedID.ID))
- defer d.wg.Done()
- for {
+ for {
+ select {
+ case <-ctx.Done():
+ return errors.Trace(ctx.Err())
+ case <-ticker.C:
+ if atomic.LoadUint64(&d.isClosed) == 1 {
+ return nil
+ }
+ var readyTables []wrappedTable
+ for tbl := range tableSet {
+ readyTables = append(readyTables, tbl)
+ }
+ if len(readyTables) == 0 {
+ continue
+ }
+ task := flushTask{
+ targetTables: readyTables,
+ }
select {
case <-ctx.Done():
- return
- case <-ticker.C:
- if atomic.LoadUint64(&d.isClosed) == 1 {
- return
- }
- var readyTables []wrappedTable
- for tbl := range tableSet {
- readyTables = append(readyTables, tbl)
+ return errors.Trace(ctx.Err())
+ case d.flushNotifyCh <- task:
+ log.Debug("flush task is emitted successfully when flush interval exceeds",
+ zap.Any("tables", task.targetTables))
+ for elem := range tableSet {
+ // we should get TableName using elem.tableName instead of
+ // elem.tableInfo.TableName because the former one contains
+ // the physical table id (useful for partition table)
+ // recorded in mounter while the later one does not.
+ // TODO: handle TableID of model.TableInfo.TableName properly.
+ tbl := versionedTable{
+ TableName: elem.tableName,
+ version: elem.tableInfo.Version,
+ }
+ d.fileSize[tbl] = 0
}
- if len(readyTables) == 0 {
- continue
+ tableSet = make(map[wrappedTable]struct{})
+ default:
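+ // The flush channel is full; keep tableSet intact so these tables are
+ // retried on the next tick.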
+ }
+ case frag, ok := <-ch.Out():
+ if !ok || atomic.LoadUint64(&d.isClosed) == 1 {
+ return nil
+ }
+ table := frag.versionedTable
+ d.tableEvents.mu.Lock()
+ d.tableEvents.fragments[table] = append(d.tableEvents.fragments[table], frag)
+ d.tableEvents.mu.Unlock()
+
+ key := wrappedTable{
+ tableName: frag.TableName,
+ tableInfo: frag.event.Event.TableInfo,
+ }
+
+ tableSet[key] = struct{}{}
+ for _, msg := range frag.encodedMsgs {
+ if msg.Value != nil {
+ d.fileSize[table] += uint64(len(msg.Value))
}
+ }
+ // if the file size exceeds the upper limit, emit the flush task containing the table
+ // as soon as possible.
+ if d.fileSize[table] > uint64(d.config.FileSize) {
task := flushTask{
- targetTables: readyTables,
+ targetTables: []wrappedTable{key},
}
select {
case <-ctx.Done():
- return
+ return errors.Trace(ctx.Err())
case d.flushNotifyCh <- task:
- log.Debug("flush task is emitted successfully when flush interval exceeds",
- zap.Any("tables", task.targetTables))
- for elem := range tableSet {
- // we should get TableName using elem.tableName instead of
- // elem.tableInfo.TableName because the former one contains
- // the physical table id (useful for partition table)
- // recorded in mounter while the later one does not.
- // TODO: handle TableID of model.TableInfo.TableName properly.
- tbl := versionedTable{
- TableName: elem.tableName,
- version: elem.tableInfo.Version,
- }
- d.fileSize[tbl] = 0
- }
- tableSet = make(map[wrappedTable]struct{})
+ log.Debug("flush task is emitted successfully when file size exceeds",
+ zap.Any("tables", table))
+ d.fileSize[table] = 0
default:
}
- case frag, ok := <-ch.Out():
- if !ok || atomic.LoadUint64(&d.isClosed) == 1 {
- return
- }
- table := frag.versionedTable
- d.tableEvents.mu.Lock()
- d.tableEvents.fragments[table] = append(d.tableEvents.fragments[table], frag)
- d.tableEvents.mu.Unlock()
-
- key := wrappedTable{
- tableName: frag.TableName,
- tableInfo: frag.event.Event.TableInfo,
- }
-
- tableSet[key] = struct{}{}
- for _, msg := range frag.encodedMsgs {
- if msg.Value != nil {
- d.fileSize[table] += uint64(len(msg.Value))
- }
- }
- // if the file size exceeds the upper limit, emit the flush task containing the table
- // as soon as possible.
- if d.fileSize[table] > uint64(d.config.FileSize) {
- task := flushTask{
- targetTables: []wrappedTable{key},
- }
- select {
- case <-ctx.Done():
- return
- case d.flushNotifyCh <- task:
- log.Debug("flush task is emitted successfully when file size exceeds",
- zap.Any("tables", table))
- d.fileSize[table] = 0
- default:
- }
- }
}
}
- }()
+ }
}
func generateSchemaFilePath(def cloudstorage.TableDefinition) string {
@@ -402,6 +408,4 @@ func (d *dmlWorker) close() {
if !atomic.CompareAndSwapUint64(&d.isClosed, 0, 1) {
return
}
-
- d.wg.Wait()
}
diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
index 5f8d375a5fc..c0e48c54e9d 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
@@ -18,6 +18,7 @@ import (
"net/url"
"os"
"path"
+ "sync"
"testing"
"time"
@@ -41,7 +42,6 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
uri := fmt.Sprintf("file:///%s?flush-interval=2s", dir)
storage, err := util.GetExternalStorageFromURI(ctx, uri)
require.Nil(t, err)
- errCh := make(chan error, 1)
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
cfg := cloudstorage.NewConfig()
@@ -50,7 +50,7 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
statistics := metrics.NewStatistics(ctx, sink.TxnSink)
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
- cfg, ".json", statistics, errCh)
+ cfg, ".json", statistics)
return d
}
@@ -129,12 +129,10 @@ func TestGenerateDataFilePath(t *testing.T) {
func TestDMLWorkerRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
parentDir := t.TempDir()
d := testDMLWorker(ctx, t, parentDir)
fragCh := chann.New[eventFragment]()
table1Dir := path.Join(parentDir, "test/table1/99")
- d.run(ctx, fragCh)
// assume table1 and table2 are dispatched to the same DML worker
table1 := model.TableName{
Schema: "test",
@@ -190,6 +188,13 @@ func TestDMLWorkerRun(t *testing.T) {
fragCh.In() <- frag
}
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _ = d.run(ctx, fragCh)
+ }()
+
time.Sleep(4 * time.Second)
// check whether files for table1 has been generated
files, err := os.ReadDir(table1Dir)
@@ -202,6 +207,7 @@ func TestDMLWorkerRun(t *testing.T) {
require.ElementsMatch(t, []string{"CDC000001.json", "schema.json"}, fileNames)
cancel()
d.close()
+ wg.Wait()
fragCh.Close()
for range fragCh.Out() {
// drain the fragCh
diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_writer.go b/cdc/sinkv2/eventsink/cloudstorage/dml_writer.go
index 2fe9af5a344..21099b8148f 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/dml_writer.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/dml_writer.go
@@ -14,31 +14,33 @@ package cloudstorage
import (
"context"
- "sync"
+ "github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/hash"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
+ "golang.org/x/sync/errgroup"
)
// dmlWriter manages a set of dmlWorkers and dispatches eventFragment to
// the dmlWorker according to hash algorithm.
type dmlWriter struct {
+ changefeedID model.ChangeFeedID
workers []*dmlWorker
workerChannels []*chann.Chann[eventFragment]
hasher *hash.PositionInertia
storage storage.ExternalStorage
config *cloudstorage.Config
extension string
- wg sync.WaitGroup
+ statistics *metrics.Statistics
inputCh <-chan eventFragment
errCh chan<- error
}
-func newDMLWriter(ctx context.Context,
+func newDMLWriter(
changefeedID model.ChangeFeedID,
storage storage.ExternalStorage,
config *cloudstorage.Config,
@@ -47,40 +49,53 @@ func newDMLWriter(ctx context.Context,
inputCh <-chan eventFragment,
errCh chan<- error,
) *dmlWriter {
- w := &dmlWriter{
+ d := &dmlWriter{
+ changefeedID: changefeedID,
storage: storage,
workerChannels: make([]*chann.Chann[eventFragment], config.WorkerCount),
+ workers: make([]*dmlWorker, config.WorkerCount),
hasher: hash.NewPositionInertia(),
config: config,
extension: extension,
+ statistics: statistics,
inputCh: inputCh,
errCh: errCh,
}
- w.wg.Add(1)
- go func() {
- defer w.wg.Done()
- w.dispatchFragToDMLWorker(ctx)
- }()
-
for i := 0; i < config.WorkerCount; i++ {
- d := newDMLWorker(i, changefeedID, storage, w.config, extension, statistics, errCh)
- w.workerChannels[i] = chann.New[eventFragment]()
- d.run(ctx, w.workerChannels[i])
- w.workers = append(w.workers, d)
+ worker := newDMLWorker(i, changefeedID, storage, config, extension, statistics)
+ d.workers[i] = worker
+ d.workerChannels[i] = chann.New[eventFragment]()
+ }
+
+ return d
+}
+
+func (d *dmlWriter) run(ctx context.Context) error {
+ eg, ctx := errgroup.WithContext(ctx)
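+ // Run the dispatcher and every worker under one errgroup so the first
+ // error cancels the rest.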
+ eg.Go(func() error {
+ return d.dispatchFragToDMLWorker(ctx)
+ })
+
+ for i := 0; i < d.config.WorkerCount; i++ {
+ worker := d.workers[i]
+ ch := d.workerChannels[i]
+ eg.Go(func() error {
+ return worker.run(ctx, ch)
+ })
}
- return w
+ return eg.Wait()
}
-func (d *dmlWriter) dispatchFragToDMLWorker(ctx context.Context) {
+func (d *dmlWriter) dispatchFragToDMLWorker(ctx context.Context) error {
for {
select {
case <-ctx.Done():
- return
+ return errors.Trace(ctx.Err())
case frag, ok := <-d.inputCh:
if !ok {
- return
+ return nil
}
tableName := frag.TableName
d.hasher.Reset()
@@ -92,7 +107,6 @@ func (d *dmlWriter) dispatchFragToDMLWorker(ctx context.Context) {
}
func (d *dmlWriter) close() {
- d.wg.Wait()
for _, w := range d.workers {
w.close()
}
diff --git a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go
index a5b00604532..494931eca1f 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go
@@ -14,13 +14,14 @@ package cloudstorage
import (
"context"
- "sync"
"sync/atomic"
+ "github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"go.uber.org/zap"
+ "golang.org/x/sync/errgroup"
)
// encodingWorker denotes the worker responsible for encoding RowChangedEvents
@@ -28,12 +29,10 @@ import (
type encodingWorker struct {
id int
changeFeedID model.ChangeFeedID
- wg sync.WaitGroup
encoder codec.EventBatchEncoder
isClosed uint64
inputCh chan eventFragment
defragmenter *defragmenter
- errCh chan<- error
}
func newEncodingWorker(
@@ -42,7 +41,6 @@ func newEncodingWorker(
encoder codec.EventBatchEncoder,
inputCh chan eventFragment,
defragmenter *defragmenter,
- errCh chan<- error,
) *encodingWorker {
return &encodingWorker{
id: workerID,
@@ -50,33 +48,33 @@ func newEncodingWorker(
encoder: encoder,
inputCh: inputCh,
defragmenter: defragmenter,
- errCh: errCh,
}
}
-func (w *encodingWorker) run(ctx context.Context) {
- w.wg.Add(1)
- go func() {
- log.Debug("encoding worker started", zap.Int("workerID", w.id),
- zap.String("namespace", w.changeFeedID.Namespace),
- zap.String("changefeed", w.changeFeedID.ID))
- defer w.wg.Done()
+func (w *encodingWorker) run(ctx context.Context) error {
+ log.Debug("encoding worker started", zap.Int("workerID", w.id),
+ zap.String("namespace", w.changeFeedID.Namespace),
+ zap.String("changefeed", w.changeFeedID.ID))
+
+ eg, ctx := errgroup.WithContext(ctx)
+ eg.Go(func() error {
for {
select {
case <-ctx.Done():
- return
+ return errors.Trace(ctx.Err())
case frag, ok := <-w.inputCh:
if !ok || atomic.LoadUint64(&w.isClosed) == 1 {
- return
+ return nil
}
err := w.encodeEvents(ctx, frag)
if err != nil {
- w.errCh <- err
- return
+ return errors.Trace(err)
}
}
}
- }()
+ })
+
+ return eg.Wait()
}
func (w *encodingWorker) encodeEvents(ctx context.Context, frag eventFragment) error {
@@ -108,5 +106,4 @@ func (w *encodingWorker) close() {
if !atomic.CompareAndSwapUint64(&w.isClosed, 0, 1) {
return
}
- w.wg.Wait()
}
diff --git a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go
index ad7c919d7ec..7d589f71fa2 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go
@@ -16,6 +16,7 @@ import (
"context"
"fmt"
"net/url"
+ "sync"
"testing"
"github.com/pingcap/tiflow/cdc/model"
@@ -36,12 +37,11 @@ func testEncodingWorker(ctx context.Context, t *testing.T) (*encodingWorker, fun
encoderBuilder, err := builder.NewEventBatchEncoderBuilder(context.TODO(), encoderConfig)
require.Nil(t, err)
encoder := encoderBuilder.Build()
- errCh := make(chan error, 10)
changefeedID := model.DefaultChangeFeedID("test-encode")
msgCh := make(chan eventFragment, 1024)
defragmenter := newDefragmenter(ctx)
- worker := newEncodingWorker(1, changefeedID, encoder, msgCh, defragmenter, errCh)
+ worker := newEncodingWorker(1, changefeedID, encoder, msgCh, defragmenter)
return worker, func() {
defragmenter.close()
}
@@ -104,7 +104,6 @@ func TestEncodingWorkerRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
worker, fn := testEncodingWorker(ctx, t)
defer fn()
- worker.run(ctx)
table := model.TableName{
Schema: "test",
Table: "table1",
@@ -145,6 +144,15 @@ func TestEncodingWorkerRun(t *testing.T) {
}
worker.inputCh <- frag
}
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _ = worker.run(ctx)
+ }()
+
cancel()
worker.close()
+ wg.Wait()
}
diff --git a/cdc/sinkv2/eventsink/txn/worker.go b/cdc/sinkv2/eventsink/txn/worker.go
index 99c0eb99b54..b7a5397303f 100644
--- a/cdc/sinkv2/eventsink/txn/worker.go
+++ b/cdc/sinkv2/eventsink/txn/worker.go
@@ -137,8 +137,8 @@ func (w *worker) runBackgroundLoop() {
defer ticker.Stop()
var flushTimeSlice, totalTimeSlice time.Duration
- overseerTimer := time.NewTicker(time.Second)
- defer overseerTimer.Stop()
+ overseerTicker := time.NewTicker(time.Second)
+ defer overseerTicker.Stop()
startToWork := time.Now()
Loop:
for {
@@ -163,7 +163,7 @@ func (w *worker) runBackgroundLoop() {
if w.doFlush(&flushTimeSlice) {
break Loop
}
- case now := <-overseerTimer.C:
+ case now := <-overseerTicker.C:
totalTimeSlice = now.Sub(startToWork)
busyRatio := int(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000)
w.metricTxnWorkerBusyRatio.Add(float64(busyRatio) / float64(w.workerCount))
diff --git a/deployments/ticdc/docker/test.Dockerfile b/deployments/ticdc/docker/test.Dockerfile
new file mode 100644
index 00000000000..7c1c07ce717
--- /dev/null
+++ b/deployments/ticdc/docker/test.Dockerfile
@@ -0,0 +1,6 @@
+FROM alpine:3.15
+RUN apk add --no-cache tzdata bash curl socat
+COPY ./bin/cdc /cdc
+EXPOSE 8300
+CMD [ "/cdc" ]
+
diff --git a/engine/pkg/externalresource/internal/s3/file_manager.go b/engine/pkg/externalresource/internal/s3/file_manager.go
index 8a07153af55..89a0647d757 100644
--- a/engine/pkg/externalresource/internal/s3/file_manager.go
+++ b/engine/pkg/externalresource/internal/s3/file_manager.go
@@ -119,7 +119,7 @@ func (m *FileManager) GetPersistedResource(
ok, err := storage.FileExists(ctx, placeholderFileName)
if err != nil {
- return nil, errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("check placeholder file")
+ return nil, errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("check placeholder file")
}
if !ok {
return nil, errors.ErrResourceFilesNotFound.GenWithStackByArgs()
@@ -293,7 +293,7 @@ func (m *FileManager) removeFilesIf(
return nil
})
if err != nil {
- return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("RemoveTemporaryFiles")
+ return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("RemoveTemporaryFiles")
}
log.Info("Removing resources",
@@ -303,7 +303,7 @@ func (m *FileManager) removeFilesIf(
for _, path := range toRemoveFiles {
if err := storage.DeleteFile(ctx, path); err != nil {
- return errors.ErrS3StorageAPI.Wrap(err)
+ return errors.ErrExternalStorageAPI.Wrap(err)
}
}
return nil
@@ -312,25 +312,25 @@ func (m *FileManager) removeFilesIf(
func createPlaceholderFile(ctx context.Context, storage brStorage.ExternalStorage) error {
exists, err := storage.FileExists(ctx, placeholderFileName)
if err != nil {
- return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("checking placeholder file")
+ return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("checking placeholder file")
}
if exists {
// This should not happen in production. Unless the caller of the FileManager has a bug.
- return errors.ErrS3StorageAPI.GenWithStackByArgs("resource already exists")
+ return errors.ErrExternalStorageAPI.GenWithStackByArgs("resource already exists")
}
writer, err := storage.Create(ctx, placeholderFileName)
if err != nil {
- return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("creating placeholder file")
+ return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("creating placeholder file")
}
_, err = writer.Write(ctx, []byte("placeholder"))
if err != nil {
- return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("writing placeholder file")
+ return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("writing placeholder file")
}
if err := writer.Close(ctx); err != nil {
- return errors.ErrS3StorageAPI.Wrap(err).GenWithStackByArgs("closing placeholder file")
+ return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("closing placeholder file")
}
return nil
}
diff --git a/engine/pkg/externalresource/internal/s3/storage_factory.go b/engine/pkg/externalresource/internal/s3/storage_factory.go
index 996b5b60a40..0bdafeb9314 100644
--- a/engine/pkg/externalresource/internal/s3/storage_factory.go
+++ b/engine/pkg/externalresource/internal/s3/storage_factory.go
@@ -91,7 +91,7 @@ func GetExternalStorageFromURI(
}
// Note that we may have network I/O here.
- ret, err := util.GetExternalStorage(ctx, uri, opts)
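+ // Pass the default S3 retryer so transient storage errors are retried.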
+ ret, err := util.GetExternalStorage(ctx, uri, opts, util.DefaultS3Retryer())
if err != nil {
retErr := errors.ErrFailToCreateExternalStorage.Wrap(errors.Trace(err))
return nil, retErr.GenWithStackByArgs("creating ExternalStorage for s3")
diff --git a/errors.toml b/errors.toml
index 252852d595b..b83d3f13b98 100755
--- a/errors.toml
+++ b/errors.toml
@@ -998,7 +998,7 @@ failed to seek to the beginning of request body
["CDC:ErrS3StorageAPI"]
error = '''
-s3 storage api
+external storage api
'''
["CDC:ErrScanLockFailed"]
diff --git a/go.mod b/go.mod
index fb5ce3f4231..b0bf3f98e8e 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,8 @@ module github.com/pingcap/tiflow
go 1.19
require (
+ cloud.google.com/go/storage v1.27.0
+ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0
github.com/BurntSushi/toml v1.2.1
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Shopify/sarama v1.36.0
@@ -99,8 +101,8 @@ require (
golang.org/x/sys v0.2.0
golang.org/x/text v0.4.0
golang.org/x/time v0.2.0
- google.golang.org/genproto v0.0.0-20220719170305-83ca9fad585f
- google.golang.org/grpc v1.48.0
+ google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c
+ google.golang.org/grpc v1.50.1
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v2 v2.4.0
gorm.io/driver/mysql v1.3.3
@@ -109,14 +111,13 @@ require (
)
require (
- cloud.google.com/go v0.102.0 // indirect
- cloud.google.com/go/compute v1.7.0 // indirect
- cloud.google.com/go/iam v0.3.0 // indirect
- cloud.google.com/go/storage v1.22.1 // indirect
+ cloud.google.com/go v0.105.0 // indirect
+ cloud.google.com/go/compute v1.13.0 // indirect
+ cloud.google.com/go/compute/metadata v0.2.1 // indirect
+ cloud.google.com/go/iam v0.8.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect
- github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0 // indirect
github.com/DataDog/zstd v1.4.6-0.20210211175136-c6db21d202f4 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
@@ -164,9 +165,8 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
- github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa // indirect
- github.com/googleapis/gax-go/v2 v2.4.0 // indirect
- github.com/googleapis/go-type-adapters v1.0.0 // indirect
+ github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
+ github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
@@ -257,7 +257,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/client/v2 v2.305.4 // indirect
- go.opencensus.io v0.23.0 // indirect
+ go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib v0.20.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 // indirect
go.opentelemetry.io/otel v0.20.0 // indirect
@@ -272,8 +272,8 @@ require (
golang.org/x/oauth2 v0.2.0 // indirect
golang.org/x/term v0.2.0 // indirect
golang.org/x/tools v0.2.0 // indirect
- golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
- google.golang.org/api v0.84.0 // indirect
+ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
+ google.golang.org/api v0.103.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
diff --git a/go.sum b/go.sum
index 6742e738de4..0e31932b03a 100644
--- a/go.sum
+++ b/go.sum
@@ -28,8 +28,8 @@ cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Ud
cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA=
cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw1U=
cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A=
-cloud.google.com/go v0.102.0 h1:DAq3r8y4mDgyB/ZPJ9v/5VJNqjgJAxTn6ZYLlUywOu8=
-cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc=
+cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y=
+cloud.google.com/go v0.105.0/go.mod h1:PrLgOJNe5nfE9UMxKxgXj4mD3voiP+YQ6gdt6KMFOKM=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
@@ -38,18 +38,17 @@ cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4g
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
cloud.google.com/go/compute v1.2.0/go.mod h1:xlogom/6gr8RJGBe7nT2eGsQYAFUbbv8dbC29qE3Xmw=
-cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJWM7YD99wM=
-cloud.google.com/go/compute v1.5.0/go.mod h1:9SMHyhJlzhlkJqrPAc839t2BZFTSk6Jdj6mkzQJeu0M=
-cloud.google.com/go/compute v1.6.0/go.mod h1:T29tfhtVbq1wvAPo0E3+7vhgmkOYeXjhFvz/FMzPu0s=
-cloud.google.com/go/compute v1.6.1/go.mod h1:g85FgpzFvNULZ+S8AYq87axRKuf2Kh7deLqV/jJ3thU=
-cloud.google.com/go/compute v1.7.0 h1:v/k9Eueb8aAJ0vZuxKMrgm6kPhCLZU9HxFU+AFDs9Uk=
-cloud.google.com/go/compute v1.7.0/go.mod h1:435lt8av5oL9P3fv1OEzSbSUe+ybHXGMPQHHZWZxy9U=
+cloud.google.com/go/compute v1.13.0 h1:AYrLkB8NPdDRslNp4Jxmzrhdr03fUAIDbiGFjLWowoU=
+cloud.google.com/go/compute v1.13.0/go.mod h1:5aPTS0cUNMIc1CE546K+Th6weJUNQErARyZtRXDJ8GE=
+cloud.google.com/go/compute/metadata v0.2.1 h1:efOwf5ymceDhK6PKMnnrTHP4pppY5L22mle96M1yP48=
+cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/iam v0.1.1/go.mod h1:CKqrcnI/suGpybEHxZ7BMehL0oA4LpdyJdUlTl9jVMw=
-cloud.google.com/go/iam v0.3.0 h1:exkAomrVUuzx9kWFI1wm3KI0uoDeUFPB4kKGzx6x+Gc=
-cloud.google.com/go/iam v0.3.0/go.mod h1:XzJPvDayI+9zsASAFO68Hk07u3z+f+JrT2xXNdp4bnY=
+cloud.google.com/go/iam v0.8.0 h1:E2osAkZzxI/+8pZcxVLcDtAQx/u+hZXVryUaYQ5O0Kk=
+cloud.google.com/go/iam v0.8.0/go.mod h1:lga0/y3iH6CX7sYqypWJ33hf7kkfXJag67naqGESjkE=
+cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
@@ -60,8 +59,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKuqnZI01LAA=
-cloud.google.com/go/storage v1.22.1 h1:F6IlQJZrZM++apn9V5/VfS3gbTUYg98PS3EMQAzqtfg=
-cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
+cloud.google.com/go/storage v1.27.0 h1:YOO045NZI9RKfCj1c5A/ZtuuENUc8OAW+gHdGnDgyMQ=
+cloud.google.com/go/storage v1.27.0/go.mod h1:x9DOL8TK/ygDUMieqwfhdpQryTeEkhGKMi80i/iqR2s=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
@@ -199,7 +198,6 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
-github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
@@ -313,7 +311,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
-github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/etcd-io/gofail v0.0.0-20190801230047-ad7f989257ca/go.mod h1:49H/RkXP8pKaZy4h0d+NW16rSLhyVBt4o6VLJbmOqDE=
@@ -528,7 +525,6 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
-github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
@@ -563,18 +559,14 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa h1:7MYGT2XEMam7Mtzv1yDUYXANedWvwk3HKkR3MyGowy8=
-github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8=
+github.com/googleapis/enterprise-certificate-proxy v0.2.0 h1:y8Yozv7SZtlU//QXbezB6QkpuE6jMD2/gfzk4AftXjs=
+github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0=
github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM=
-github.com/googleapis/gax-go/v2 v2.2.0/go.mod h1:as02EH8zWkzwUoLbBaFeQ+arQaj/OthfcblKl4IGNaM=
-github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99EXz9pXxye9YM=
-github.com/googleapis/gax-go/v2 v2.4.0 h1:dS9eYAjhrE2RjmzYw2XAPvcXfmcQLtFEQWn0CR82awk=
-github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c=
-github.com/googleapis/go-type-adapters v1.0.0 h1:9XdMn+d/G57qq1s8dNc5IesGCXHf6V2HZ2JwRxfA2tA=
-github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
+github.com/googleapis/gax-go/v2 v2.7.0 h1:IcsPKeInNvYi7eqSaDjiZqDDKu5rsmunY0Y1YupQSSQ=
+github.com/googleapis/gax-go/v2 v2.7.0/go.mod h1:TEop28CZZQ2y+c0VxMUmu1lV+fQx57QpBWsYpwqHJx8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4=
@@ -1535,10 +1527,7 @@ golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU=
@@ -1560,9 +1549,6 @@ golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
-golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
-golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
-golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb/go.mod h1:jaDAt6Dkxork7LmZnYtzbRWj0W47D86a3TGe0YHBvmE=
golang.org/x/oauth2 v0.2.0 h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU=
golang.org/x/oauth2 v0.2.0/go.mod h1:Cwn6afJ8jrQwYMxQDTpISoXmXW9I6qF6vDeuuoX3Ibs=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -1576,7 +1562,6 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -1676,15 +1661,10 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -1813,9 +1793,8 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
-golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0=
-golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
@@ -1853,16 +1832,9 @@ google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3h
google.golang.org/api v0.63.0/go.mod h1:gs4ij2ffTRXwuzzgJl/56BdwJaA194ijkfn++9tDuPo=
google.golang.org/api v0.64.0/go.mod h1:931CdxA8Rm4t6zqTFGSsgwbAEZ2+GMYurbndwSimebM=
google.golang.org/api v0.66.0/go.mod h1:I1dmXYpX7HGwz/ejRxwQp2qj5bFAz93HiCU1C1oYd9M=
-google.golang.org/api v0.67.0/go.mod h1:ShHKP8E60yPsKNw/w8w+VYaj9H6buA5UqDp8dhbQZ6g=
google.golang.org/api v0.69.0/go.mod h1:boanBiw+h5c3s+tBPgEzLDRHfFLWV0qXxRHz3ws7C80=
-google.golang.org/api v0.70.0/go.mod h1:Bs4ZM2HGifEvXwd50TtW70ovgJffJYw2oRCOFU/SkfA=
-google.golang.org/api v0.71.0/go.mod h1:4PyU6e6JogV1f9eA4voyrTY2batOLdgZ5qZ5HOCc4j8=
-google.golang.org/api v0.74.0/go.mod h1:ZpfMZOVRMywNyvJFeqL9HRWBgAuRfSjJFpe9QtRRyDs=
-google.golang.org/api v0.75.0/go.mod h1:pU9QmyHLnzlpar1Mjt4IbapUCy8J+6HD6GeELN69ljA=
-google.golang.org/api v0.78.0/go.mod h1:1Sg78yoMLOhlQTeF+ARBoytAcH1NNyyl390YMy6rKmw=
-google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3Hyg=
-google.golang.org/api v0.84.0 h1:NMB9J4cCxs9xEm+1Z9QiO3eFvn7EnQj3Eo3hN6ugVlg=
-google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
+google.golang.org/api v0.103.0 h1:9yuVqlu2JCvcLg9p8S3fcFLZij8EPSyvODIY1rkMizQ=
+google.golang.org/api v0.103.0/go.mod h1:hGtW6nK1AC+d9si/UBhw8Xli+QMOf6xyNAyJw4qU9w0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -1914,7 +1886,6 @@ google.golang.org/genproto v0.0.0-20210222152913-aa3ee6e6a81c/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
-google.golang.org/genproto v0.0.0-20210329143202-679c6ae281ee/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
google.golang.org/genproto v0.0.0-20210513213006-bf773b8c8384/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A=
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
@@ -1939,28 +1910,11 @@ google.golang.org/genproto v0.0.0-20211221195035-429b39de9b1c/go.mod h1:5CzLGKJ6
google.golang.org/genproto v0.0.0-20211223182754-3ac035c7e7cb/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220111164026-67b88f271998/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220114231437-d2e6a121cae0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
-google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
-google.golang.org/genproto v0.0.0-20220207164111-0872dc986b00/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220211171837-173942840c17/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
google.golang.org/genproto v0.0.0-20220216160803-4663080d8bc8/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
-google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
-google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
-google.golang.org/genproto v0.0.0-20220304144024-325a89244dc8/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
-google.golang.org/genproto v0.0.0-20220310185008-1973136f34c6/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
-google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E=
-google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
-google.golang.org/genproto v0.0.0-20220413183235-5e96e2839df9/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
-google.golang.org/genproto v0.0.0-20220414192740-2d67ff6cf2b4/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
-google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
-google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
-google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
-google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
-google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
-google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
-google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
-google.golang.org/genproto v0.0.0-20220719170305-83ca9fad585f h1:P8EiVSxZwC6xH2niv2N66aqwMtYFg+D54gbjpcqKJtM=
-google.golang.org/genproto v0.0.0-20220719170305-83ca9fad585f/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE=
+google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c h1:S34D59DS2GWOEwWNt4fYmTcFrtlOgukG2k9WsomZ7tg=
+google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
@@ -1996,12 +1950,8 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K
google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
-google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
-google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
-google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
-google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
-google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
-google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
+google.golang.org/grpc v1.50.1 h1:DS/BukOZWp8s6p4Dt/tOaJaTQyPyOoCcrjroHuCeLzY=
+google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
@@ -2016,7 +1966,6 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json
index c3d8c911cde..42fc7e0630c 100644
--- a/metrics/grafana/ticdc.json
+++ b/metrics/grafana/ticdc.json
@@ -20770,6 +20770,104 @@
"align": false,
"alignLevel": null
}
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_TEST-CLUSTER}",
+ "description": "Redo bgUpdateLog worker busy ratio",
+ "fieldConfig": {
+ "defaults": {},
+ "overrides": []
+ },
+ "fill": 1,
+ "fillGradient": 0,
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 106
+ },
+ "hiddenSeries": false,
+ "id": 723,
+ "legend": {
+ "alignAsTable": true,
+ "avg": false,
+ "current": true,
+ "max": false,
+ "min": false,
+ "rightSide": false,
+ "show": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "nullPointMode": "null",
+ "options": {
+ "alertThreshold": true
+ },
+ "percentage": false,
+ "pluginVersion": "7.5.11",
+ "pointradius": 2,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "exemplar": true,
+ "expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)",
+ "interval": "",
+ "legendFormat": "{{changefeed}}-{{instance}}",
+ "queryType": "randomWalk",
+ "refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeRegions": [],
+ "timeShift": null,
+ "title": "Worker Busy Ratio",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "percent",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": false
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
}
],
"title": "Redo",
diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go
index 03d79b3f5f9..f0a8486eb3e 100644
--- a/pkg/cmd/server/server_test.go
+++ b/pkg/cmd/server/server_test.go
@@ -179,8 +179,9 @@ func TestParseCfg(t *testing.T) {
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
- EnableDBSorter: true,
- EnableNewScheduler: true,
+ EnableDBSorter: true,
+ EnableNewScheduler: true,
+ EnablePullBasedSink: true,
DB: &config.DBConfig{
Count: 8,
Concurrency: 128,
@@ -255,6 +256,7 @@ region-retry-duration = "3s"
[debug]
enable-db-sorter = true
+enable-pull-based-sink = true
[debug.db]
count = 5
concurrency = 6
@@ -336,8 +338,9 @@ check-balance-interval = "10s"
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
- EnableDBSorter: true,
- EnableNewScheduler: true,
+ EnableDBSorter: true,
+ EnablePullBasedSink: true,
+ EnableNewScheduler: true,
DB: &config.DBConfig{
Count: 5,
Concurrency: 6,
@@ -483,8 +486,9 @@ cert-allowed-cn = ["dd","ee"]
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
- EnableDBSorter: true,
- EnableNewScheduler: true,
+ EnableDBSorter: true,
+ EnableNewScheduler: true,
+ EnablePullBasedSink: true,
DB: &config.DBConfig{
Count: 8,
Concurrency: 128,
@@ -548,8 +552,9 @@ unknown3 = 3
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
- EnableDBSorter: true,
- EnableNewScheduler: true,
+ EnableDBSorter: true,
+ EnableNewScheduler: true,
+ EnablePullBasedSink: true,
DB: &config.DBConfig{
Count: 8,
Concurrency: 128,
diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go
index f5dc96b8566..223d1f348b1 100644
--- a/pkg/config/config_test_data.go
+++ b/pkg/config/config_test_data.go
@@ -106,7 +106,7 @@ const (
"table-actor": {
"event-batch-size": 32
},
- "enable-pull-based-sink": false,
+ "enable-pull-based-sink": true,
"enable-db-sorter": true,
"db": {
"count": 8,
diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go
index 4e4c94ca93d..9dced1585e4 100644
--- a/pkg/config/consistent.go
+++ b/pkg/config/consistent.go
@@ -22,9 +22,10 @@ import (
)
const (
- // MinFlushIntervalInMs is the minimum value of flush interval, which is set
- // to two seconds to reduce the frequency of accessing external storage.
- MinFlushIntervalInMs = 2000
+ // DefaultFlushIntervalInMs is the default flush interval for redo log.
+ DefaultFlushIntervalInMs = 2000
+ // minFlushIntervalInMs is the minimum flush interval for redo log.
+ minFlushIntervalInMs = 50
)
// ConsistentConfig represents replication consistency config for a changefeed.
@@ -41,10 +42,14 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
return nil
}
- if c.FlushIntervalInMs < MinFlushIntervalInMs {
+ if c.FlushIntervalInMs == 0 {
+ c.FlushIntervalInMs = DefaultFlushIntervalInMs
+ }
+
+ if c.FlushIntervalInMs < minFlushIntervalInMs {
return cerror.ErrInvalidReplicaConfig.FastGenByArgs(
fmt.Sprintf("The consistent.flush-interval:%d must be equal or greater than %d",
- c.FlushIntervalInMs, MinFlushIntervalInMs))
+ c.FlushIntervalInMs, minFlushIntervalInMs))
}
uri, err := storage.ParseRawURL(c.Storage)
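The adjusted validation treats a zero interval as unset and falls back to the default, while still rejecting values below the new 50 ms floor. A standalone sketch of just that logic (the constants mirror this diff; the real `ValidateAndAdjust` also parses and checks the storage URI):

```go
package main

import "fmt"

const (
	defaultFlushIntervalInMs = 2000
	minFlushIntervalInMs     = 50
)

func validateAndAdjust(flushIntervalInMs int64) (int64, error) {
	if flushIntervalInMs == 0 {
		// An unset value falls back to the default instead of being rejected.
		flushIntervalInMs = defaultFlushIntervalInMs
	}
	if flushIntervalInMs < minFlushIntervalInMs {
		return 0, fmt.Errorf(
			"the consistent.flush-interval:%d must be equal to or greater than %d",
			flushIntervalInMs, minFlushIntervalInMs)
	}
	return flushIntervalInMs, nil
}

func main() {
	for _, v := range []int64{0, 20, 500} {
		adjusted, err := validateAndAdjust(v)
		fmt.Println(v, "->", adjusted, err) // 0 -> 2000; 20 -> error; 500 -> 500
	}
}
```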
diff --git a/pkg/config/debug.go b/pkg/config/debug.go
index 3ca616be1cd..e4ed8b8d0ec 100644
--- a/pkg/config/debug.go
+++ b/pkg/config/debug.go
@@ -22,7 +22,7 @@ import (
type DebugConfig struct {
TableActor *TableActorConfig `toml:"table-actor" json:"table-actor"`
- // EnablePullBasedSink enables pull-based sink, false by default.
+ // EnablePullBasedSink enables pull-based sink, true by default.
//
// NOTE: currently it can only be enabled with EnableDBSorter, because unified
// sorter hasn't been transformed into the new interface.
diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go
index 9c34562b15b..d70c478f52f 100644
--- a/pkg/config/replica_config.go
+++ b/pkg/config/replica_config.go
@@ -62,7 +62,7 @@ var defaultReplicaConfig = &ReplicaConfig{
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
- FlushIntervalInMs: MinFlushIntervalInMs,
+ FlushIntervalInMs: DefaultFlushIntervalInMs,
Storage: "",
},
}
diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go
index 154538816fd..b8baeabdb6b 100644
--- a/pkg/config/server_config.go
+++ b/pkg/config/server_config.go
@@ -151,7 +151,7 @@ var defaultServerConfig = &ServerConfig{
Scheduler: NewDefaultSchedulerConfig(),
EnableNewSink: true,
- EnablePullBasedSink: false,
+ EnablePullBasedSink: true,
},
ClusterID: "default",
}
diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go
index bd5fd9eac46..fb9b2f63b68 100644
--- a/pkg/errors/cdc_errors.go
+++ b/pkg/errors/cdc_errors.go
@@ -299,8 +299,8 @@ var (
"rawData size %d exceeds maximum file size %d",
errors.RFCCodeText("CDC:ErrFileSizeExceed"),
)
- ErrS3StorageAPI = errors.Normalize(
- "s3 storage api",
+ ErrExternalStorageAPI = errors.Normalize(
+ "external storage api",
errors.RFCCodeText("CDC:ErrS3StorageAPI"),
)
ErrStorageInitialize = errors.Normalize(
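Only the Go identifier and the normalized message change here: the RFC code passed to `errors.RFCCodeText` stays `CDC:ErrS3StorageAPI`, so error codes already emitted to users and matched by tooling remain stable while the error itself becomes storage-agnostic.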
diff --git a/pkg/redo/config.go b/pkg/redo/config.go
index 8016b12f495..8268590ec84 100644
--- a/pkg/redo/config.go
+++ b/pkg/redo/config.go
@@ -27,6 +27,13 @@ import (
"github.com/pingcap/tiflow/pkg/util"
)
+var (
+ // DefaultTimeout is the default timeout for writing external storage
+ // DefaultTimeout is the default timeout for writing to external storage
+ // CloseTimeout is the default timeout for close redo writer
+ // CloseTimeout is the default timeout for closing the redo writer
+)
+
const (
// DefaultFileMode is the default mode when operation files
DefaultFileMode = 0o644
@@ -39,8 +46,6 @@ const (
LogEXT = ".log"
// MetaEXT is the meta file ext of meta file after safely wrote to disk
MetaEXT = ".meta"
- // MetaTmpEXT is the meta file ext of meta file before safely wrote to disk
- MetaTmpEXT = ".mtmp"
// SortLogEXT is the sorted log file ext of log file after safely wrote to disk
SortLogEXT = ".sort"
@@ -157,13 +162,21 @@ func IsBlackholeStorage(scheme string) bool {
// InitExternalStorage init an external storage.
var InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
+ s, err := util.GetExternalStorageWithTimeout(ctx, uri.String(), DefaultTimeout)
+ if err != nil {
+ return nil, errors.WrapChangefeedUnretryableErr(errors.ErrStorageInitialize, err)
+ }
+ return s, nil
+}
+
+func initExternalStorageForTest(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
if ConsistentStorage(uri.Scheme) == consistentStorageS3 && len(uri.Host) == 0 {
// TODO: this branch is compatible with previous s3 logic and will be removed
// in the future.
return nil, errors.WrapChangefeedUnretryableErr(errors.ErrStorageInitialize,
errors.Errorf("please specify the bucket for %+v", uri))
}
- s, err := util.GetExternalStorage(ctx, uri.String(), nil)
+ s, err := util.GetExternalStorageFromURI(ctx, uri.String())
if err != nil {
return nil, errors.WrapChangefeedUnretryableErr(errors.ErrStorageInitialize, err)
}
@@ -183,7 +196,7 @@ func ValidateStorage(uri *url.URL) error {
if IsExternalStorage(scheme) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
- _, err := InitExternalStorage(ctx, *uri)
+ _, err := initExternalStorageForTest(ctx, *uri)
return err
}
@@ -202,6 +215,9 @@ const (
// RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information
// layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
+ // RedoMetaFileFormat is the format of the redo meta file name, which contains namespace information.
+ // layout: captureID_namespace_changefeedID_fileType_uuid.fileExtName
+ RedoMetaFileFormat = "%s_%s_%s_%s_%s%s"
)
// logFormat2ParseFormat converts redo log file name format to the space separated
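To make the layout concrete, here is a tiny self-contained sketch of formatting a meta file name with the new format string (all values are hypothetical, for illustration only):

```go
package main

import "fmt"

// Mirrors RedoMetaFileFormat: captureID_namespace_changefeedID_fileType_uuid.fileExtName
const redoMetaFileFormat = "%s_%s_%s_%s_%s%s"

func main() {
	name := fmt.Sprintf(redoMetaFileFormat,
		"capture-1", "default", "cf-1", "meta", "uuid-1234", ".meta")
	fmt.Println(name) // capture-1_default_cf-1_meta_uuid-1234.meta
}
```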
diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go
index 90359c00cce..6e0fe690da7 100644
--- a/pkg/util/external_storage.go
+++ b/pkg/util/external_storage.go
@@ -15,11 +15,16 @@ package util
import (
"context"
+ "os"
"strings"
"time"
+ gcsStorage "cloud.google.com/go/storage"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+ "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
+ "github.com/aws/aws-sdk-go/service/s3"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/pkg/errors"
@@ -30,12 +35,29 @@ import (
func GetExternalStorageFromURI(
ctx context.Context, uri string,
) (storage.ExternalStorage, error) {
- return GetExternalStorage(ctx, uri, nil)
+ return GetExternalStorage(ctx, uri, nil, DefaultS3Retryer())
+}
+
+// GetExternalStorageWithTimeout creates a new storage.ExternalStorage from a URI
+// without a custom retryer; every operation on it is bounded by the given timeout.
+func GetExternalStorageWithTimeout(
+ ctx context.Context, uri string, timeout time.Duration,
+) (storage.ExternalStorage, error) {
+ ctx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ s, err := GetExternalStorage(ctx, uri, nil, nil)
+
+ return &extStorageWithTimeout{
+ ExternalStorage: s,
+ timeout: timeout,
+ }, err
}
// GetExternalStorage creates a new storage.ExternalStorage based on the uri and options.
func GetExternalStorage(
- ctx context.Context, uri string, opts *storage.BackendOptions,
+ ctx context.Context, uri string,
+ opts *storage.BackendOptions,
+ retryer request.Retryer,
) (storage.ExternalStorage, error) {
backEnd, err := storage.ParseBackend(uri, opts)
if err != nil {
@@ -44,7 +66,7 @@ func GetExternalStorage(
ret, err := storage.New(ctx, backEnd, &storage.ExternalStorageOptions{
SendCredentials: false,
- S3Retryer: DefaultS3Retryer(),
+ S3Retryer: retryer,
})
if err != nil {
retErr := errors.ErrFailToCreateExternalStorage.Wrap(errors.Trace(err))
@@ -90,3 +112,103 @@ func DefaultS3Retryer() request.Retryer {
},
}
}
+
+type extStorageWithTimeout struct {
+ storage.ExternalStorage
+ timeout time.Duration
+}
+
+// WriteFile writes a complete file to storage, similar to os.WriteFile,
+// but WriteFile should be atomic
+func (s *extStorageWithTimeout) WriteFile(ctx context.Context, name string, data []byte) error {
+ ctx, cancel := context.WithTimeout(ctx, s.timeout)
+ defer cancel()
+ return s.ExternalStorage.WriteFile(ctx, name, data)
+}
+
+// ReadFile reads a complete file from storage, similar to os.ReadFile
+func (s *extStorageWithTimeout) ReadFile(ctx context.Context, name string) ([]byte, error) {
+ ctx, cancel := context.WithTimeout(ctx, s.timeout)
+ defer cancel()
+ return s.ExternalStorage.ReadFile(ctx, name)
+}
+
+// FileExists returns true if the file exists
+func (s *extStorageWithTimeout) FileExists(ctx context.Context, name string) (bool, error) {
+ ctx, cancel := context.WithTimeout(ctx, s.timeout)
+ defer cancel()
+ return s.ExternalStorage.FileExists(ctx, name)
+}
+
+// DeleteFile deletes the file in storage
+func (s *extStorageWithTimeout) DeleteFile(ctx context.Context, name string) error {
+ ctx, cancel := context.WithTimeout(ctx, s.timeout)
+ defer cancel()
+ return s.ExternalStorage.DeleteFile(ctx, name)
+}
+
+// Open opens a Reader by file path; the path is relative to the storage base path
+func (s *extStorageWithTimeout) Open(
+ ctx context.Context, path string,
+) (storage.ExternalFileReader, error) {
+ ctx, cancel := context.WithTimeout(ctx, s.timeout)
+ defer cancel()
+ return s.ExternalStorage.Open(ctx, path)
+}
+
+// WalkDir traverses all the files in a dir.
+func (s *extStorageWithTimeout) WalkDir(
+ ctx context.Context, opt *storage.WalkOption, fn func(path string, size int64) error,
+) error {
+ ctx, cancel := context.WithTimeout(ctx, s.timeout)
+ defer cancel()
+ return s.ExternalStorage.WalkDir(ctx, opt, fn)
+}
+
+// Create opens a file writer by path; the path is relative to the storage base path
+func (s *extStorageWithTimeout) Create(
+ ctx context.Context, path string,
+) (storage.ExternalFileWriter, error) {
+ ctx, cancel := context.WithTimeout(ctx, s.timeout)
+ defer cancel()
+ return s.ExternalStorage.Create(ctx, path)
+}
+
+// Rename renames the file from oldFileName to newFileName
+func (s *extStorageWithTimeout) Rename(
+ ctx context.Context, oldFileName, newFileName string,
+) error {
+ ctx, cancel := context.WithTimeout(ctx, s.timeout)
+ defer cancel()
+ return s.ExternalStorage.Rename(ctx, oldFileName, newFileName)
+}
+
+// IsNotExistInExtStorage checks whether the error indicates that the file does not exist in external storage.
+func IsNotExistInExtStorage(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ if os.IsNotExist(errors.Cause(err)) {
+ return true
+ }
+
+ if aerr, ok := errors.Cause(err).(awserr.Error); ok { // nolint:errorlint
+ switch aerr.Code() {
+ case s3.ErrCodeNoSuchBucket, s3.ErrCodeNoSuchKey, "NotFound":
+ return true
+ }
+ }
+
+ if errors.Cause(err) == gcsStorage.ErrObjectNotExist { // nolint:errorlint
+ return true
+ }
+
+ var errResp *azblob.StorageError
+ if internalErr, ok := err.(*azblob.InternalError); ok && internalErr.As(&errResp) {
+ if errResp.ErrorCode == azblob.StorageErrorCodeBlobNotFound {
+ return true
+ }
+ }
+ return false
+}
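The wrapper methods above all follow the same decorator pattern: derive a child context with a fixed timeout, then delegate to the embedded storage. A minimal, self-contained sketch of the pattern (`fileStore` and `slowStore` are illustrative stand-ins, not tiflow types):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fileStore is a narrow interface standing in for storage.ExternalStorage.
type fileStore interface {
	WriteFile(ctx context.Context, name string, data []byte) error
}

// storeWithTimeout bounds every call with a fixed timeout, so a stalled
// storage operation cannot block the caller indefinitely.
type storeWithTimeout struct {
	fileStore
	timeout time.Duration
}

func (s *storeWithTimeout) WriteFile(ctx context.Context, name string, data []byte) error {
	ctx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()
	return s.fileStore.WriteFile(ctx, name, data)
}

// slowStore is a stand-in backend that blocks until the context expires.
type slowStore struct{}

func (slowStore) WriteFile(ctx context.Context, name string, data []byte) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	s := &storeWithTimeout{fileStore: slowStore{}, timeout: 50 * time.Millisecond}
	err := s.WriteFile(context.Background(), "placeholder", nil)
	fmt.Println(err) // context deadline exceeded
}
```

Because the embedded interface already satisfies `fileStore`, only the methods that need the timeout have to be overridden, which is exactly how `extStorageWithTimeout` embeds `storage.ExternalStorage` above.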
diff --git a/tests/integration_tests/processor_err_chan/run.sh b/tests/integration_tests/processor_err_chan/run.sh
index 8b0f2f73b0e..3919bab5e7b 100644
--- a/tests/integration_tests/processor_err_chan/run.sh
+++ b/tests/integration_tests/processor_err_chan/run.sh
@@ -27,7 +27,7 @@ function run() {
run_sql "CREATE table processor_err_chan.t$i (id int primary key auto_increment)" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
done
- export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/processor/pipeline/ProcessorAddTableError=1*return(true)'
+ export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/processor/sourcemanager/puller/ProcessorAddTableError=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')