diff --git a/cdc/changefeed.go b/cdc/changefeed.go
index de52eba3457..b3930947ba3 100644
--- a/cdc/changefeed.go
+++ b/cdc/changefeed.go
@@ -20,9 +20,8 @@ import (
 	"sync"
 	"time"
 
-	"github.com/pingcap/failpoint"
-
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	timodel "github.com/pingcap/parser/model"
 	"github.com/pingcap/ticdc/cdc/entry"
@@ -364,6 +363,11 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model
 		}
 		c.taskStatus[captureID] = newStatus.Clone()
 		log.Info("dispatch table success", zap.String("capture-id", captureID), zap.Stringer("status", newStatus))
+		failpoint.Inject("OwnerRemoveTableError", func() {
+			if len(cleanedTables) > 0 {
+				failpoint.Return(errors.New("failpoint injected error"))
+			}
+		})
 	}
 
 	for tableID := range cleanedTables {
@@ -514,13 +518,16 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
 			log.Warn("ignored the move job, the source capture is not found", zap.Reflect("job", job))
 			continue
 		}
-		replicaInfo, exist := status.RemoveTable(tableID, c.status.CheckpointTs)
+		// To ensure that the replication pipeline stops exactly at the boundary TS,
+		// the boundary TS specified by the remove table operation MUST be greater than or equal to the checkpoint TS of this table.
+		// So the global resolved TS is a reasonable value.
+		replicaInfo, exist := status.RemoveTable(tableID, c.status.ResolvedTs)
 		if !exist {
 			delete(c.moveTableJobs, tableID)
 			log.Warn("ignored the move job, the table is not exist in the source capture", zap.Reflect("job", job))
 			continue
 		}
-		replicaInfo.StartTs = c.status.CheckpointTs
+		replicaInfo.StartTs = c.status.ResolvedTs
 		job.TableReplicaInfo = replicaInfo
 		job.Status = model.MoveTableStatusDeleted
 		log.Info("handle the move job, remove table from the source capture", zap.Reflect("job", job))
@@ -528,14 +535,13 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
 		// add table to target capture
 		status, exist := cloneStatus(job.To)
 		replicaInfo := job.TableReplicaInfo.Clone()
-		replicaInfo.StartTs = c.status.CheckpointTs
 		if !exist {
 			// the target capture is not exist, add table to orphanTables.
c.orphanTables[tableID] = replicaInfo.StartTs log.Warn("the target capture is not exist, sent the table to orphanTables", zap.Reflect("job", job)) continue } - status.AddTable(tableID, replicaInfo, c.status.CheckpointTs) + status.AddTable(tableID, replicaInfo, replicaInfo.StartTs) job.Status = model.MoveTableStatusFinished delete(c.moveTableJobs, tableID) log.Info("handle the move job, add table to the target capture", zap.Reflect("job", job)) diff --git a/cdc/metrics.go b/cdc/metrics.go index 3c853557f66..ac3e3ea163e 100644 --- a/cdc/metrics.go +++ b/cdc/metrics.go @@ -14,8 +14,6 @@ package cdc import ( - "time" - "github.com/pingcap/ticdc/cdc/entry" "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/puller" @@ -24,10 +22,6 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const ( - defaultMetricInterval = time.Second * 15 -) - var registry = prometheus.NewRegistry() func init() { diff --git a/cdc/metrics_processor.go b/cdc/metrics_processor.go index 480fa4f0c82..7cc422b0219 100644 --- a/cdc/metrics_processor.go +++ b/cdc/metrics_processor.go @@ -75,13 +75,6 @@ var ( Help: "The time it took to update sub changefeed info.", Buckets: prometheus.ExponentialBuckets(0.001 /* 1 ms */, 2, 18), }, []string{"capture"}) - tableOutputChanSizeGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "txn_output_chan_size", - Help: "size of row changed event output channel from table to processor", - }, []string{"changefeed", "capture"}) processorErrorCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "ticdc", @@ -89,14 +82,6 @@ var ( Name: "exit_with_error_count", Help: "counter for processor exits with error", }, []string{"changefeed", "capture"}) - sinkFlushRowChangedDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "flush_event_duration_seconds", - Help: "Bucketed histogram of processing time (s) of flushing events in processor", - Buckets: prometheus.ExponentialBuckets(0.002 /* 2ms */, 2, 20), - }, []string{"changefeed", "capture"}) ) // initProcessorMetrics registers all metrics used in processor @@ -109,7 +94,5 @@ func initProcessorMetrics(registry *prometheus.Registry) { registry.MustRegister(syncTableNumGauge) registry.MustRegister(txnCounter) registry.MustRegister(updateInfoDuration) - registry.MustRegister(tableOutputChanSizeGauge) registry.MustRegister(processorErrorCounter) - registry.MustRegister(sinkFlushRowChangedDuration) } diff --git a/cdc/processor.go b/cdc/processor.go index 32f53b89389..9f0cd1243ef 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -52,12 +52,6 @@ import ( ) const ( - // TODO: processor output chan size, the accumulated data is determined by - // the count of sorted data and unmounted data. In current benchmark a single - // processor can reach 50k-100k QPS, and accumulated data is around - // 200k-400k in most cases. We need a better chan cache mechanism. - defaultOutputChanSize = 1280000 - // defaultMemBufferCapacity is the default memory buffer per change feed. 
defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G @@ -79,9 +73,8 @@ type processor struct { etcdCli kv.CDCEtcdClient session *concurrency.Session - sink sink.Sink + sinkManager *sink.Manager - sinkEmittedResolvedTs uint64 globalResolvedTs uint64 localResolvedTs uint64 checkpointTs uint64 @@ -92,7 +85,6 @@ type processor struct { ddlPullerCancel context.CancelFunc schemaStorage *entry.SchemaStorage - output chan *model.PolymorphicEvent mounter entry.Mounter stateMu sync.Mutex @@ -102,12 +94,11 @@ type processor struct { markTableIDs map[int64]struct{} statusModRevision int64 - sinkEmittedResolvedNotifier *notify.Notifier - sinkEmittedResolvedReceiver *notify.Receiver - localResolvedNotifier *notify.Notifier - localResolvedReceiver *notify.Receiver - localCheckpointTsNotifier *notify.Notifier - localCheckpointTsReceiver *notify.Receiver + globalResolvedTsNotifier *notify.Notifier + localResolvedNotifier *notify.Notifier + localResolvedReceiver *notify.Receiver + localCheckpointTsNotifier *notify.Notifier + localCheckpointTsReceiver *notify.Receiver wg *errgroup.Group errCh chan<- error @@ -115,18 +106,16 @@ type processor struct { } type tableInfo struct { - id int64 - name string // quoted schema and table, used in metircs only - resolvedTs uint64 - markTableID int64 - mResolvedTs uint64 - sorter *puller.Rectifier - workload model.WorkloadInfo - cancel context.CancelFunc - // isDying shows that the table is being removed. - // In the case the same table is added back before safe removal is finished, - // this flag is used to tell whether it's safe to kill the table. - isDying uint32 + id int64 + name string // quoted schema and table, used in metircs only + resolvedTs uint64 + checkpointTs uint64 + + markTableID int64 + mResolvedTs uint64 + mCheckpointTs uint64 + workload model.WorkloadInfo + cancel context.CancelFunc } func (t *tableInfo) loadResolvedTs() uint64 { @@ -140,15 +129,15 @@ func (t *tableInfo) loadResolvedTs() uint64 { return tableRts } -// safeStop will stop the table change feed safety -func (t *tableInfo) safeStop() (stopped bool, checkpointTs model.Ts) { - atomic.StoreUint32(&t.isDying, 1) - t.sorter.SafeStop() - status := t.sorter.GetStatus() - if status != model.SorterStatusStopped && status != model.SorterStatusFinished { - return false, 0 +func (t *tableInfo) loadCheckpointTs() uint64 { + tableCkpt := atomic.LoadUint64(&t.checkpointTs) + if t.markTableID != 0 { + mTableCkpt := atomic.LoadUint64(&t.mCheckpointTs) + if mTableCkpt < tableCkpt { + return mTableCkpt + } } - return true, t.sorter.GetMaxResolvedTs() + return tableCkpt } // newProcessor creates and returns a processor for the specified change feed @@ -158,7 +147,7 @@ func newProcessor( credential *security.Credential, session *concurrency.Session, changefeed model.ChangeFeedInfo, - sink sink.Sink, + sinkManager *sink.Manager, changefeedID string, captureInfo model.CaptureInfo, checkpointTs uint64, @@ -186,13 +175,9 @@ func newProcessor( return nil, errors.Trace(err) } - sinkEmittedResolvedNotifier := new(notify.Notifier) localResolvedNotifier := new(notify.Notifier) localCheckpointTsNotifier := new(notify.Notifier) - sinkEmittedResolvedReceiver, err := sinkEmittedResolvedNotifier.NewReceiver(50 * time.Millisecond) - if err != nil { - return nil, err - } + globalResolvedTsNotifier := new(notify.Notifier) localResolvedReceiver, err := localResolvedNotifier.NewReceiver(50 * time.Millisecond) if err != nil { return nil, err @@ -212,7 +197,7 @@ func newProcessor( credential: credential, etcdCli: 
cdcEtcdCli, session: session, - sink: sink, + sinkManager: sinkManager, ddlPuller: ddlPuller, mounter: entry.NewMounter(schemaStorage, changefeed.Config.Mounter.WorkerNum, changefeed.Config.EnableOldValue), schemaStorage: schemaStorage, @@ -221,13 +206,10 @@ func newProcessor( flushCheckpointInterval: flushCheckpointInterval, position: &model.TaskPosition{CheckPointTs: checkpointTs}, - output: make(chan *model.PolymorphicEvent, defaultOutputChanSize), - - sinkEmittedResolvedNotifier: sinkEmittedResolvedNotifier, - sinkEmittedResolvedReceiver: sinkEmittedResolvedReceiver, - localResolvedNotifier: localResolvedNotifier, - localResolvedReceiver: localResolvedReceiver, + globalResolvedTsNotifier: globalResolvedTsNotifier, + localResolvedNotifier: localResolvedNotifier, + localResolvedReceiver: localResolvedReceiver, checkpointTs: checkpointTs, localCheckpointTsNotifier: localCheckpointTsNotifier, @@ -266,18 +248,6 @@ func (p *processor) Run(ctx context.Context) { return p.globalStatusWorker(cctx) }) - wg.Go(func() error { - return p.sinkDriver(cctx) - }) - - wg.Go(func() error { - return p.syncResolved(cctx) - }) - - wg.Go(func() error { - return p.collectMetrics(cctx) - }) - wg.Go(func() error { return p.ddlPuller.Run(ddlPullerCtx) }) @@ -413,11 +383,20 @@ func (p *processor) positionWorker(ctx context.Context) error { } } case <-p.localCheckpointTsReceiver.C: - checkpointTs := atomic.LoadUint64(&p.checkpointTs) + checkpointTs := atomic.LoadUint64(&p.globalResolvedTs) + p.stateMu.Lock() + for _, table := range p.tables { + ts := table.loadCheckpointTs() + if ts < checkpointTs { + checkpointTs = ts + } + } + p.stateMu.Unlock() if checkpointTs == 0 { log.Warn("0 is not a valid checkpointTs", util.ZapFieldChangefeed(ctx)) continue } + atomic.StoreUint64(&p.checkpointTs, checkpointTs) phyTs := oracle.ExtractPhysical(checkpointTs) // It is more accurate to get tso from PD, but in most cases we have // deployed NTP service, a little bias is acceptable here. 
@@ -584,9 +563,6 @@ func (p *processor) removeTable(tableID int64) { return } - if atomic.SwapUint32(&table.isDying, 0) == 0 { - return - } table.cancel() delete(p.tables, tableID) if table.markTableID != 0 { @@ -604,6 +580,10 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus) } if opt.Delete { if opt.BoundaryTs <= p.position.CheckPointTs { + if opt.BoundaryTs != p.position.CheckPointTs { + log.Warn("the replication progresses beyond the BoundaryTs and duplicate data may be received by downstream", + zap.Uint64("local resolved TS", p.position.ResolvedTs), zap.Any("opt", opt)) + } table, exist := p.tables[tableID] if !exist { log.Warn("table which will be deleted is not found", @@ -613,19 +593,17 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus) status.Dirty = true continue } - stopped, checkpointTs := table.safeStop() - log.Debug("safeStop table", zap.Int64("tableID", tableID), - util.ZapFieldChangefeed(ctx), zap.Bool("stopped", stopped), + table.cancel() + checkpointTs := table.loadCheckpointTs() + log.Debug("stop table", zap.Int64("tableID", tableID), + util.ZapFieldChangefeed(ctx), + zap.Any("opt", opt), zap.Uint64("checkpointTs", checkpointTs)) - if stopped { - opt.BoundaryTs = checkpointTs - if checkpointTs <= p.position.CheckPointTs { - tablesToRemove = append(tablesToRemove, tableID) - opt.Done = true - opt.Status = model.OperFinished - } - status.Dirty = true - } + opt.BoundaryTs = checkpointTs + tablesToRemove = append(tablesToRemove, tableID) + opt.Done = true + opt.Status = model.OperFinished + status.Dirty = true } } else { replicaInfo, exist := status.Tables[tableID] @@ -677,18 +655,12 @@ func (p *processor) globalStatusWorker(ctx context.Context) error { log.Info("Global status worker started", util.ZapFieldChangefeed(ctx)) var ( - changefeedStatus *model.ChangeFeedStatus - statusRev int64 - lastCheckPointTs uint64 - lastResolvedTs uint64 - watchKey = kv.GetEtcdKeyJob(p.changefeedID) - globalResolvedTsNotifier = new(notify.Notifier) + changefeedStatus *model.ChangeFeedStatus + statusRev int64 + lastCheckPointTs uint64 + lastResolvedTs uint64 + watchKey = kv.GetEtcdKeyJob(p.changefeedID) ) - defer globalResolvedTsNotifier.Close() - globalResolvedTsReceiver, err := globalResolvedTsNotifier.NewReceiver(1 * time.Second) - if err != nil { - return err - } updateStatus := func(changefeedStatus *model.ChangeFeedStatus) { atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs) @@ -709,34 +681,10 @@ func (p *processor) globalStatusWorker(ctx context.Context) error { atomic.StoreUint64(&p.globalResolvedTs, lastResolvedTs) log.Debug("Update globalResolvedTs", zap.Uint64("globalResolvedTs", lastResolvedTs), util.ZapFieldChangefeed(ctx)) - globalResolvedTsNotifier.Notify() + p.globalResolvedTsNotifier.Notify() } } - go func() { - for { - select { - case <-ctx.Done(): - return - case <-globalResolvedTsReceiver.C: - globalResolvedTs := atomic.LoadUint64(&p.globalResolvedTs) - localResolvedTs := atomic.LoadUint64(&p.localResolvedTs) - if globalResolvedTs > localResolvedTs { - log.Warn("globalResolvedTs too large", zap.Uint64("globalResolvedTs", globalResolvedTs), - zap.Uint64("localResolvedTs", localResolvedTs), util.ZapFieldChangefeed(ctx)) - // we do not issue resolved events if globalResolvedTs > localResolvedTs. 
- continue - } - select { - case <-ctx.Done(): - return - case p.output <- model.NewResolvedPolymorphicEvent(0, globalResolvedTs): - // regionID = 0 means the event is produced by TiCDC - } - } - } - }() - retryCfg := backoff.WithMaxRetries( backoff.WithContext( backoff.NewExponentialBackOff(), ctx), @@ -787,153 +735,6 @@ func (p *processor) globalStatusWorker(ctx context.Context) error { } } -func (p *processor) sinkDriver(ctx context.Context) error { - metricFlushDuration := sinkFlushRowChangedDuration.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-p.sinkEmittedResolvedReceiver.C: - sinkEmittedResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs) - globalResolvedTs := atomic.LoadUint64(&p.globalResolvedTs) - var minTs uint64 - if sinkEmittedResolvedTs < globalResolvedTs { - minTs = sinkEmittedResolvedTs - } else { - minTs = globalResolvedTs - } - if minTs == 0 || atomic.LoadUint64(&p.checkpointTs) == minTs { - continue - } - start := time.Now() - - checkpointTs, err := p.sink.FlushRowChangedEvents(ctx, minTs) - if err != nil { - return errors.Trace(err) - } - if checkpointTs != 0 { - atomic.StoreUint64(&p.checkpointTs, checkpointTs) - p.localCheckpointTsNotifier.Notify() - } - - dur := time.Since(start) - metricFlushDuration.Observe(dur.Seconds()) - if dur > 3*time.Second { - log.Warn("flush row changed events too slow", - zap.Duration("duration", dur), util.ZapFieldChangefeed(ctx)) - } - } - } -} - -// syncResolved handle `p.ddlJobsCh` and `p.resolvedTxns` -func (p *processor) syncResolved(ctx context.Context) error { - defer func() { - p.sinkEmittedResolvedReceiver.Stop() - log.Info("syncResolved stopped", util.ZapFieldChangefeed(ctx)) - }() - - events := make([]*model.PolymorphicEvent, 0, defaultSyncResolvedBatch) - rows := make([]*model.RowChangedEvent, 0, defaultSyncResolvedBatch) - - flushRowChangedEvents := func() error { - for _, ev := range events { - err := ev.WaitPrepare(ctx) - if err != nil { - return errors.Trace(err) - } - if ev.Row == nil { - continue - } - rows = append(rows, ev.Row) - } - failpoint.Inject("ProcessorSyncResolvedPreEmit", func() { - log.Info("Prepare to panic for ProcessorSyncResolvedPreEmit") - time.Sleep(10 * time.Second) - panic("ProcessorSyncResolvedPreEmit") - }) - err := p.sink.EmitRowChangedEvents(ctx, rows...) - if err != nil { - return errors.Trace(err) - } - events = events[:0] - rows = rows[:0] - return nil - } - - processRowChangedEvent := func(row *model.PolymorphicEvent) error { - events = append(events, row) - - if len(events) >= defaultSyncResolvedBatch { - err := flushRowChangedEvents() - if err != nil { - return errors.Trace(err) - } - } - return nil - } - - var resolvedTs uint64 - for { - select { - case <-ctx.Done(): - return ctx.Err() - case row := <-p.output: - if row == nil { - continue - } - failpoint.Inject("ProcessorSyncResolvedError", func() { - failpoint.Return(errors.New("processor sync resolved injected error")) - }) - if row.RawKV != nil && row.RawKV.OpType == model.OpTypeResolved { - err := flushRowChangedEvents() - if err != nil { - return errors.Trace(err) - } - resolvedTs = row.CRTs - atomic.StoreUint64(&p.sinkEmittedResolvedTs, row.CRTs) - p.sinkEmittedResolvedNotifier.Notify() - continue - } - // Global resolved ts should fallback in some table rebalance cases, - // since the start-ts(from checkpoint ts) or a rebalanced table could - // be less then the global resolved ts. 
- localResolvedTs := atomic.LoadUint64(&p.localResolvedTs) - if resolvedTs > localResolvedTs { - log.Info("global resolved ts fallback", - zap.String("changefeed", p.changefeedID), - zap.Uint64("localResolvedTs", localResolvedTs), - zap.Uint64("resolvedTs", resolvedTs), - ) - resolvedTs = localResolvedTs - } - if row.CRTs <= resolvedTs { - _ = row.WaitPrepare(ctx) - log.Panic("The CRTs must be greater than the resolvedTs", - zap.String("model", "processor"), - zap.String("changefeed", p.changefeedID), - zap.Uint64("resolvedTs", resolvedTs), - zap.Any("row", row)) - } - err := processRowChangedEvent(row) - if err != nil { - return errors.Trace(err) - } - } - } -} - -func (p *processor) collectMetrics(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(defaultMetricInterval): - tableOutputChanSizeGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Set(float64(len(p.output))) - } - } -} - func createSchemaStorage( kvStorage tidbkv.Storage, checkpointTs uint64, @@ -964,14 +765,9 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo tableName = strconv.Itoa(int(tableID)) } - if table, ok := p.tables[tableID]; ok { - if atomic.SwapUint32(&table.isDying, 0) == 1 { - log.Warn("The same table exists but is dying. Cancel it and continue.", util.ZapFieldChangefeed(ctx), zap.Int64("ID", tableID)) - table.cancel() - } else { - log.Warn("Ignore existing table", util.ZapFieldChangefeed(ctx), zap.Int64("ID", tableID)) - return - } + if _, ok := p.tables[tableID]; ok { + log.Warn("Ignore existing table", util.ZapFieldChangefeed(ctx), zap.Int64("ID", tableID)) + return } globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs) @@ -984,7 +780,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo zap.Uint64("startTs", replicaInfo.StartTs)) } - globalResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs) + globalResolvedTs := atomic.LoadUint64(&p.globalResolvedTs) log.Debug("Add table", zap.Int64("tableID", tableID), util.ZapFieldChangefeed(ctx), zap.String("name", tableName), @@ -997,13 +793,12 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo id: tableID, name: tableName, resolvedTs: replicaInfo.StartTs, - cancel: cancel, } // TODO(leoppro) calculate the workload of this table // We temporarily set the value to constant 1 table.workload = model.WorkloadInfo{Workload: 1} - startPuller := func(tableID model.TableID, pResolvedTs *uint64) *puller.Rectifier { + startPuller := func(tableID model.TableID, pResolvedTs *uint64, pCheckpointTs *uint64) sink.Sink { // start table puller enableOldValue := p.changefeed.Config.EnableOldValue span := regionspan.GetTableSpan(tableID, enableOldValue) @@ -1020,10 +815,10 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo } }() - var sorterImpl puller.EventSorter + var sorter puller.EventSorter switch p.changefeed.Engine { case model.SortInMemory: - sorterImpl = puller.NewEntrySorter() + sorter = puller.NewEntrySorter() case model.SortInFile, model.SortUnified: err := util.IsDirAndWritable(p.changefeed.SortDir) if err != nil { @@ -1040,17 +835,15 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo } if p.changefeed.Engine == model.SortInFile { - sorterImpl = puller.NewFileSorter(p.changefeed.SortDir) + sorter = puller.NewFileSorter(p.changefeed.SortDir) } else { // Unified Sorter - sorterImpl = psorter.NewUnifiedSorter(p.changefeed.SortDir, 
tableName, util.CaptureAddrFromCtx(ctx)) + sorter = psorter.NewUnifiedSorter(p.changefeed.SortDir, tableName, util.CaptureAddrFromCtx(ctx)) } default: p.errCh <- cerror.ErrUnknownSortEngine.GenWithStackByArgs(p.changefeed.Engine) return nil } - sorter := puller.NewRectifier(sorterImpl, p.changefeed.GetTargetTs()) - go func() { err := sorter.Run(ctx) if errors.Cause(err) != context.Canceled { @@ -1062,13 +855,13 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo p.pullerConsume(ctx, plr, sorter) }() + tableSink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs) go func() { - p.sorterConsume(ctx, tableID, tableName, sorter, pResolvedTs, replicaInfo) + p.sorterConsume(ctx, tableID, tableName, sorter, pResolvedTs, pCheckpointTs, replicaInfo, tableSink) }() - - return sorter + return tableSink } - + var tableSink, mTableSink sink.Sink if p.changefeed.Config.Cyclic.IsEnabled() && replicaInfo.MarkTableID != 0 { mTableID := replicaInfo.MarkTableID // we should to make sure a mark table is only listened once. @@ -1077,7 +870,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo table.markTableID = mTableID table.mResolvedTs = replicaInfo.StartTs - startPuller(mTableID, &table.mResolvedTs) + mTableSink = startPuller(mTableID, &table.mResolvedTs, &table.mCheckpointTs) } } @@ -1090,8 +883,14 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo } atomic.StoreUint64(&p.localResolvedTs, p.position.ResolvedTs) - table.sorter = startPuller(tableID, &table.resolvedTs) - + tableSink = startPuller(tableID, &table.resolvedTs, &table.checkpointTs) + table.cancel = func() { + cancel() + tableSink.Close() + if mTableSink != nil { + mTableSink.Close() + } + } syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Inc() } @@ -1101,9 +900,11 @@ func (p *processor) sorterConsume( ctx context.Context, tableID int64, tableName string, - sorter *puller.Rectifier, + sorter puller.EventSorter, pResolvedTs *uint64, + pCheckpointTs *uint64, replicaInfo *model.TableReplicaInfo, + sink sink.Sink, ) { var lastResolvedTs uint64 opDone := false @@ -1138,6 +939,55 @@ func (p *processor) sorterConsume( } } + events := make([]*model.PolymorphicEvent, 0, defaultSyncResolvedBatch) + rows := make([]*model.RowChangedEvent, 0, defaultSyncResolvedBatch) + + flushRowChangedEvents := func() error { + for _, ev := range events { + err := ev.WaitPrepare(ctx) + if err != nil { + return errors.Trace(err) + } + if ev.Row == nil { + continue + } + rows = append(rows, ev.Row) + } + failpoint.Inject("ProcessorSyncResolvedPreEmit", func() { + log.Info("Prepare to panic for ProcessorSyncResolvedPreEmit") + time.Sleep(10 * time.Second) + panic("ProcessorSyncResolvedPreEmit") + }) + err := sink.EmitRowChangedEvents(ctx, rows...) 
+ if err != nil { + return errors.Trace(err) + } + events = events[:0] + rows = rows[:0] + return nil + } + + processRowChangedEvent := func(row *model.PolymorphicEvent) error { + events = append(events, row) + + if len(events) >= defaultSyncResolvedBatch { + err := flushRowChangedEvents() + if err != nil { + return errors.Trace(err) + } + } + return nil + } + + globalResolvedTsReceiver, err := p.globalResolvedTsNotifier.NewReceiver(1 * time.Second) + if err != nil { + if errors.Cause(err) != context.Canceled { + p.errCh <- errors.Trace(err) + } + return + } + defer globalResolvedTsReceiver.Stop() + for { select { case <-ctx.Done(): @@ -1161,6 +1011,16 @@ func (p *processor) sorterConsume( } if pEvent.RawKV != nil && pEvent.RawKV.OpType == model.OpTypeResolved { + if pEvent.CRTs == 0 { + continue + } + err := flushRowChangedEvents() + if err != nil { + if errors.Cause(err) != context.Canceled { + p.errCh <- errors.Trace(err) + } + return + } atomic.StoreUint64(pResolvedTs, pEvent.CRTs) lastResolvedTs = pEvent.CRTs p.localResolvedNotifier.Notify() @@ -1170,24 +1030,51 @@ func (p *processor) sorterConsume( } continue } - sinkResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs) if pEvent.CRTs <= lastResolvedTs || pEvent.CRTs < replicaInfo.StartTs { log.Panic("The CRTs of event is not expected, please report a bug", util.ZapFieldChangefeed(ctx), zap.String("model", "sorter"), - zap.Uint64("globalResolvedTs", sinkResolvedTs), zap.Uint64("resolvedTs", lastResolvedTs), zap.Int64("tableID", tableID), zap.Any("replicaInfo", replicaInfo), zap.Any("row", pEvent)) } - select { - case <-ctx.Done(): - if errors.Cause(ctx.Err()) != context.Canceled { - p.errCh <- ctx.Err() + failpoint.Inject("ProcessorSyncResolvedError", func() { + p.errCh <- errors.New("processor sync resolved injected error") + failpoint.Return() + }) + err := processRowChangedEvent(pEvent) + if err != nil { + if errors.Cause(err) != context.Canceled { + p.errCh <- errors.Trace(err) } return - case p.output <- pEvent: + } + case <-globalResolvedTsReceiver.C: + localResolvedTs := atomic.LoadUint64(&p.localResolvedTs) + globalResolvedTs := atomic.LoadUint64(&p.globalResolvedTs) + var minTs uint64 + if localResolvedTs < globalResolvedTs { + minTs = localResolvedTs + log.Warn("the local resolved ts is less than the global resolved ts", + zap.Uint64("localResolvedTs", localResolvedTs), zap.Uint64("globalResolvedTs", globalResolvedTs)) + } else { + minTs = globalResolvedTs + } + if minTs == 0 || atomic.LoadUint64(&p.checkpointTs) == minTs { + continue + } + + checkpointTs, err := sink.FlushRowChangedEvents(ctx, minTs) + if err != nil { + if errors.Cause(err) != context.Canceled { + p.errCh <- errors.Trace(err) + } + return + } + if checkpointTs != 0 { + atomic.StoreUint64(pCheckpointTs, checkpointTs) + p.localCheckpointTsNotifier.Notify() } case <-checkDoneTicker.C: if !opDone { @@ -1202,7 +1089,7 @@ func (p *processor) sorterConsume( func (p *processor) pullerConsume( ctx context.Context, plr puller.Puller, - sorter *puller.Rectifier, + sorter puller.EventSorter, ) { for { select { @@ -1230,6 +1117,9 @@ func (p *processor) stop(ctx context.Context) error { p.ddlPullerCancel() // mark tables share the same context with its original table, don't need to cancel p.stateMu.Unlock() + p.globalResolvedTsNotifier.Close() + p.localCheckpointTsNotifier.Close() + p.localResolvedNotifier.Close() failpoint.Inject("processorStopDelay", nil) atomic.StoreInt32(&p.stopped, 1) if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, 
p.captureInfo.ID); err != nil { @@ -1241,7 +1131,7 @@ func (p *processor) stop(ctx context.Context) error { if err := p.etcdCli.DeleteTaskWorkload(ctx, p.changefeedID, p.captureInfo.ID); err != nil { return err } - return p.sink.Close() + return p.sinkManager.Close() } func (p *processor) isStopped() bool { @@ -1274,13 +1164,14 @@ func runProcessor( return nil, errors.Trace(err) } ctx, cancel := context.WithCancel(ctx) - errCh := make(chan error, 1) - sink, err := sink.NewSink(ctx, changefeedID, info.SinkURI, filter, info.Config, opts, errCh) + errCh := make(chan error, 16) + s, err := sink.NewSink(ctx, changefeedID, info.SinkURI, filter, info.Config, opts, errCh) if err != nil { cancel() return nil, errors.Trace(err) } - processor, err := newProcessor(ctx, pdCli, credential, session, info, sink, + sinkManager := sink.NewManager(ctx, s, errCh, checkpointTs) + processor, err := newProcessor(ctx, pdCli, credential, session, info, sinkManager, changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval) if err != nil { cancel() @@ -1292,15 +1183,33 @@ func runProcessor( processor.Run(ctx) go func() { + var errs []error + appendError := func(err error) { + log.Debug("processor received error", zap.Error(err)) + cause := errors.Cause(err) + if cause != nil && cause != context.Canceled && cerror.ErrAdminStopProcessor.NotEqual(cause) { + errs = append(errs, err) + } + } err := <-errCh - cause := errors.Cause(err) - if cause != nil && cause != context.Canceled && cerror.ErrAdminStopProcessor.NotEqual(cause) { - processorErrorCounter.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr).Inc() + appendError(err) + // sleep 500ms to wait all the errors are sent to errCh + time.Sleep(500 * time.Millisecond) + ReceiveErr: + for { + select { + case err := <-errCh: + appendError(err) + default: + break ReceiveErr + } + } + if len(errs) > 0 { log.Error("error on running processor", util.ZapFieldCapture(ctx), zap.String("changefeed", changefeedID), zap.String("processor", processor.id), - zap.Error(err)) + zap.Errors("errors", errs)) // record error information in etcd var code string if terror, ok := err.(*errors.Error); ok { diff --git a/cdc/puller/rectifier.go b/cdc/puller/rectifier.go deleted file mode 100644 index efbfc849492..00000000000 --- a/cdc/puller/rectifier.go +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package puller - -import ( - "context" - "sync/atomic" - - "go.uber.org/zap" - - "github.com/pingcap/log" - "github.com/pingcap/ticdc/cdc/model" - "golang.org/x/sync/errgroup" -) - -// Rectifier filters and collates the output stream from the sorter -type Rectifier struct { - EventSorter - - status model.SorterStatus - maxSentResolvedTs model.Ts - targetTs model.Ts - - outputCh chan *model.PolymorphicEvent -} - -// NewRectifier creates a new Rectifier -func NewRectifier(s EventSorter, targetTs model.Ts) *Rectifier { - return &Rectifier{ - EventSorter: s, - targetTs: targetTs, - outputCh: make(chan *model.PolymorphicEvent), - } -} - -// GetStatus returns the state of the Rectifier -func (r *Rectifier) GetStatus() model.SorterStatus { - return atomic.LoadInt32(&r.status) -} - -// GetMaxResolvedTs returns the maximum resolved ts sent from the Rectifier -func (r *Rectifier) GetMaxResolvedTs() model.Ts { - return atomic.LoadUint64(&r.maxSentResolvedTs) -} - -// SafeStop stops the Rectifier and Sorter safety -func (r *Rectifier) SafeStop() { - atomic.CompareAndSwapInt32(&r.status, - model.SorterStatusWorking, - model.SorterStatusStopping) -} - -// Run running the Rectifier -func (r *Rectifier) Run(ctx context.Context) error { - output := func(event *model.PolymorphicEvent) { - select { - case <-ctx.Done(): - log.Warn("failed to send to output channel", zap.Error(ctx.Err())) - case r.outputCh <- event: - } - } - errg, ctx := errgroup.WithContext(ctx) - errg.Go(func() error { - return r.EventSorter.Run(ctx) - }) - errg.Go(func() error { - defer close(r.outputCh) - for { - select { - case <-ctx.Done(): - return ctx.Err() - case event := <-r.EventSorter.Output(): - if event == nil { - return nil - } - if event.CRTs > r.targetTs { - output(model.NewResolvedPolymorphicEvent(event.RegionID(), r.targetTs)) - atomic.StoreUint64(&r.maxSentResolvedTs, r.targetTs) - atomic.StoreInt32(&r.status, model.SorterStatusFinished) - return nil - } - output(event) - if event.RawKV.OpType == model.OpTypeResolved { - atomic.StoreUint64(&r.maxSentResolvedTs, event.CRTs) - switch atomic.LoadInt32(&r.status) { - case model.SorterStatusStopping: - atomic.StoreInt32(&r.status, model.SorterStatusStopped) - return nil - case model.SorterStatusStopped: - return nil - case model.SorterStatusWorking: - } - } - } - } - }) - return errg.Wait() -} - -// Output returns the output streams -func (r *Rectifier) Output() <-chan *model.PolymorphicEvent { - return r.outputCh -} diff --git a/cdc/puller/rectifier_test.go b/cdc/puller/rectifier_test.go deleted file mode 100644 index 91f41e7a161..00000000000 --- a/cdc/puller/rectifier_test.go +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package puller - -import ( - "context" - "math" - "sync/atomic" - "time" - - "github.com/pingcap/check" - "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/pkg/util/testleak" - "golang.org/x/sync/errgroup" -) - -type rectifierSuite struct{} - -var _ = check.Suite(&rectifierSuite{}) - -type mockSorter struct { - outputCh chan *model.PolymorphicEvent -} - -func newMockSorter() *mockSorter { - return &mockSorter{ - outputCh: make(chan *model.PolymorphicEvent, 16), - } -} - -func (m *mockSorter) Run(ctx context.Context) error { - <-ctx.Done() - return nil -} - -func (m *mockSorter) AddEntry(ctx context.Context, entry *model.PolymorphicEvent) { - select { - case <-ctx.Done(): - return - case m.outputCh <- entry: - } -} - -func (m *mockSorter) Output() <-chan *model.PolymorphicEvent { - return m.outputCh -} - -func waitEntriesReceived(ctx context.Context, currentNum *int32, expectedNum int32) { - for { - select { - case <-ctx.Done(): - return - default: - num := atomic.LoadInt32(currentNum) - if num >= expectedNum { - return - } - } - time.Sleep(50 * time.Millisecond) - } -} - -func (s *rectifierSuite) TestRectifierSafeStop(c *check.C) { - defer testleak.AfterTest(c)() - mockSorter := newMockSorter() - r := NewRectifier(mockSorter, math.MaxUint64) - ctx, cancel := context.WithCancel(context.Background()) - errg, ctx := errgroup.WithContext(ctx) - errg.Go(func() error { - return r.Run(ctx) - }) - entriesReceivedNum := int32(0) - errg.Go(func() error { - expected := []*model.PolymorphicEvent{ - {CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 3, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 4, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 5, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 6, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 8, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 9, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - } - i := int32(0) - for e := range r.Output() { - c.Assert(e, check.DeepEquals, expected[i]) - i++ - atomic.StoreInt32(&entriesReceivedNum, i) - } - - return nil - }) - entries := []*model.PolymorphicEvent{ - {CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 3, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 4, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 5, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 6, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - } - for _, e := range entries { - mockSorter.AddEntry(ctx, e) - } - waitEntriesReceived(ctx, &entriesReceivedNum, 6) - c.Assert(r.GetStatus(), check.Equals, model.SorterStatusWorking) - c.Assert(r.GetMaxResolvedTs(), check.Equals, model.Ts(5)) - r.SafeStop() - c.Assert(r.GetStatus(), check.Equals, model.SorterStatusStopping) - c.Assert(r.GetMaxResolvedTs(), check.Equals, model.Ts(5)) - - entries = []*model.PolymorphicEvent{ - {CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 8, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 9, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 10, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 11, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - } - for _, e := range entries { - mockSorter.AddEntry(ctx, e) - } - 
waitEntriesReceived(ctx, &entriesReceivedNum, 9) - c.Assert(r.GetStatus(), check.Equals, model.SorterStatusStopped) - c.Assert(r.GetMaxResolvedTs(), check.Equals, model.Ts(9)) - r.SafeStop() - c.Assert(r.GetStatus(), check.Equals, model.SorterStatusStopped) - c.Assert(r.GetMaxResolvedTs(), check.Equals, model.Ts(9)) - cancel() - c.Assert(errg.Wait(), check.IsNil) -} - -func (s *rectifierSuite) TestRectifierFinished(c *check.C) { - defer testleak.AfterTest(c)() - mockSorter := newMockSorter() - r := NewRectifier(mockSorter, 7) - ctx, cancel := context.WithCancel(context.Background()) - errg, ctx := errgroup.WithContext(ctx) - errg.Go(func() error { - return r.Run(ctx) - }) - entriesReceivedNum := int32(0) - errg.Go(func() error { - expected := []*model.PolymorphicEvent{ - {CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 3, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 4, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 5, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 6, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved, CRTs: 7}}, - } - i := int32(0) - for e := range r.Output() { - c.Assert(e, check.DeepEquals, expected[i]) - i++ - atomic.StoreInt32(&entriesReceivedNum, i) - } - - return nil - }) - entries := []*model.PolymorphicEvent{ - {CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 3, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 4, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 5, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 6, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 8, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 9, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - {CRTs: 10, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}, - {CRTs: 11, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}, - } - for _, e := range entries { - mockSorter.AddEntry(ctx, e) - } - waitEntriesReceived(ctx, &entriesReceivedNum, 8) - c.Assert(r.GetStatus(), check.Equals, model.SorterStatusFinished) - c.Assert(r.GetMaxResolvedTs(), check.Equals, model.Ts(7)) - r.SafeStop() - c.Assert(r.GetStatus(), check.Equals, model.SorterStatusFinished) - c.Assert(r.GetMaxResolvedTs(), check.Equals, model.Ts(7)) - - cancel() - c.Assert(errg.Wait(), check.IsNil) -} diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go new file mode 100644 index 00000000000..ccca7fc65b0 --- /dev/null +++ b/cdc/sink/manager.go @@ -0,0 +1,268 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package sink
+
+import (
+	"context"
+	"math"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/pingcap/ticdc/pkg/util"
+
+	"github.com/pingcap/log"
+	"go.uber.org/zap"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/ticdc/cdc/model"
+)
+
+const (
+	// TODO: buffer chan size, the accumulated data is determined by
+	// the count of sorted data and unmounted data. In current benchmark a single
+	// processor can reach 50k-100k QPS, and accumulated data is around
+	// 200k-400k in most cases. We need a better chan cache mechanism.
+	defaultBufferChanSize = 1280000
+	defaultMetricInterval = time.Second * 15
+)
+
+// Manager manages table sinks and maintains the relationship between table sinks and the backendSink
+type Manager struct {
+	backendSink  Sink
+	checkpointTs model.Ts
+	tableSinks   map[model.TableID]*tableSink
+	tableSinksMu sync.Mutex
+}
+
+// NewManager creates a new Sink manager
+func NewManager(ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts) *Manager {
+	return &Manager{
+		backendSink:  newBufferSink(ctx, backendSink, errCh, checkpointTs),
+		checkpointTs: checkpointTs,
+		tableSinks:   make(map[model.TableID]*tableSink),
+	}
+}
+
+// CreateTableSink creates a table sink
+func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) Sink {
+	if _, exist := m.tableSinks[tableID]; exist {
+		log.Panic("the table sink already exists", zap.Uint64("tableID", uint64(tableID)))
+	}
+	sink := &tableSink{
+		tableID:   tableID,
+		manager:   m,
+		buffer:    make([]*model.RowChangedEvent, 0, 128),
+		emittedTs: checkpointTs,
+	}
+	m.tableSinksMu.Lock()
+	defer m.tableSinksMu.Unlock()
+	m.tableSinks[tableID] = sink
+	return sink
+}
+
+// Close closes the Sink manager and backend Sink
+func (m *Manager) Close() error {
+	return m.backendSink.Close()
+}
+
+func (m *Manager) getMinEmittedTs() model.Ts {
+	m.tableSinksMu.Lock()
+	defer m.tableSinksMu.Unlock()
+	if len(m.tableSinks) == 0 {
+		return m.getCheckpointTs()
+	}
+	minTs := model.Ts(math.MaxUint64)
+	for _, tableSink := range m.tableSinks {
+		emittedTs := tableSink.getEmittedTs()
+		if minTs > emittedTs {
+			minTs = emittedTs
+		}
+	}
+	return minTs
+}
+
+func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) {
+	minEmittedTs := m.getMinEmittedTs()
+	checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, minEmittedTs)
+	if err != nil {
+		return m.getCheckpointTs(), errors.Trace(err)
+	}
+	atomic.StoreUint64(&m.checkpointTs, checkpointTs)
+	return checkpointTs, nil
+}
+
+func (m *Manager) destroyTableSink(tableID model.TableID) {
+	m.tableSinksMu.Lock()
+	defer m.tableSinksMu.Unlock()
+	delete(m.tableSinks, tableID)
+}
+
+func (m *Manager) getCheckpointTs() uint64 {
+	return atomic.LoadUint64(&m.checkpointTs)
+}
+
+type tableSink struct {
+	tableID model.TableID
+	manager *Manager
+	buffer  []*model.RowChangedEvent
+	// emittedTs means that all events whose commitTs is less than or equal to emittedTs have been sent to the backendSink
+	emittedTs model.Ts
+}
+
+func (t *tableSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
+	// do nothing
+	return nil
+}
+
+func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
+	t.buffer = append(t.buffer, rows...)
+ return nil +} + +func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + // the table sink doesn't receive the DDL event + return nil +} + +func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { + i := sort.Search(len(t.buffer), func(i int) bool { + return t.buffer[i].CommitTs > resolvedTs + }) + if i == 0 { + atomic.StoreUint64(&t.emittedTs, resolvedTs) + return t.manager.flushBackendSink(ctx) + } + resolvedRows := t.buffer[:i] + t.buffer = t.buffer[i:] + + err := t.manager.backendSink.EmitRowChangedEvents(ctx, resolvedRows...) + if err != nil { + return t.manager.getCheckpointTs(), errors.Trace(err) + } + atomic.StoreUint64(&t.emittedTs, resolvedTs) + return t.manager.flushBackendSink(ctx) +} + +func (t *tableSink) getEmittedTs() uint64 { + return atomic.LoadUint64(&t.emittedTs) +} + +func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { + // the table sink doesn't receive the checkpoint event + return nil +} + +func (t *tableSink) Close() error { + t.manager.destroyTableSink(t.tableID) + return nil +} + +type bufferSink struct { + Sink + buffer chan struct { + rows []*model.RowChangedEvent + resolvedTs model.Ts + } + checkpointTs uint64 +} + +func newBufferSink(ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts) Sink { + sink := &bufferSink{ + Sink: backendSink, + buffer: make(chan struct { + rows []*model.RowChangedEvent + resolvedTs model.Ts + }, defaultBufferChanSize), + checkpointTs: checkpointTs, + } + go sink.run(ctx, errCh) + return sink +} + +func (b *bufferSink) run(ctx context.Context, errCh chan error) { + changefeedID := util.ChangefeedIDFromCtx(ctx) + advertiseAddr := util.CaptureAddrFromCtx(ctx) + metricFlushDuration := flushRowChangedDuration.WithLabelValues(advertiseAddr, changefeedID, "Flush") + metricEmitRowDuration := flushRowChangedDuration.WithLabelValues(advertiseAddr, changefeedID, "EmitRow") + metricBufferSize := bufferChanSizeGauge.WithLabelValues(advertiseAddr, changefeedID) + for { + select { + case <-ctx.Done(): + err := ctx.Err() + if err != nil && errors.Cause(err) != context.Canceled { + errCh <- err + } + return + case e := <-b.buffer: + if e.rows == nil { + // A resolved event received + start := time.Now() + checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, e.resolvedTs) + if err != nil { + if errors.Cause(err) != context.Canceled { + errCh <- err + } + return + } + atomic.StoreUint64(&b.checkpointTs, checkpointTs) + + dur := time.Since(start) + metricFlushDuration.Observe(dur.Seconds()) + if dur > 3*time.Second { + log.Warn("flush row changed events too slow", + zap.Duration("duration", dur), util.ZapFieldChangefeed(ctx)) + } + continue + } + start := time.Now() + err := b.Sink.EmitRowChangedEvents(ctx, e.rows...) 
+ if err != nil { + if errors.Cause(err) != context.Canceled { + errCh <- err + } + return + } + dur := time.Since(start) + metricEmitRowDuration.Observe(dur.Seconds()) + case <-time.After(defaultMetricInterval): + metricBufferSize.Set(float64(len(b.buffer))) + } + } +} + +func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + select { + case <-ctx.Done(): + return ctx.Err() + case b.buffer <- struct { + rows []*model.RowChangedEvent + resolvedTs model.Ts + }{rows: rows}: + } + return nil +} + +func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { + select { + case <-ctx.Done(): + return atomic.LoadUint64(&b.checkpointTs), ctx.Err() + case b.buffer <- struct { + rows []*model.RowChangedEvent + resolvedTs model.Ts + }{resolvedTs: resolvedTs, rows: nil}: + } + return atomic.LoadUint64(&b.checkpointTs), nil +} diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go new file mode 100644 index 00000000000..c307ac7c93f --- /dev/null +++ b/cdc/sink/manager_test.go @@ -0,0 +1,263 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "math" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + + "github.com/pingcap/check" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/util/testleak" + "go.uber.org/zap" +) + +type managerSuite struct{} + +var _ = check.Suite(&managerSuite{}) + +type checkSink struct { + *check.C + rows []*model.RowChangedEvent + rowsMu sync.Mutex + lastResolvedTs uint64 +} + +func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { + panic("unreachable") +} + +func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + c.rowsMu.Lock() + defer c.rowsMu.Unlock() + for _, row := range rows { + log.Info("rows in check sink", zap.Reflect("row", row)) + } + c.rows = append(c.rows, rows...) 
+ return nil +} + +func (c *checkSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + panic("unreachable") +} + +func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { + c.rowsMu.Lock() + defer c.rowsMu.Unlock() + log.Info("flush in check sink", zap.Uint64("resolved", resolvedTs)) + var newRows []*model.RowChangedEvent + for _, row := range c.rows { + c.Assert(row.CommitTs, check.Greater, c.lastResolvedTs) + if row.CommitTs > resolvedTs { + newRows = append(newRows, row) + } + } + + c.Assert(c.lastResolvedTs, check.LessEqual, resolvedTs) + c.lastResolvedTs = resolvedTs + c.rows = newRows + + return c.lastResolvedTs, nil +} + +func (c *checkSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { + panic("unreachable") +} + +func (c *checkSink) Close() error { + return nil +} + +func (s *managerSuite) TestManagerRandom(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errCh := make(chan error, 16) + manager := NewManager(ctx, &checkSink{C: c}, errCh, 0) + defer manager.Close() + goroutineNum := 10 + rowNum := 100 + var wg sync.WaitGroup + tableSinks := make([]Sink, goroutineNum) + for i := 0; i < goroutineNum; i++ { + tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0) + } + for i := 0; i < goroutineNum; i++ { + i := i + tableSink := tableSinks[i] + wg.Add(1) + go func() { + defer wg.Done() + ctx := context.Background() + var lastResolvedTs uint64 + for j := 1; j < rowNum; j++ { + if rand.Intn(10) == 0 { + resolvedTs := lastResolvedTs + uint64(rand.Intn(j-int(lastResolvedTs))) + _, err := tableSink.FlushRowChangedEvents(ctx, resolvedTs) + c.Assert(err, check.IsNil) + lastResolvedTs = resolvedTs + } else { + err := tableSink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ + Table: &model.TableName{TableID: int64(i)}, + CommitTs: uint64(j), + }) + c.Assert(err, check.IsNil) + } + } + _, err := tableSink.FlushRowChangedEvents(ctx, uint64(rowNum)) + c.Assert(err, check.IsNil) + }() + } + wg.Wait() + cancel() + time.Sleep(1 * time.Second) + close(errCh) + for err := range errCh { + c.Assert(err, check.IsNil) + } +} + +func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errCh := make(chan error, 16) + manager := NewManager(ctx, &checkSink{C: c}, errCh, 0) + defer manager.Close() + goroutineNum := 10 + var wg sync.WaitGroup + const ExitSignal = uint64(math.MaxUint64) + + var maxResolvedTs uint64 + tableSinks := make([]Sink, 0, goroutineNum) + closeChs := make([]chan struct{}, 0, goroutineNum) + runTableSink := func(index int64, sink Sink, startTs uint64, close chan struct{}) { + defer wg.Done() + ctx := context.Background() + lastResolvedTs := startTs + for { + select { + case <-close: + return + default: + } + resolvedTs := atomic.LoadUint64(&maxResolvedTs) + if resolvedTs == ExitSignal { + return + } + if resolvedTs == lastResolvedTs { + time.Sleep(10 * time.Millisecond) + continue + } + for i := lastResolvedTs + 1; i <= resolvedTs; i++ { + err := sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ + Table: &model.TableName{TableID: index}, + CommitTs: i, + }) + c.Assert(err, check.IsNil) + } + _, err := sink.FlushRowChangedEvents(ctx, resolvedTs) + c.Assert(err, check.IsNil) + lastResolvedTs = resolvedTs + } + } + + wg.Add(1) + go func() { + defer wg.Done() + // add three table and then remote one table + for i := 0; i < 
10; i++ { + if i%4 != 3 { + // add table + table := manager.CreateTableSink(model.TableID(i), maxResolvedTs+1) + close := make(chan struct{}) + tableSinks = append(tableSinks, table) + closeChs = append(closeChs, close) + atomic.AddUint64(&maxResolvedTs, 20) + wg.Add(1) + go runTableSink(int64(i), table, maxResolvedTs, close) + } else { + // remove table + table := tableSinks[0] + close(closeChs[0]) + c.Assert(table.Close(), check.IsNil) + tableSinks = tableSinks[1:] + closeChs = closeChs[1:] + } + time.Sleep(100 * time.Millisecond) + } + atomic.StoreUint64(&maxResolvedTs, ExitSignal) + }() + + wg.Wait() + cancel() + time.Sleep(1 * time.Second) + close(errCh) + for err := range errCh { + c.Assert(err, check.IsNil) + } +} + +type errorSink struct { + *check.C +} + +func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { + panic("unreachable") +} + +func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + return errors.New("error in emit row changed events") +} + +func (e *errorSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + panic("unreachable") +} + +func (e *errorSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { + return 0, errors.New("error in flush row changed events") +} + +func (e *errorSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { + panic("unreachable") +} + +func (e *errorSink) Close() error { + return nil +} + +func (s *managerSuite) TestManagerError(c *check.C) { + defer testleak.AfterTest(c)() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errCh := make(chan error, 16) + manager := NewManager(ctx, &errorSink{C: c}, errCh, 0) + defer manager.Close() + sink := manager.CreateTableSink(1, 0) + err := sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ + CommitTs: 1, + }) + c.Assert(err, check.IsNil) + _, err = sink.FlushRowChangedEvents(ctx, 2) + c.Assert(err, check.IsNil) + err = <-errCh + c.Assert(err.Error(), check.Equals, "error in emit row changed events") +} diff --git a/cdc/sink/metrics.go b/cdc/sink/metrics.go index 7aa91469e10..77e5f0a21fa 100644 --- a/cdc/sink/metrics.go +++ b/cdc/sink/metrics.go @@ -70,6 +70,21 @@ var ( Name: "total_flushed_rows_count", Help: "totla count of flushed rows", }, []string{"capture", "changefeed"}) + flushRowChangedDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "flush_event_duration_seconds", + Help: "Bucketed histogram of processing time (s) of flushing events in processor", + Buckets: prometheus.ExponentialBuckets(0.002 /* 2ms */, 2, 20), + }, []string{"capture", "changefeed", "type"}) + bufferChanSizeGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "buffer_chan_size", + Help: "size of row changed event buffer channel in sink manager", + }, []string{"capture", "changefeed"}) ) // InitMetrics registers all metrics in this file @@ -81,4 +96,6 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(bucketSizeCounter) registry.MustRegister(totalRowsCountGauge) registry.MustRegister(totalFlushedRowsCountGauge) + registry.MustRegister(flushRowChangedDuration) + registry.MustRegister(bufferChanSizeGauge) } diff --git a/docs/data-flow.dot b/docs/data-flow.dot index f2afa7bcc22..c7f94e07815 100644 --- a/docs/data-flow.dot +++ b/docs/data-flow.dot @@ -104,10 +104,9 @@ digraph Dataflow { processor_sorter_consume 
[label = "sorterConsume"] processor_puller_consume [label = "pullerConsume"] - processor_sync_resolved [label = "syncResolved"] - processor_sorter_consume -> processor_sync_resolved [label = "P.output"] } + processor_sorter_consume -> table_sink_run [label = "P.output"] subgraph cluster_puller { label = "Puller"; @@ -132,18 +131,11 @@ digraph Dataflow { sorter_run [label = "Run"] } - sorter_run -> rectifier_run [label = "E.Output"] - - subgraph cluster_rectifier { - label = "Rectifier"; - - rectifier_run [label = "Run"] - } } - rectifier_run -> processor_sorter_consume [label = "R.Ouput"] + sorter_run -> processor_sorter_consume [label = "S.Ouput"] processor_sorter_consume -> mounter_run [label = "M.Input"] - mounter_run -> processor_sync_resolved [ + mounter_run -> table_sink_run [ label = "wait unmarshal", style = "dashed", dir = back, @@ -156,13 +148,30 @@ digraph Dataflow { mounter_run [label = "Run"] } - processor_sync_resolved -> sink_run [label = "S.EmitEvents"] - subgraph cluster_sink { - label = "Sink"; + label = "Sink Manager"; style = filled; - sink_run [label = "FlushEvents"] + subgraph table_sink { + label = "Table Sink (N)"; + + table_sink_run [label = "Table Sink (N)\nFlushEvents"] + } + + subgraph buffer_sink { + label = "Buffer Sink (1)"; + + buffer_sink_run [label = "Buffer Sink (1)\nFlushEvents"] + } + + subgraph backend_sink { + label = "Backend Sink (1)"; + + backend_sink_run [label = "Backend Sink (1)\nFlushEvents"] + } + + table_sink_run -> buffer_sink_run [label = "S.EmitEvents"] + buffer_sink_run -> backend_sink_run [label = "S.EmitEvents"] } } @@ -170,5 +179,5 @@ digraph Dataflow { data_in -> Raftstore data_out [label = "Data out", shape = oval] - sink_run -> data_out + backend_sink_run -> data_out } diff --git a/docs/data-flow.svg b/docs/data-flow.svg index 5cb424b7bbd..c4684d16ff0 100644 --- a/docs/data-flow.svg +++ b/docs/data-flow.svg @@ -1,246 +1,289 @@ - - - + + Dataflow - -cluster_legends - -Legend - -cluster_tikv - -TiKV - -cluster_ticdc - -TiCDC - -cluster_kvclient - -KV client - -cluster_processor - -Processor - -cluster_puller - -Puller - -cluster_sorter - -Sorter - -cluster_entry_sorter - -Entry sorter - -cluster_rectifier - -Rectifier - -cluster_mounter - -Mounter - -cluster_sink - -Sink + + +cluster_legends + +Legend + + +cluster_tikv + +TiKV + + +cluster_ticdc + +TiCDC + + +cluster_kvclient + +KV client + + +cluster_processor + +Processor + + +cluster_puller + +Puller + + +cluster_sorter + +Sorter + + +cluster_entry_sorter + +Entry sorter + + +cluster_mounter + +Mounter + + +cluster_sink + +Sink Manager -flow - -Go routine #1 + +flow + +Go routine #1 -flow_ - -Go routine #2 + +flow_ + +Go routine #2 -flow->flow_ - - -Flow -direction -Channel -/Buffer + +flow->flow_ + + +Flow +direction +Channel +/Buffer -wait - -Go routine #1 + +wait + +Go routine #1 -wait_ - -Go routine #2 + +wait_ + +Go routine #2 -wait->wait_ - - -#1 wiats #2 + +wait->wait_ + + +#1 wiats #2 -Raftstore - -Raftstore + +Raftstore + +Raftstore -CDC - -CDC + +CDC + +CDC -Raftstore->CDC - - -channel + +Raftstore->CDC + + +channel -kv_client - -kv_client + +kv_client + +kv_client -CDC->kv_client - - -gRPC + +CDC->kv_client + + +gRPC -puller_run_step1 - -Run #1 -(Add to memory buffer) + +puller_run_step1 + +Run #1 +(Add to memory buffer) -kv_client->puller_run_step1 - - -eventCh + +kv_client->puller_run_step1 + + +eventCh -puller_run_step2 - -Run #2 -(Output to output channel) + +puller_run_step2 + +Run #2 +(Output to output channel) -puller_run_step1->puller_run_step2 - - -P.buffer + 
+puller_run_step1->puller_run_step2 + + +P.buffer -processor_sorter_consume - -sorterConsume - - -processor_sync_resolved - -syncResolved - - -processor_sorter_consume->processor_sync_resolved - - -P.output + +processor_sorter_consume + +sorterConsume + + + +table_sink_run + +Table Sink (N) +FlushEvents + + + +processor_sorter_consume->table_sink_run + + +P.output -mounter_run - -Run + +mounter_run + +Run -processor_sorter_consume->mounter_run - - -M.Input + +processor_sorter_consume->mounter_run + + +M.Input -processor_puller_consume - -pullerConsume + +processor_puller_consume + +pullerConsume -sorter_run - -Run + +sorter_run + +Run -processor_puller_consume->sorter_run - - -E.AddEntry - - -sink_run - -FlushEvents - - -processor_sync_resolved->sink_run - - -S.EmitEvents + +processor_puller_consume->sorter_run + + +E.AddEntry + + + +buffer_sink_run + +Buffer Sink (1) +FlushEvents + + + +table_sink_run->buffer_sink_run + + +S.EmitEvents -puller_run_step2->processor_puller_consume - - -P.Output - - -rectifier_run - -Run - - -sorter_run->rectifier_run - - -E.Output - - -rectifier_run->processor_sorter_consume - - -R.Ouput - - -mounter_run->processor_sync_resolved - - -wait unmarshal + +puller_run_step2->processor_puller_consume + + +P.Output + + + +sorter_run->processor_sorter_consume + + +S.Ouput + + + +mounter_run->table_sink_run + + +wait unmarshal + + + +backend_sink_run + +Backend Sink (1) +FlushEvents + + + +buffer_sink_run->backend_sink_run + + +S.EmitEvents -data_out - -Data out + +data_out + +Data out - -sink_run->data_out - - + + +backend_sink_run->data_out + + -data_in - -Data in + +data_in + +Data in -data_in->Raftstore - - + +data_in->Raftstore + + diff --git a/docs/ticdc-grafana-dashboard.md b/docs/ticdc-grafana-dashboard.md index d1e344d3de9..3a446c085a3 100644 --- a/docs/ticdc-grafana-dashboard.md +++ b/docs/ticdc-grafana-dashboard.md @@ -94,7 +94,7 @@ cdc cli changefeed create --pd=http://10.0.10.25:2379 --sink-uri="mysql://root:1 - Sink flush rows/s : TiCDC 节点每秒写到下游的数据变更的个数 - Puller buffer size : TiCDC 节点中缓存在 Puller 模块中的数据变更个数 - Entry sorter buffer size : TiCDC 节点中缓存在 Sorter 模块中的数据变更个数 -- Processor/Mounter buffer size : TiCDC 节点中缓存在 Processor 模块和 Mounter 模块中的数据变更个数 +- Sink/Mounter buffer size : TiCDC 节点中缓存在 Buffer Sink 模块和 Mounter 模块中的数据变更个数 - Sink row buffer size : TiCDC 节点中缓存在 Sink 模块中的数据变更个数 - Entry sorter sort duration : TiCDC 节点排序数据变更的耗时直方图 - Entry sorter sort duration percentile : 每秒钟中 95%,99% 和 99.9% 的情况下,TiCDC 排序数据变更所花费的时间 diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index bb818f69250..98f3faded1a 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -109,7 +109,7 @@ "gnetId": null, "graphTooltip": 1, "id": null, - "iteration": 1608013159669, + "iteration": 1610607213210, "links": [], "panels": [ { @@ -2054,7 +2054,7 @@ "reverseYBuckets": false, "targets": [ { - "expr": "max(rate(ticdc_processor_flush_event_duration_seconds_bucket{capture=~\"$capture\"}[1m])) by (le)", + "expr": "max(rate(ticdc_sink_flush_event_duration_seconds_bucket{capture=~\"$capture\"}[1m])) by (le)", "format": "heatmap", "instant": false, "intervalFactor": 2, @@ -2133,24 +2133,24 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.95, sum(rate(ticdc_processor_flush_event_duration_seconds_bucket{changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "expr": "histogram_quantile(0.95, sum(rate(ticdc_processor_flush_event_duration_seconds_bucket{changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", "format": "time_series", 
"intervalFactor": 1, - "legendFormat": "{{instance}}-p95", + "legendFormat": "{{instance}}-{{type}}-p95", "refId": "A" }, { - "expr": "histogram_quantile(0.99, sum(rate(ticdc_processor_flush_event_duration_seconds_bucket{changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{instance}}-p99", + "legendFormat": "{{instance}}-{{type}}-p99", "refId": "B" }, { - "expr": "histogram_quantile(0.999, sum(rate(ticdc_processor_flush_event_duration_seconds_bucket{changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{instance}}-p999", + "legendFormat": "{{instance}}-{{type}}-p999", "refId": "C" } ], @@ -3522,10 +3522,10 @@ "refId": "A" }, { - "expr": "-sum(ticdc_processor_txn_output_chan_size{changefeed=~\"$changefeed\",capture=~\"$capture\"}) by (capture)", + "expr": "-sum(ticdc_sink_buffer_chan_size{changefeed=~\"$changefeed\",capture=~\"$capture\"}) by (capture)", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{capture}}-processor output chan", + "legendFormat": "{{capture}}-sink buffer chan", "refId": "B" } ], @@ -3533,7 +3533,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Processor/Mounter buffer size", + "title": "Sink/Mounter buffer size", "tooltip": { "shared": true, "sort": 0, @@ -4415,7 +4415,7 @@ "h": 8, "w": 12, "x": 0, - "y": 1 + "y": 4 }, "id": 131, "legend": { @@ -4499,7 +4499,7 @@ "h": 8, "w": 12, "x": 12, - "y": 1 + "y": 4 }, "id": 132, "legend": { @@ -4583,7 +4583,7 @@ "h": 8, "w": 12, "x": 0, - "y": 9 + "y": 12 }, "id": 133, "legend": { @@ -4667,7 +4667,7 @@ "h": 8, "w": 12, "x": 12, - "y": 9 + "y": 12 }, "id": 134, "legend": { @@ -4760,7 +4760,7 @@ "h": 8, "w": 12, "x": 0, - "y": 17 + "y": 20 }, "heatmap": {}, "hideZeroBuckets": true, @@ -4824,7 +4824,7 @@ "h": 8, "w": 12, "x": 12, - "y": 17 + "y": 20 }, "heatmap": {}, "hideZeroBuckets": true, @@ -4881,7 +4881,7 @@ "h": 8, "w": 12, "x": 12, - "y": 25 + "y": 28 }, "id": 137, "legend": { @@ -6052,5 +6052,5 @@ "timezone": "browser", "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", - "version": 12 + "version": 13 } diff --git a/tests/owner_remove_table_error/conf/diff_config.toml b/tests/owner_remove_table_error/conf/diff_config.toml new file mode 100644 index 00000000000..74ddef0c33e --- /dev/null +++ b/tests/owner_remove_table_error/conf/diff_config.toml @@ -0,0 +1,27 @@ +# diff Configuration. + +log-level = "info" +chunk-size = 10 +check-thread-count = 4 +sample-percent = 100 +use-rowid = false +use-checksum = true +fix-sql-file = "fix.sql" + +# tables need to check. 
+[[check-tables]] + schema = "owner_remove_table_error" + tables = ["~t.*"] + +[[source-db]] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + instance-id = "source-1" + +[target-db] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/owner_remove_table_error/run.sh b/tests/owner_remove_table_error/run.sh new file mode 100644 index 00000000000..d78907c44e8 --- /dev/null +++ b/tests/owner_remove_table_error/run.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +set -e + +CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 +MAX_RETRIES=20 + + +function run() { + # kafka is not supported yet. + if [ "$SINK_TYPE" == "kafka" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + + pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1" + SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"; + + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/OwnerRemoveTableError=1*return(true)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr + changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}') + + run_sql "CREATE DATABASE owner_remove_table_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE table owner_remove_table_error.t1(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO owner_remove_table_error.t1 VALUES (),(),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "DROP table owner_remove_table_error.t1;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE table owner_remove_table_error.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO owner_remove_table_error.t2 VALUES (),(),();" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE table owner_remove_table_error.finished_mark(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + + check_table_exists "owner_remove_table_error.finished_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
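
The updated docs/data-flow.dot replaces the single Sink stage with a Sink Manager in which many table sinks feed one shared backend sink (Table Sink (N) → Buffer Sink (1) → Backend Sink (1)), and TestManagerError above shows how errors from that backend surface through the manager's error channel. Below is a minimal, self-contained sketch of that chaining, assuming a simplified Sink interface modeled on the methods errorSink implements in the new tests; the RowChangedEvent struct, tableSink, and backendSink here are illustrative stand-ins rather than the actual cdc/sink types, and the Buffer Sink layer is collapsed for brevity.

package main

import (
	"context"
	"fmt"
	"sync"
)

// RowChangedEvent is a simplified stand-in for model.RowChangedEvent.
type RowChangedEvent struct {
	Table    int64
	CommitTs uint64
}

// Sink mirrors the row-related methods exercised by errorSink in the tests
// above (Initialize/DDL/checkpoint methods are omitted for brevity).
type Sink interface {
	EmitRowChangedEvents(ctx context.Context, rows ...*RowChangedEvent) error
	FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error)
	Close() error
}

// backendSink plays the role of the single Backend Sink (1) that actually
// writes to the downstream.
type backendSink struct{ flushed uint64 }

func (b *backendSink) EmitRowChangedEvents(ctx context.Context, rows ...*RowChangedEvent) error {
	for _, r := range rows {
		fmt.Printf("backend: table %d commitTs %d\n", r.Table, r.CommitTs)
	}
	return nil
}

func (b *backendSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
	b.flushed = resolvedTs
	return resolvedTs, nil
}

func (b *backendSink) Close() error { return nil }

// tableSink buffers rows for one table and forwards them to the shared
// backend on flush, mimicking the Table Sink (N) -> Backend Sink (1) edge
// in the updated data-flow graph.
type tableSink struct {
	mu      sync.Mutex
	backend Sink
	buf     []*RowChangedEvent
}

func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*RowChangedEvent) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.buf = append(t.buf, rows...)
	return nil
}

func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
	t.mu.Lock()
	rows := t.buf
	t.buf = nil
	t.mu.Unlock()
	if err := t.backend.EmitRowChangedEvents(ctx, rows...); err != nil {
		return 0, err
	}
	return t.backend.FlushRowChangedEvents(ctx, resolvedTs)
}

func (t *tableSink) Close() error { return nil }

func main() {
	ctx := context.Background()
	backend := &backendSink{}
	// One table sink per replicated table, all sharing the same backend,
	// as in the Sink Manager diagram.
	t1 := &tableSink{backend: backend}
	_ = t1.EmitRowChangedEvents(ctx, &RowChangedEvent{Table: 1, CommitTs: 5})
	checkpoint, err := t1.FlushRowChangedEvents(ctx, 10)
	fmt.Println(checkpoint, err) // 10 <nil>
}

In the real manager, an error returned by the backend sink during emit or flush eventually appears on the errCh passed to NewManager, which is exactly what TestManagerError asserts with the injected errorSink.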