From 78e064d802b312ee2760ef576bdabac26316bcc9 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 23 Oct 2020 14:42:07 +0800 Subject: [PATCH] restore: better estimate task remain time progress log (#377) * optimize progress * update Co-authored-by: 3pointer Co-authored-by: kennytm --- lightning/restore/restore.go | 37 ++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index de2745c09..af27556fb 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -18,6 +18,7 @@ import ( "database/sql" "fmt" "io" + "math" "os" "strings" "sync" @@ -360,8 +361,8 @@ func (rc *RestoreController) restoreSchema(ctx context.Context) error { rc.rowFormatVer = ObtainRowFormatVersion(ctx, tidbMgr.db) // Estimate the number of chunks for progress reporting - rc.estimateChunkCountIntoMetrics() - return nil + err = rc.estimateChunkCountIntoMetrics(ctx) + return err } // verifyCheckpoint check whether previous task checkpoint is compatible with task config @@ -420,15 +421,37 @@ func verifyCheckpoint(cfg *config.Config, taskCp *TaskCheckpoint) error { return nil } -func (rc *RestoreController) estimateChunkCountIntoMetrics() { - estimatedChunkCount := 0 +func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context) error { + estimatedChunkCount := 0.0 for _, dbMeta := range rc.dbMetas { for _, tableMeta := range dbMeta.Tables { + tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name) + dbCp, err := rc.checkpointsDB.Get(ctx, tableName) + if err != nil { + return errors.Trace(err) + } + fileChunks := make(map[string]float64) + for engineId, eCp := range dbCp.Engines { + if engineId == indexEngineID { + continue + } + for _, c := range eCp.Chunks { + if _, ok := fileChunks[c.Key.Path]; !ok { + fileChunks[c.Key.Path] = 0.0 + } + remainChunkCnt := float64(c.Chunk.EndOffset-c.Chunk.Offset) / float64(c.Chunk.EndOffset-c.Key.Offset) + fileChunks[c.Key.Path] += remainChunkCnt + } + } for _, fileMeta := range tableMeta.DataFiles { + if cnt, ok := fileChunks[fileMeta.FileMeta.Path]; ok { + estimatedChunkCount += cnt + continue + } if fileMeta.FileMeta.Type == mydump.SourceTypeCSV { cfg := rc.cfg.Mydumper if fileMeta.Size > cfg.MaxRegionSize && cfg.StrictFormat && !cfg.CSV.Header { - estimatedChunkCount += int(fileMeta.Size / cfg.MaxRegionSize) + estimatedChunkCount += math.Round(float64(fileMeta.Size) / float64(cfg.MaxRegionSize)) } else { estimatedChunkCount += 1 } @@ -438,7 +461,8 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics() { } } } - metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(float64(estimatedChunkCount)) + metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(estimatedChunkCount) + return nil } func (rc *RestoreController) saveStatusCheckpoint(tableName string, engineID int32, err error, statusIfSucceed CheckpointStatus) { @@ -600,6 +624,7 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan log.L().Info("progress", zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)), zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", completedTables, totalTables, completedTables/totalTables*100)), + zap.String("chunks", fmt.Sprintf("%.0f/%.0f", finished, estimated)), zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds)), zap.String("state", state), remaining,