This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

add importing progress and more accurate restore progress
glorv committed Dec 3, 2020
1 parent 95e4aa2 commit c140cc7
Showing 1 changed file with 74 additions and 25 deletions.
lightning/restore/restore.go
@@ -427,18 +427,24 @@ func verifyCheckpoint(cfg *config.Config, taskCp *TaskCheckpoint) error {

func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context) error {
estimatedChunkCount := 0.0
estimatedEngineCnt := int64(0)
batchSize := int64(rc.cfg.Mydumper.BatchSize)
for _, dbMeta := range rc.dbMetas {
for _, tableMeta := range dbMeta.Tables {
tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name)
dbCp, err := rc.checkpointsDB.Get(ctx, tableName)
if err != nil {
return errors.Trace(err)
}

fileChunks := make(map[string]float64)
for engineId, eCp := range dbCp.Engines {
if engineId == indexEngineID {
continue
}
if eCp.Status < CheckpointStatusImported {
estimatedEngineCnt++
}
for _, c := range eCp.Chunks {
if _, ok := fileChunks[c.Key.Path]; !ok {
fileChunks[c.Key.Path] = 0.0
@@ -447,6 +453,10 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context)
fileChunks[c.Key.Path] += remainChunkCnt
}
}
// data engine count + index engine count. Fewer than 2 engines means the engine checkpoints are not populated yet.
if len(dbCp.Engines) < 2 {
estimatedEngineCnt += ((tableMeta.TotalSize + batchSize - 1) / batchSize) + 1
}
for _, fileMeta := range tableMeta.DataFiles {
if cnt, ok := fileChunks[fileMeta.FileMeta.Path]; ok {
estimatedChunkCount += cnt
@@ -466,6 +476,8 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context)
}
}
metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(estimatedChunkCount)
metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStateEstimated, metric.TableResultSuccess).
Add(float64(estimatedEngineCnt))
rc.tidbGlue.Record(glue.RecordEstimatedChunk, uint64(estimatedChunkCount))
return nil
}
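Note on the estimate above: when a table's engine checkpoints are not populated yet, the engine count is approximated as ceil(TotalSize/BatchSize) data engines plus one index engine. A minimal, self-contained sketch of that arithmetic; the 100 GiB batch size below is purely illustrative and not taken from this diff.

// Sketch only (not part of this patch): the engine-count estimate,
// i.e. ceil(TotalSize/BatchSize) data engines plus one index engine.
package main

import "fmt"

func estimateEngineCount(totalSize, batchSize int64) int64 {
	dataEngines := (totalSize + batchSize - 1) / batchSize // integer ceiling division
	return dataEngines + 1                                 // plus one index engine
}

func main() {
	// e.g. a 250 GiB table with a hypothetical 100 GiB batch size:
	// 3 data engines + 1 index engine = 4
	fmt.Println(estimateEngineCount(250<<30, 100<<30))
}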
@@ -608,16 +620,33 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
case <-logProgressTicker.C:
// log the current progress periodically, so OPS will know that we're still working
nanoseconds := float64(time.Since(start).Nanoseconds())
// the estimated chunk count is not accurate (it is likely underestimated), but the actual count is not
// accurate before the last table starts, so using the bigger of the two is a reasonable workaround
estimated := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated))
pending := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending))
if estimated < pending {
estimated = pending
}
finished := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished))
totalTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStatePending, metric.TableResultSuccess))
completedTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStateCompleted, metric.TableResultSuccess))
bytesRead := metric.ReadHistogramSum(metric.RowReadBytesHistogram)
engineEstimated := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStateEstimated, metric.TableResultSuccess))
enginePending := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStatePending, metric.TableResultSuccess))
if engineEstimated < enginePending {
engineEstimated = enginePending
}
engineFinished := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess))

var state string
var remaining zap.Field
if finished >= estimated {
state = "post-processing"
if engineFinished < engineEstimated {
state = "importing"
// TODO: how to estimate time remaining for importing
} else {
state = "post-processing"
}
remaining = zap.Skip()
} else if finished > 0 {
remainNanoseconds := (estimated/finished - 1) * nanoseconds
@@ -630,9 +659,10 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan

// Note: a speed of 28 MiB/s roughly corresponds to 100 GiB/hour.
log.L().Info("progress",
zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)),
//zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)),
zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", completedTables, totalTables, completedTables/totalTables*100)),
zap.String("chunks", fmt.Sprintf("%.0f/%.0f", finished, estimated)),
zap.String("chunks", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)),
zap.String("engines", fmt.Sprintf("%.f/%.f (%.1f%%)", engineFinished, engineEstimated, engineFinished/engineEstimated*100)),
zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds)),
zap.String("state", state),
remaining,
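Note: the remaining-time figure logged above scales the elapsed time by the ratio of unfinished to finished chunks, remaining = elapsed * (estimated/finished - 1). A small self-contained sketch of that projection; the names and values are made up for illustration.

// Sketch only: the remaining-time projection used in the progress log above.
package main

import (
	"fmt"
	"time"
)

func estimateRemaining(elapsed time.Duration, finished, estimated float64) time.Duration {
	if finished <= 0 {
		return 0 // the real code reports state "writing" instead of a projection
	}
	return time.Duration((estimated/finished - 1) * float64(elapsed))
}

func main() {
	// 40 of an estimated 100 chunks finished after 30 minutes => about 45 minutes remain
	fmt.Println(estimateRemaining(30*time.Minute, 40, 100))
}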
@@ -940,7 +970,7 @@ func (t *TableRestore) restoreEngines(ctx context.Context, rc *RestoreController
defer wg.Done()

engineLogTask := t.logger.With(zap.Int32("engineNumber", eid)).Begin(zap.InfoLevel, "restore engine")
dataClosedEngine, dataWorker, err := t.restoreEngine(ctx, rc, indexEngine, eid, ecp)
dataClosedEngine, err := t.restoreEngine(ctx, rc, indexEngine, eid, ecp)
engineLogTask.End(zap.ErrorLevel, err)
rc.tableWorkers.Recycle(w)
if err != nil {
@@ -952,6 +982,7 @@ func (t *TableRestore) restoreEngines(ctx context.Context, rc *RestoreController
panic("forcing failure due to FailBeforeDataEngineImported")
})

dataWorker := rc.closedEngineLimit.Apply()
defer rc.closedEngineLimit.Recycle(dataWorker)
if err := t.importEngine(ctx, dataClosedEngine, rc, eid, ecp); err != nil {
engineErr.Set(err)
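Note on the change above: restoreEngine no longer acquires and returns a *worker.Worker; the caller now takes a closed-engine slot right before import, so restoreEngine's error paths no longer need to recycle it. The sketch below illustrates the Apply/Recycle pattern as a counting semaphore; this is an assumption about the limiter's behavior, not the real worker package.

// Sketch only: a counting-semaphore style pool with Apply/Recycle, standing in
// for rc.closedEngineLimit (assumed semantics, not the real worker package).
package main

type slot struct{}

type pool struct{ ch chan slot }

func newPool(limit int) *pool {
	p := &pool{ch: make(chan slot, limit)}
	for i := 0; i < limit; i++ {
		p.ch <- slot{}
	}
	return p
}

func (p *pool) Apply() slot { return <-p.ch } // blocks until a slot is free

func (p *pool) Recycle(s slot) { p.ch <- s } // returns the slot

func main() {
	limiter := newPool(1)
	w := limiter.Apply() // taken only once the engine is ready to be closed and imported
	defer limiter.Recycle(w)
}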
@@ -1013,23 +1044,21 @@ func (t *TableRestore) restoreEngine(
indexEngine *kv.OpenedEngine,
engineID int32,
cp *EngineCheckpoint,
) (*kv.ClosedEngine, *worker.Worker, error) {
) (*kv.ClosedEngine, error) {
if cp.Status >= CheckpointStatusClosed {
w := rc.closedEngineLimit.Apply()
closedEngine, err := rc.backend.UnsafeCloseEngine(ctx, t.tableName, engineID)
// If any error occurred, recycle worker immediately
if err != nil {
rc.closedEngineLimit.Recycle(w)
return closedEngine, nil, errors.Trace(err)
return closedEngine, errors.Trace(err)
}
return closedEngine, w, nil
return closedEngine, nil
}

logTask := t.logger.With(zap.Int32("engineNumber", engineID)).Begin(zap.InfoLevel, "encode kv data and write")

dataEngine, err := rc.backend.OpenEngine(ctx, t.tableName, engineID)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, errors.Trace(err)
}

var wg sync.WaitGroup
@@ -1043,7 +1072,7 @@ func (t *TableRestore) restoreEngine(

select {
case <-ctx.Done():
return nil, nil, ctx.Err()
return nil, ctx.Err()
default:
}

@@ -1058,9 +1087,10 @@ func (t *TableRestore) restoreEngine(
// 4. flush kvs data (into tikv node)
cr, err := newChunkRestore(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, t.tableInfo)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, errors.Trace(err)
}
metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()
remainChunkCnt := float64(chunk.Chunk.EndOffset-chunk.Chunk.Offset) / float64(chunk.Chunk.EndOffset-chunk.Key.Offset)
metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Add(remainChunkCnt)

restoreWorker := rc.regionWorkers.Apply()
wg.Add(1)
@@ -1071,13 +1101,13 @@ func (t *TableRestore) restoreEngine(
wg.Done()
rc.regionWorkers.Recycle(w)
}()
metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Inc()
metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Add(remainChunkCnt)
err := cr.restore(ctx, t, engineID, dataEngine, indexEngine, rc)
if err == nil {
metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Inc()
metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt)
return
}
metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Inc()
metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt)
chunkErr.Set(err)
}(restoreWorker, cr)
}
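Note on the Inc()-to-Add(remainChunkCnt) switch above: a chunk that was partially restored before a checkpointed restart now contributes only its unread fraction to the progress counters instead of a full 1.0. A sketch of that fraction; the field names mirror the checkpoint structs but the byte values are made up.

// Sketch only: the remainChunkCnt weighting. startOffset is the chunk start in
// the file (chunk.Key.Offset), currentOffset is how far a previous run already
// got (chunk.Chunk.Offset), endOffset is the chunk end (chunk.Chunk.EndOffset).
package main

import "fmt"

func remainFraction(startOffset, currentOffset, endOffset int64) float64 {
	return float64(endOffset-currentOffset) / float64(endOffset-startOffset)
}

func main() {
	// a 256 MiB chunk that a previous run had already read 64 MiB into => 0.75
	fmt.Printf("%.2f\n", remainFraction(0, 64<<20, 256<<20))
}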
@@ -1098,25 +1128,45 @@ func (t *TableRestore) restoreEngine(
zap.Uint64("written", totalKVSize),
)

flushAndSaveAllChunks := func() error {
if err = indexEngine.Flush(); err != nil {
// If any error occurred, recycle worker immediately
return errors.Trace(err)
}
// Currently we write all the checkpoints after data&index engine are flushed.
for _, chunk := range cp.Chunks {
saveCheckpoint(rc, t, engineID, chunk)
}
return nil
}

// in local mode this checkpoint makes no sense, because we don't flush here,
// so data may be lost if we exit at this point. Therefore we don't write this
// checkpoint here as the other modes do.
if !rc.isLocalBackend() {
rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusAllWritten)
}
if err != nil {
return nil, nil, errors.Trace(err)
// if the process is canceled, we should flush all chunk checkpoints for the local backend
if rc.isLocalBackend() && common.IsContextCanceledError(err) {
// ctx is already canceled, so we use `context.TODO()` here to avoid the engine Close failing
if _, err2 := dataEngine.Close(context.TODO()); err2 != nil {
log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2))
return nil, errors.Trace(err)
}
if err2 := flushAndSaveAllChunks(); err2 != nil {
log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2))
}
}
return nil, errors.Trace(err)
}

dataWorker := rc.closedEngineLimit.Apply()
closedDataEngine, err := dataEngine.Close(ctx)
// For the local backend, if checkpoint is enabled, we must flush the index engine to avoid data loss.
// This flush can cost up to 10% in performance, so we only do it when necessary.
if err == nil && rc.cfg.Checkpoint.Enable && rc.isLocalBackend() {
if err = indexEngine.Flush(); err != nil {
// If any error occurred, recycle worker immediately
rc.closedEngineLimit.Recycle(dataWorker)
return nil, nil, errors.Trace(err)
if err = flushAndSaveAllChunks(); err != nil {
return nil, errors.Trace(err)
}
// Currently we write all the checkpoints after data&index engine are flushed.
for _, chunk := range cp.Chunks {
@@ -1126,10 +1176,9 @@ func (t *TableRestore) restoreEngine(
rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusClosed)
if err != nil {
// If any error occurred, recycle worker immediately
rc.closedEngineLimit.Recycle(dataWorker)
return nil, nil, errors.Trace(err)
return nil, errors.Trace(err)
}
return closedDataEngine, dataWorker, nil
return closedDataEngine, nil
}
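Note on flushAndSaveAllChunks above: for the local backend, chunk checkpoints are persisted only after the index engine flush succeeds, so a restart never resumes from offsets whose KV pairs were not made durable. A condensed sketch of that ordering with stand-in types (not the real kv or checkpoints APIs).

// Sketch only: persist chunk checkpoints strictly after a successful flush,
// mirroring the intent of the flushAndSaveAllChunks helper above.
package main

import "fmt"

type flusher interface{ Flush() error } // stand-in for the index engine

type memEngine struct{}

func (memEngine) Flush() error { return nil }

type chunkCheckpoint struct {
	offset int64
	saved  bool
}

func flushThenSave(idx flusher, chunks []*chunkCheckpoint) error {
	if err := idx.Flush(); err != nil {
		return err // nothing saved: the previously persisted offsets stay valid
	}
	for _, c := range chunks {
		c.saved = true // only now is it safe to persist the advanced offsets
	}
	return nil
}

func main() {
	chunks := []*chunkCheckpoint{{offset: 64 << 20}}
	fmt.Println(flushThenSave(memEngine{}, chunks), chunks[0].saved)
}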

func (t *TableRestore) importEngine(
