processor/sinkmanager(ticdc): background GC sort engine data #7737

Merged
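In brief, this PR moves sort-engine cleanup off the GetTableStats call path: GetTableStats now only enqueues a GC request on a buffered channel, and a dedicated background goroutine drains that channel and calls CleanByTable. The sketch below illustrates the pattern with simplified, illustrative names (gcManager, tryEnqueue, and the clean callback are not the actual ticdc identifiers); it stands in a plain function for the real sorter engine.

package main

import (
	"context"
	"fmt"
	"time"
)

// gcEvent mirrors the struct added in this PR: which table to clean and up to where.
type gcEvent struct {
	tableID  int64
	cleanPos uint64 // stands in for engine.Position
}

// gcManager is a simplified stand-in for SinkManager's GC-related fields.
type gcManager struct {
	gcChan chan *gcEvent
	clean  func(tableID int64, pos uint64) error // stands in for sortEngine.CleanByTable
}

// run drains the channel in the background so callers never block on engine GC.
func (m *gcManager) run(ctx context.Context) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case ev := <-m.gcChan:
				if err := m.clean(ev.tableID, ev.cleanPos); err != nil {
					fmt.Println("failed to clean table in sort engine:", err)
				}
			}
		}
	}()
}

// tryEnqueue mirrors the non-blocking send in GetTableStats: if the buffer is
// full, the event is simply dropped and cleanup happens on a later call.
func (m *gcManager) tryEnqueue(ev *gcEvent) bool {
	select {
	case m.gcChan <- ev:
		return true
	default:
		return false
	}
}

func main() {
	m := &gcManager{
		gcChan: make(chan *gcEvent, 128), // same default buffer size as the PR
		clean: func(tableID int64, pos uint64) error {
			fmt.Printf("cleaned table %d up to %d\n", tableID, pos)
			return nil
		},
	}
	m.run(context.Background())
	m.tryEnqueue(&gcEvent{tableID: 1, cleanPos: 42})
	time.Sleep(10 * time.Millisecond) // give the background goroutine time to run in this toy example
}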
67 changes: 8 additions & 59 deletions cdc/processor/processor.go
@@ -304,16 +304,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(tableID)
if alreadyExist {
stats, err := p.sinkManager.GetTableStats(tableID)
if err != nil {
log.Warn("Failed to get table stats",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Error(err))
return false
}
stats := p.sinkManager.GetTableStats(tableID)
tableResolvedTs = stats.ResolvedTs
tableCheckpointTs = stats.CheckpointTs
}
@@ -388,16 +379,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(tableID)
if alreadyExist {
stats, err := p.sinkManager.GetTableStats(tableID)
if err != nil {
log.Warn("Failed to get table stats",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Error(err))
return 0, false
}
stats := p.sinkManager.GetTableStats(tableID)
tableCheckpointTs = stats.CheckpointTs
}
} else {
@@ -430,17 +412,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool
}

if p.pullBasedSinking {
stats, err := p.sinkManager.GetTableStats(tableID)
// TODO: handle error
if err != nil {
log.Warn("Failed to get table stats",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Error(err))
return 0, false
}
stats := p.sinkManager.GetTableStats(tableID)
p.sourceManager.RemoveTable(tableID)
p.sinkManager.RemoveTable(tableID)
if p.redoManager.Enabled() {
@@ -498,20 +470,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
State: tablepb.TableStateAbsent,
}
}
sinkStats, err := p.sinkManager.GetTableStats(tableID)
// TODO: handle the error
if err != nil {
log.Warn("Failed to get table sinkStats",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Error(err))
return tablepb.TableStatus{
TableID: tableID,
State: tablepb.TableStateAbsent,
}
}
sinkStats := p.sinkManager.GetTableStats(tableID)
return tablepb.TableStatus{
TableID: tableID,
Checkpoint: tablepb.Checkpoint{
@@ -754,9 +713,7 @@ func (p *processor) tick(ctx cdcContext.Context) error {
// there is no need to check the error here, because we will use
// local time when an error is returned, which is acceptable
pdTime, _ := p.upstream.PDClock.CurrentTime()
if err := p.handlePosition(oracle.GetPhysical(pdTime)); err != nil {
return errors.Trace(err)
}
p.handlePosition(oracle.GetPhysical(pdTime))

p.doGCSchemaStorage()

@@ -1085,7 +1042,7 @@ func (p *processor) sendError(err error) {
// resolvedTs = min(schemaStorage's resolvedTs, all table's resolvedTs).
// table's resolvedTs = redo's resolvedTs if redo enable, else sorter's resolvedTs.
// checkpointTs = min(resolvedTs, all table's checkpointTs).
func (p *processor) handlePosition(currentTs int64) error {
func (p *processor) handlePosition(currentTs int64) {
minResolvedTs := uint64(math.MaxUint64)
minResolvedTableID := int64(0)
if p.schemaStorage != nil {
@@ -1096,10 +1053,7 @@ func (p *processor) handlePosition(currentTs int64) error {
if p.pullBasedSinking {
tableIDs := p.sinkManager.GetAllCurrentTableIDs()
for _, tableID := range tableIDs {
stats, err := p.sinkManager.GetTableStats(tableID)
if err != nil {
return errors.Trace(err)
}
stats := p.sinkManager.GetTableStats(tableID)
log.Debug("sink manager gets table stats",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
@@ -1144,8 +1098,6 @@ func (p *processor) handlePosition(currentTs int64) error {

p.checkpointTs = minCheckpointTs
p.resolvedTs = minResolvedTs

return nil
}

// pushResolvedTs2Table sends global resolved ts to all the table pipelines.
@@ -1450,10 +1402,7 @@ func (p *processor) WriteDebugInfo(w io.Writer) error {
tables := p.sinkManager.GetAllCurrentTableIDs()
for _, tableID := range tables {
state, _ := p.sinkManager.GetTableState(tableID)
stats, err := p.sinkManager.GetTableStats(tableID)
if err != nil {
return err
}
stats := p.sinkManager.GetTableStats(tableID)
// TODO: add table name.
fmt.Fprintf(w, "tableID: %d, resolvedTs: %d, checkpointTs: %d, state: %s\n",
tableID, stats.ResolvedTs, stats.CheckpointTs, state)
64 changes: 59 additions & 5 deletions cdc/processor/sinkmanager/manager.go
@@ -40,8 +40,14 @@ const (
sinkWorkerNum = 8
redoWorkerNum = 4
defaultGenerateTaskInterval = 100 * time.Millisecond
defaultEngineGCChanSize = 128
)

type gcEvent struct {
tableID model.TableID
cleanPos engine.Position
}

// SinkManager is the implementation of SinkManager.
type SinkManager struct {
changefeedID model.ChangeFeedID
@@ -74,6 +80,9 @@ type SinkManager struct {
// lastBarrierTs is the last barrier ts.
lastBarrierTs atomic.Uint64

// engineGCChan is used to GC the sort engine when a table's progress is advanced.
engineGCChan chan *gcEvent

// sinkWorkers used to pull data from source manager.
sinkWorkers []*sinkWorker
// sinkTaskChan is used to send tasks to sinkWorkers.
@@ -124,6 +133,8 @@ func New(
sinkFactory: tableSinkFactory,
sortEngine: sortEngine,

engineGCChan: make(chan *gcEvent, defaultEngineGCChanSize),

sinkProgressHeap: newTableProgresses(),
sinkWorkers: make([]*sinkWorker, 0, sinkWorkerNum),
sinkTaskChan: make(chan *sinkTask),
@@ -142,6 +153,7 @@

m.startWorkers(mg, changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn(), changefeedInfo.Config.EnableOldValue)
m.startGenerateTasks()
m.backgroundGC()

log.Info("Sink manager is created",
zap.String("namespace", changefeedID.Namespace),
Expand Down Expand Up @@ -255,6 +267,39 @@ func (m *SinkManager) startGenerateTasks() {
}()
}

// backgroundGC is used to clean up the old data in the sorter.
func (m *SinkManager) backgroundGC() {
m.wg.Add(1)
go func() {
defer m.wg.Done()
for {
select {
case <-m.ctx.Done():
log.Info("Background GC is stooped because context is canceled",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
return
case gcEvent := <-m.engineGCChan:
if err := m.sortEngine.CleanByTable(gcEvent.tableID, gcEvent.cleanPos); err != nil {
log.Error("Failed to clean table in sort engine",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", gcEvent.tableID),
zap.Error(err))
select {
case m.errChan <- err:
default:
log.Error("Failed to send error to error channel, error channel is full",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
}
}
}
}
}()
}

// generateSinkTasks generates tasks to fetch data from the source manager.
func (m *SinkManager) generateSinkTasks() error {
taskTicker := time.NewTicker(defaultGenerateTaskInterval)
@@ -581,7 +626,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
}

// GetTableStats returns the stats of the table.
func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, error) {
func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats {
tableSink, ok := m.tableSinks.Load(tableID)
if !ok {
log.Panic("Table sink not found when getting table stats",
@@ -596,9 +641,18 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro
StartTs: resolvedMark - 1,
CommitTs: resolvedMark,
}
err := m.sortEngine.CleanByTable(tableID, cleanPos)
if err != nil {
return pipeline.Stats{}, errors.Trace(err)
gcEvent := &gcEvent{
tableID: tableID,
cleanPos: cleanPos,
}
select {
case m.engineGCChan <- gcEvent:
default:
log.Debug("Failed to send GC event to engine GC channel, engine GC channel is full",
[Review thread on this line]
Contributor: How about changing the log level to Warn?
Member Author: I guess it will be too annoying.
Contributor: If they are dropped, we must have some ways to know it and then we can fix it.
(One hypothetical way to surface such drops is sketched after this file's diff, below.)
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Any("cleanPos", cleanPos))
}
var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
@@ -611,7 +665,7 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) (pipeline.Stats, erro
CheckpointTs: resolvedMark,
ResolvedTs: resolvedTs,
BarrierTs: m.lastBarrierTs.Load(),
}, nil
}
}

// Close closes all workers.
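Regarding the review thread above about GC events being silently dropped when the channel is full: one hypothetical way to make drops observable without a per-event Warn log (not part of this PR; names are illustrative, and in ticdc this would more likely be a per-changefeed Prometheus metric) is to count drops atomically and report the counter periodically or on shutdown.

package main

import (
	"fmt"
	"sync/atomic"
)

// droppedGCEvents is a hypothetical counter of GC requests that could not be enqueued.
var droppedGCEvents atomic.Uint64

// tryEnqueueGC performs the same non-blocking send as GetTableStats, but records
// every drop so operators can notice if the GC channel is chronically full.
func tryEnqueueGC(ch chan<- int64, tableID int64) {
	select {
	case ch <- tableID:
	default:
		droppedGCEvents.Add(1)
	}
}

func main() {
	ch := make(chan int64, 1)
	tryEnqueueGC(ch, 1) // buffer has room, accepted
	tryEnqueueGC(ch, 2) // buffer full, counted as dropped
	fmt.Println("dropped GC events:", droppedGCEvents.Load())
}

A periodic log line or metric export of this counter would answer the reviewer's concern while keeping the hot path quiet.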