From 979485490ed94952b1c761f00a3781e1b1ed4c1c Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 3 Feb 2023 22:23:56 +0800 Subject: [PATCH] sinkv2(cdc): fix panics about table scheduling or blackhole sink (#8156) close pingcap/tiflow#8024 --- cdc/processor/sinkmanager/redo_log_worker.go | 21 +++++++-------- .../sinkmanager/table_sink_worker.go | 20 +++++++------- .../engine/memory/event_sorter.go | 4 +-- .../engine/memory/event_sorter_test.go | 3 +-- cdc/processor/sourcemanager/engine/metrics.go | 5 ++++ .../engine/pebble/event_sorter.go | 26 +++++++++++++++---- .../engine/pebble/event_sorter_test.go | 3 +-- cdc/redo/manager.go | 7 +++-- 8 files changed, 52 insertions(+), 37 deletions(-) diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index 8bcca04d039..dd811fd404e 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -139,7 +139,17 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e } maybeEmitBatchEvents := func(allFinished, txnFinished bool) error { + // If used memory size exceeds the required limit, do a force acquire. memoryHighUsage := availableMemSize < usedMemSize + if memoryHighUsage { + w.memQuota.forceAcquire(usedMemSize - availableMemSize) + log.Debug("MemoryQuotaTracing: force acquire memory for redo log task", + zap.String("namespace", w.changefeedID.Namespace), + zap.String("changefeed", w.changefeedID.ID), + zap.Stringer("span", &task.span), + zap.Uint64("memory", usedMemSize-availableMemSize)) + availableMemSize = usedMemSize + } // Do emit in such situations: // 1. we use more memory than we required; @@ -151,17 +161,6 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e } } - // If used memory size exceeds the required limit, do a block require. - if memoryHighUsage { - w.memQuota.forceAcquire(usedMemSize - availableMemSize) - log.Debug("MemoryQuotaTracing: force acquire memory for redo log task", - zap.String("namespace", w.changefeedID.Namespace), - zap.String("changefeed", w.changefeedID.ID), - zap.Stringer("span", &task.span), - zap.Uint64("memory", usedMemSize-availableMemSize)) - availableMemSize = usedMemSize - } - if allFinished { return nil } diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 4a974f8a89d..9e7409fb6a2 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -182,7 +182,17 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e } maybeEmitAndAdvance := func(allFinished, txnFinished bool) error { + // If used memory size exceeds the required limit, do a force acquire. memoryHighUsage := availableMem < usedMem + if memoryHighUsage { + w.sinkMemQuota.forceAcquire(usedMem - availableMem) + log.Debug("MemoryQuotaTracing: force acquire memory for table sink task", + zap.String("namespace", w.changefeedID.Namespace), + zap.String("changefeed", w.changefeedID.ID), + zap.Stringer("span", &task.span), + zap.Uint64("memory", usedMem-availableMem)) + availableMem = usedMem + } // Do emit in such situations: // 1. we use more memory than we required; @@ -194,16 +204,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e } } - if memoryHighUsage { - w.sinkMemQuota.forceAcquire(usedMem - availableMem) - log.Debug("MemoryQuotaTracing: force acquire memory for table sink task", - zap.String("namespace", w.changefeedID.Namespace), - zap.String("changefeed", w.changefeedID.ID), - zap.Stringer("span", &task.span), - zap.Uint64("memory", usedMem-availableMem)) - availableMem = usedMem - } - if allFinished { return nil } diff --git a/cdc/processor/sourcemanager/engine/memory/event_sorter.go b/cdc/processor/sourcemanager/engine/memory/event_sorter.go index d6a8f270348..cb88ea7754d 100644 --- a/cdc/processor/sourcemanager/engine/memory/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/memory/event_sorter.go @@ -107,9 +107,7 @@ func (s *EventSorter) OnResolve(action func(tablepb.Span, model.Ts)) { } // FetchByTable implements engine.SortEngine. -func (s *EventSorter) FetchByTable( - span tablepb.Span, lowerBound, upperBound engine.Position, -) engine.EventIterator { +func (s *EventSorter) FetchByTable(span tablepb.Span, lowerBound, upperBound engine.Position) engine.EventIterator { value, exists := s.tables.Load(span) if !exists { log.Panic("fetch events from an unexist table", zap.Stringer("span", &span)) diff --git a/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go b/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go index 2c59a4d84c0..b5f4caf1640 100644 --- a/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go +++ b/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go @@ -100,8 +100,7 @@ func TestEventSorter(t *testing.T) { es.Add(span, model.NewPolymorphicEvent(entry)) } es.Add(span, model.NewResolvedPolymorphicEvent(0, tc.resolvedTs)) - iter := es.FetchByTable( - span, nextToFetch, engine.Position{CommitTs: tc.resolvedTs, StartTs: tc.resolvedTs}) + iter := es.FetchByTable(span, nextToFetch, engine.Position{CommitTs: tc.resolvedTs, StartTs: tc.resolvedTs}) for _, expect := range tc.expect { event, pos, _ := iter.Next() require.NotNil(t, event) diff --git a/cdc/processor/sourcemanager/engine/metrics.go b/cdc/processor/sourcemanager/engine/metrics.go index 28aa13e74a6..1abb29a9a8d 100644 --- a/cdc/processor/sourcemanager/engine/metrics.go +++ b/cdc/processor/sourcemanager/engine/metrics.go @@ -112,6 +112,11 @@ func SorterWriteBytes() *prometheus.HistogramVec { return sorterWriteBytesHistogram } +// SorterIterReadDuration returns sorterIterReadDurationHistogram. +func SorterIterReadDuration() *prometheus.HistogramVec { + return sorterIterReadDurationHistogram +} + // InMemoryDataSize returns inMemoryDataSizeGauge. func InMemoryDataSize() *prometheus.GaugeVec { return inMemoryDataSizeGauge diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go index 004e576cfcf..885d5865e29 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/spanz" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -64,6 +65,8 @@ type EventIter struct { iter *pebble.Iterator headItem *model.PolymorphicEvent serde encoding.MsgPackGenSerde + + nextDuration prometheus.Observer } // New creates an EventSorter instance. @@ -205,10 +208,7 @@ func (s *EventSorter) OnResolve(action func(tablepb.Span, model.Ts)) { } // FetchByTable implements engine.SortEngine. -func (s *EventSorter) FetchByTable( - span tablepb.Span, - lowerBound, upperBound engine.Position, -) engine.EventIterator { +func (s *EventSorter) FetchByTable(span tablepb.Span, lowerBound, upperBound engine.Position) engine.EventIterator { s.mu.RLock() state, exists := s.tables.Get(span) s.mu.RUnlock() @@ -231,8 +231,21 @@ func (s *EventSorter) FetchByTable( } db := s.dbs[getDB(span, len(s.dbs))] + iterReadDur := engine.SorterIterReadDuration() + + seekStart := time.Now() iter := iterTable(db, state.uniqueID, span.TableID, lowerBound, upperBound) - return &EventIter{tableID: span.TableID, state: state, iter: iter, serde: s.serde} + iterReadDur.WithLabelValues(s.changefeedID.Namespace, s.changefeedID.ID, "first"). + Observe(time.Since(seekStart).Seconds()) + + return &EventIter{ + tableID: span.TableID, + state: state, + iter: iter, + serde: s.serde, + + nextDuration: iterReadDur.WithLabelValues(s.changefeedID.Namespace, s.changefeedID.ID, "next"), + } } // FetchAllTables implements engine.SortEngine. @@ -344,7 +357,10 @@ func (s *EventIter) Next() (event *model.PolymorphicEvent, pos engine.Position, valid := s.iter != nil && s.iter.Valid() var value []byte for valid { + nextStart := time.Now() value, valid = s.iter.Value(), s.iter.Next() + s.nextDuration.Observe(time.Since(nextStart).Seconds()) + event = &model.PolymorphicEvent{} if _, err = s.serde.Unmarshal(event, value); err != nil { return diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go index cfda44b1c5a..6f3b7f14155 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go @@ -135,8 +135,7 @@ func TestEventFetch(t *testing.T) { timer := time.NewTimer(100 * time.Millisecond) select { case ts := <-resolvedTs: - iter := s.FetchByTable( - span, engine.Position{}, engine.Position{CommitTs: ts, StartTs: ts - 1}) + iter := s.FetchByTable(span, engine.Position{}, engine.Position{CommitTs: ts, StartTs: ts - 1}) for { event, pos, err := iter.Next() require.Nil(t, err) diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index c98016a4b57..7678b1c1019 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -493,11 +493,10 @@ func (m *ManagerImpl) flushLog( } func (m *ManagerImpl) onResolvedTsMsg(span tablepb.Span, resolvedTs model.Ts) { - value, loaded := m.rtsMap.Load(span) - if !loaded { - panic("onResolvedTsMsg is called for an invalid table") + // It's possible that the table is removed while redo log is still in writing. + if value, loaded := m.rtsMap.Load(span); loaded { + value.(*statefulRts).checkAndSetUnflushed(resolvedTs) } - value.(*statefulRts).checkAndSetUnflushed(resolvedTs) } func (m *ManagerImpl) bgUpdateLog(