Skip to content

Commit

Permalink
sinkv2(cdc): fix panics about table scheduling or blackhole sink (#8156)
Browse files Browse the repository at this point in the history
close #8024
  • Loading branch information
hicqu authored Feb 3, 2023
1 parent 84a76a8 commit 9794854
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 37 deletions.
21 changes: 10 additions & 11 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,17 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
}

maybeEmitBatchEvents := func(allFinished, txnFinished bool) error {
// If used memory size exceeds the required limit, do a force acquire.
memoryHighUsage := availableMemSize < usedMemSize
if memoryHighUsage {
w.memQuota.forceAcquire(usedMemSize - availableMemSize)
log.Debug("MemoryQuotaTracing: force acquire memory for redo log task",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Stringer("span", &task.span),
zap.Uint64("memory", usedMemSize-availableMemSize))
availableMemSize = usedMemSize
}

// Do emit in such situations:
// 1. we use more memory than we required;
Expand All @@ -151,17 +161,6 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
}
}

// If used memory size exceeds the required limit, do a block require.
if memoryHighUsage {
w.memQuota.forceAcquire(usedMemSize - availableMemSize)
log.Debug("MemoryQuotaTracing: force acquire memory for redo log task",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Stringer("span", &task.span),
zap.Uint64("memory", usedMemSize-availableMemSize))
availableMemSize = usedMemSize
}

if allFinished {
return nil
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,17 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
}

maybeEmitAndAdvance := func(allFinished, txnFinished bool) error {
// If used memory size exceeds the required limit, do a force acquire.
memoryHighUsage := availableMem < usedMem
if memoryHighUsage {
w.sinkMemQuota.forceAcquire(usedMem - availableMem)
log.Debug("MemoryQuotaTracing: force acquire memory for table sink task",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Stringer("span", &task.span),
zap.Uint64("memory", usedMem-availableMem))
availableMem = usedMem
}

// Do emit in such situations:
// 1. we use more memory than we required;
Expand All @@ -194,16 +204,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
}
}

if memoryHighUsage {
w.sinkMemQuota.forceAcquire(usedMem - availableMem)
log.Debug("MemoryQuotaTracing: force acquire memory for table sink task",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Stringer("span", &task.span),
zap.Uint64("memory", usedMem-availableMem))
availableMem = usedMem
}

if allFinished {
return nil
}
Expand Down
4 changes: 1 addition & 3 deletions cdc/processor/sourcemanager/engine/memory/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ func (s *EventSorter) OnResolve(action func(tablepb.Span, model.Ts)) {
}

// FetchByTable implements engine.SortEngine.
func (s *EventSorter) FetchByTable(
span tablepb.Span, lowerBound, upperBound engine.Position,
) engine.EventIterator {
func (s *EventSorter) FetchByTable(span tablepb.Span, lowerBound, upperBound engine.Position) engine.EventIterator {
value, exists := s.tables.Load(span)
if !exists {
log.Panic("fetch events from an unexist table", zap.Stringer("span", &span))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ func TestEventSorter(t *testing.T) {
es.Add(span, model.NewPolymorphicEvent(entry))
}
es.Add(span, model.NewResolvedPolymorphicEvent(0, tc.resolvedTs))
iter := es.FetchByTable(
span, nextToFetch, engine.Position{CommitTs: tc.resolvedTs, StartTs: tc.resolvedTs})
iter := es.FetchByTable(span, nextToFetch, engine.Position{CommitTs: tc.resolvedTs, StartTs: tc.resolvedTs})
for _, expect := range tc.expect {
event, pos, _ := iter.Next()
require.NotNil(t, event)
Expand Down
5 changes: 5 additions & 0 deletions cdc/processor/sourcemanager/engine/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func SorterWriteBytes() *prometheus.HistogramVec {
return sorterWriteBytesHistogram
}

// SorterIterReadDuration returns sorterIterReadDurationHistogram.
func SorterIterReadDuration() *prometheus.HistogramVec {
return sorterIterReadDurationHistogram
}

// InMemoryDataSize returns inMemoryDataSizeGauge.
func InMemoryDataSize() *prometheus.GaugeVec {
return inMemoryDataSizeGauge
Expand Down
26 changes: 21 additions & 5 deletions cdc/processor/sourcemanager/engine/pebble/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -64,6 +65,8 @@ type EventIter struct {
iter *pebble.Iterator
headItem *model.PolymorphicEvent
serde encoding.MsgPackGenSerde

nextDuration prometheus.Observer
}

// New creates an EventSorter instance.
Expand Down Expand Up @@ -205,10 +208,7 @@ func (s *EventSorter) OnResolve(action func(tablepb.Span, model.Ts)) {
}

// FetchByTable implements engine.SortEngine.
func (s *EventSorter) FetchByTable(
span tablepb.Span,
lowerBound, upperBound engine.Position,
) engine.EventIterator {
func (s *EventSorter) FetchByTable(span tablepb.Span, lowerBound, upperBound engine.Position) engine.EventIterator {
s.mu.RLock()
state, exists := s.tables.Get(span)
s.mu.RUnlock()
Expand All @@ -231,8 +231,21 @@ func (s *EventSorter) FetchByTable(
}

db := s.dbs[getDB(span, len(s.dbs))]
iterReadDur := engine.SorterIterReadDuration()

seekStart := time.Now()
iter := iterTable(db, state.uniqueID, span.TableID, lowerBound, upperBound)
return &EventIter{tableID: span.TableID, state: state, iter: iter, serde: s.serde}
iterReadDur.WithLabelValues(s.changefeedID.Namespace, s.changefeedID.ID, "first").
Observe(time.Since(seekStart).Seconds())

return &EventIter{
tableID: span.TableID,
state: state,
iter: iter,
serde: s.serde,

nextDuration: iterReadDur.WithLabelValues(s.changefeedID.Namespace, s.changefeedID.ID, "next"),
}
}

// FetchAllTables implements engine.SortEngine.
Expand Down Expand Up @@ -344,7 +357,10 @@ func (s *EventIter) Next() (event *model.PolymorphicEvent, pos engine.Position,
valid := s.iter != nil && s.iter.Valid()
var value []byte
for valid {
nextStart := time.Now()
value, valid = s.iter.Value(), s.iter.Next()
s.nextDuration.Observe(time.Since(nextStart).Seconds())

event = &model.PolymorphicEvent{}
if _, err = s.serde.Unmarshal(event, value); err != nil {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func TestEventFetch(t *testing.T) {
timer := time.NewTimer(100 * time.Millisecond)
select {
case ts := <-resolvedTs:
iter := s.FetchByTable(
span, engine.Position{}, engine.Position{CommitTs: ts, StartTs: ts - 1})
iter := s.FetchByTable(span, engine.Position{}, engine.Position{CommitTs: ts, StartTs: ts - 1})
for {
event, pos, err := iter.Next()
require.Nil(t, err)
Expand Down
7 changes: 3 additions & 4 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,10 @@ func (m *ManagerImpl) flushLog(
}

func (m *ManagerImpl) onResolvedTsMsg(span tablepb.Span, resolvedTs model.Ts) {
value, loaded := m.rtsMap.Load(span)
if !loaded {
panic("onResolvedTsMsg is called for an invalid table")
// It's possible that the table is removed while redo log is still in writing.
if value, loaded := m.rtsMap.Load(span); loaded {
value.(*statefulRts).checkAndSetUnflushed(resolvedTs)
}
value.(*statefulRts).checkAndSetUnflushed(resolvedTs)
}

func (m *ManagerImpl) bgUpdateLog(
Expand Down

0 comments on commit 9794854

Please sign in to comment.