Skip to content

Commit

Permalink
processor/sinkmanager(ticdc): background GC sort engine data (#7737)
Browse files Browse the repository at this point in the history
close #5928
  • Loading branch information
Rustin170506 authored Nov 30, 2022
1 parent 802c73e commit 519c6f0
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 75 deletions.
67 changes: 8 additions & 59 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,16 +305,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(tableID)
if alreadyExist {
stats, err := p.sinkManager.GetTableStats(tableID)
if err != nil {
log.Warn("Failed to get table stats",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Error(err))
return false
}
stats := p.sinkManager.GetTableStats(tableID)
tableResolvedTs = stats.ResolvedTs
tableCheckpointTs = stats.CheckpointTs
}
Expand Down Expand Up @@ -389,16 +380,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(tableID)
if alreadyExist {
stats, err := p.sinkManager.GetTableStats(tableID)
if err != nil {
log.Warn("Failed to get table stats",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Error(err))
return 0, false
}
stats := p.sinkManager.GetTableStats(tableID)
tableCheckpointTs = stats.CheckpointTs
}
} else {
Expand Down Expand Up @@ -431,17 +413,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool
}

if p.pullBasedSinking {
stats, err := p.sinkManager.GetTableStats(tableID)
// TODO: handle error
if err != nil {
log.Warn("Failed to get table stats",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Error(err))
return 0, false
}
stats := p.sinkManager.GetTableStats(tableID)
p.sourceManager.RemoveTable(tableID)
p.sinkManager.RemoveTable(tableID)
if p.redoManager.Enabled() {
Expand Down Expand Up @@ -499,20 +471,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
State: tablepb.TableStateAbsent,
}
}
sinkStats, err := p.sinkManager.GetTableStats(tableID)
// TODO: handle the error
if err != nil {
log.Warn("Failed to get table sinkStats",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Error(err))
return tablepb.TableStatus{
TableID: tableID,
State: tablepb.TableStateAbsent,
}
}
sinkStats := p.sinkManager.GetTableStats(tableID)
return tablepb.TableStatus{
TableID: tableID,
Checkpoint: tablepb.Checkpoint{
Expand Down Expand Up @@ -755,9 +714,7 @@ func (p *processor) tick(ctx cdcContext.Context) error {
// it is no need to check the error here, because we will use
// local time when an error return, which is acceptable
pdTime, _ := p.upstream.PDClock.CurrentTime()
if err := p.handlePosition(oracle.GetPhysical(pdTime)); err != nil {
return errors.Trace(err)
}
p.handlePosition(oracle.GetPhysical(pdTime))

p.doGCSchemaStorage()

Expand Down Expand Up @@ -1092,7 +1049,7 @@ func (p *processor) sendError(err error) {
// resolvedTs = min(schemaStorage's resolvedTs, all table's resolvedTs).
// table's resolvedTs = redo's resolvedTs if redo enable, else sorter's resolvedTs.
// checkpointTs = min(resolvedTs, all table's checkpointTs).
func (p *processor) handlePosition(currentTs int64) error {
func (p *processor) handlePosition(currentTs int64) {
minResolvedTs := uint64(math.MaxUint64)
minResolvedTableID := int64(0)
if p.schemaStorage != nil {
Expand All @@ -1103,10 +1060,7 @@ func (p *processor) handlePosition(currentTs int64) error {
if p.pullBasedSinking {
tableIDs := p.sinkManager.GetAllCurrentTableIDs()
for _, tableID := range tableIDs {
stats, err := p.sinkManager.GetTableStats(tableID)
if err != nil {
return errors.Trace(err)
}
stats := p.sinkManager.GetTableStats(tableID)
log.Debug("sink manager gets table stats",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
Expand Down Expand Up @@ -1151,8 +1105,6 @@ func (p *processor) handlePosition(currentTs int64) error {

p.checkpointTs = minCheckpointTs
p.resolvedTs = minResolvedTs

return nil
}

// pushResolvedTs2Table sends global resolved ts to all the table pipelines.
Expand Down Expand Up @@ -1457,10 +1409,7 @@ func (p *processor) WriteDebugInfo(w io.Writer) error {
tables := p.sinkManager.GetAllCurrentTableIDs()
for _, tableID := range tables {
state, _ := p.sinkManager.GetTableState(tableID)
stats, err := p.sinkManager.GetTableStats(tableID)
if err != nil {
return err
}
stats := p.sinkManager.GetTableStats(tableID)
// TODO: add table name.
fmt.Fprintf(w, "tableID: %d, resolvedTs: %d, checkpointTs: %d, state: %s\n",
tableID, stats.ResolvedTs, stats.CheckpointTs, state)
Expand Down
64 changes: 59 additions & 5 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,14 @@ const (
sinkWorkerNum = 8
redoWorkerNum = 4
defaultGenerateTaskInterval = 100 * time.Millisecond
defaultEngineGCChanSize = 128
)

type gcEvent struct {
tableID model.TableID
cleanPos engine.Position
}

// SinkManager is the implementation of SinkManager.
type SinkManager struct {
changefeedID model.ChangeFeedID
Expand Down Expand Up @@ -74,6 +80,9 @@ type SinkManager struct {
// lastBarrierTs is the last barrier ts.
lastBarrierTs atomic.Uint64

// engineGCChan is used to GC engine when the table is advanced.
engineGCChan chan *gcEvent

// sinkWorkers used to pull data from source manager.
sinkWorkers []*sinkWorker
// sinkTaskChan is used to send tasks to sinkWorkers.
Expand Down Expand Up @@ -124,6 +133,8 @@ func New(
sinkFactory: tableSinkFactory,
sortEngine: sortEngine,

engineGCChan: make(chan *gcEvent, defaultEngineGCChanSize),

sinkProgressHeap: newTableProgresses(),
sinkWorkers: make([]*sinkWorker, 0, sinkWorkerNum),
sinkTaskChan: make(chan *sinkTask),
Expand All @@ -142,6 +153,7 @@ func New(

m.startWorkers(mg, changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn(), changefeedInfo.Config.EnableOldValue)
m.startGenerateTasks()
m.backgroundGC()

log.Info("Sink manager is created",
zap.String("namespace", changefeedID.Namespace),
Expand Down Expand Up @@ -255,6 +267,39 @@ func (m *SinkManager) startGenerateTasks() {
}()
}

// backgroundGC is used to clean up the old data in the sorter.
func (m *SinkManager) backgroundGC() {
m.wg.Add(1)
go func() {
defer m.wg.Done()
for {
select {
case <-m.ctx.Done():
log.Info("Background GC is stooped because context is canceled",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
return
case gcEvent := <-m.engineGCChan:
if err := m.sortEngine.CleanByTable(gcEvent.tableID, gcEvent.cleanPos); err != nil {
log.Error("Failed to clean table in sort engine",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", gcEvent.tableID),
zap.Error(err))
select {
case m.errChan <- err:
default:
log.Error("Failed to send error to error channel, error channel is full",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
}
}
}
}
}()
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks() error {
taskTicker := time.NewTicker(defaultGenerateTaskInterval)
Expand Down Expand Up @@ -581,7 +626,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
}

// GetTableStats returns the state of the table.
func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, error) {
func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats {
tableSink, ok := m.tableSinks.Load(tableID)
if !ok {
log.Panic("Table sink not found when getting table stats",
Expand All @@ -596,9 +641,18 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro
StartTs: resolvedMark - 1,
CommitTs: resolvedMark,
}
err := m.sortEngine.CleanByTable(tableID, cleanPos)
if err != nil {
return pipeline.Stats{}, errors.Trace(err)
gcEvent := &gcEvent{
tableID: tableID,
cleanPos: cleanPos,
}
select {
case m.engineGCChan <- gcEvent:
default:
log.Warn("Failed to send GC event to engine GC channel, engine GC channel is full",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Any("cleanPos", cleanPos))
}
var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
Expand All @@ -611,7 +665,7 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro
CheckpointTs: resolvedMark,
ResolvedTs: resolvedTs,
BarrierTs: m.lastBarrierTs.Load(),
}, nil
}
}

// Close closes all workers.
Expand Down
Loading

0 comments on commit 519c6f0

Please sign in to comment.