Skip to content

Commit

Permalink
restore: better estimate task remain time progress log (pingcap#377)
Browse files Browse the repository at this point in the history
* optimize progress

* update

Co-authored-by: 3pointer <luancheng@pingcap.com>
Co-authored-by: kennytm <kennytm@gmail.com>
  • Loading branch information
3 people authored Oct 23, 2020
1 parent 5b212bf commit 78e064d
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"fmt"
"io"
"math"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -360,8 +361,8 @@ func (rc *RestoreController) restoreSchema(ctx context.Context) error {
rc.rowFormatVer = ObtainRowFormatVersion(ctx, tidbMgr.db)

// Estimate the number of chunks for progress reporting
rc.estimateChunkCountIntoMetrics()
return nil
err = rc.estimateChunkCountIntoMetrics(ctx)
return err
}

// verifyCheckpoint checks whether the previous task checkpoint is compatible with the task config
Expand Down Expand Up @@ -420,15 +421,37 @@ func verifyCheckpoint(cfg *config.Config, taskCp *TaskCheckpoint) error {
return nil
}

func (rc *RestoreController) estimateChunkCountIntoMetrics() {
estimatedChunkCount := 0
func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context) error {
estimatedChunkCount := 0.0
for _, dbMeta := range rc.dbMetas {
for _, tableMeta := range dbMeta.Tables {
tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name)
dbCp, err := rc.checkpointsDB.Get(ctx, tableName)
if err != nil {
return errors.Trace(err)
}
fileChunks := make(map[string]float64)
for engineId, eCp := range dbCp.Engines {
if engineId == indexEngineID {
continue
}
for _, c := range eCp.Chunks {
if _, ok := fileChunks[c.Key.Path]; !ok {
fileChunks[c.Key.Path] = 0.0
}
remainChunkCnt := float64(c.Chunk.EndOffset-c.Chunk.Offset) / float64(c.Chunk.EndOffset-c.Key.Offset)
fileChunks[c.Key.Path] += remainChunkCnt
}
}
for _, fileMeta := range tableMeta.DataFiles {
if cnt, ok := fileChunks[fileMeta.FileMeta.Path]; ok {
estimatedChunkCount += cnt
continue
}
if fileMeta.FileMeta.Type == mydump.SourceTypeCSV {
cfg := rc.cfg.Mydumper
if fileMeta.Size > cfg.MaxRegionSize && cfg.StrictFormat && !cfg.CSV.Header {
estimatedChunkCount += int(fileMeta.Size / cfg.MaxRegionSize)
estimatedChunkCount += math.Round(float64(fileMeta.Size) / float64(cfg.MaxRegionSize))
} else {
estimatedChunkCount += 1
}
Expand All @@ -438,7 +461,8 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics() {
}
}
}
metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(float64(estimatedChunkCount))
metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(estimatedChunkCount)
return nil
}

func (rc *RestoreController) saveStatusCheckpoint(tableName string, engineID int32, err error, statusIfSucceed CheckpointStatus) {
Expand Down Expand Up @@ -600,6 +624,7 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
log.L().Info("progress",
zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)),
zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", completedTables, totalTables, completedTables/totalTables*100)),
zap.String("chunks", fmt.Sprintf("%.0f/%.0f", finished, estimated)),
zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds)),
zap.String("state", state),
remaining,
Expand Down

0 comments on commit 78e064d

Please sign in to comment.