diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 14868cfcf07..f41fa9ab321 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -129,6 +129,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { // lastResolvedTs records the max resolved ts we have seen from redo logs. lastResolvedTs := checkpointTs cachedRows := make([]*model.RowChangedEvent, 0, emitBatch) + tableResolvedTsMap := make(map[model.TableID]model.Ts) for { redoLogs, err := ra.rd.ReadNextLog(ctx, readBatch) if err != nil { @@ -139,6 +140,10 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { } for _, redoLog := range redoLogs { + tableID := redoLog.Row.Table.TableID + if _, ok := tableResolvedTsMap[redoLog.Row.Table.TableID]; !ok { + tableResolvedTsMap[tableID] = lastSafeResolvedTs + } if len(cachedRows) >= emitBatch { err := s.EmitRowChangedEvents(ctx, cachedRows...) if err != nil { @@ -147,27 +152,33 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { cachedRows = make([]*model.RowChangedEvent, 0, emitBatch) } cachedRows = append(cachedRows, redo.LogToRow(redoLog)) - if redoLog.Row.CommitTs > lastResolvedTs { - lastSafeResolvedTs, lastResolvedTs = lastResolvedTs, redoLog.Row.CommitTs + + if redoLog.Row.CommitTs > tableResolvedTsMap[tableID] { + tableResolvedTsMap[tableID], lastResolvedTs = lastResolvedTs, redoLog.Row.CommitTs } } - // todo: use real table ID - _, err = s.FlushRowChangedEvents(ctx, 0, lastSafeResolvedTs) - if err != nil { - return err + + for tableID, tableLastResolvedTs := range tableResolvedTsMap { + _, err = s.FlushRowChangedEvents(ctx, tableID, tableLastResolvedTs) + if err != nil { + return err + } } } err = s.EmitRowChangedEvents(ctx, cachedRows...) if err != nil { return err } - _, err = s.FlushRowChangedEvents(ctx, 0, resolvedTs) - if err != nil { - return err - } - err = s.Barrier(ctx, 0) - if err != nil { - return err + + for tableID := range tableResolvedTsMap { + _, err = s.FlushRowChangedEvents(ctx, tableID, resolvedTs) + if err != nil { + return err + } + err = s.Barrier(ctx, tableID) + if err != nil { + return err + } } return errApplyFinished }