From 682ddaca07d772e84b772068140e590337964182 Mon Sep 17 00:00:00 2001
From: glorv
Date: Wed, 27 Jan 2021 15:52:00 +0800
Subject: [PATCH] restore: add importing progress and optimize the accuracy of restore progress (#506)

---
 lightning/backend/local.go   |  51 +++++++-----
 lightning/metric/metric.go   |   7 ++
 lightning/restore/restore.go | 153 +++++++++++++++++++++++++++--------
 3 files changed, 160 insertions(+), 51 deletions(-)

diff --git a/lightning/backend/local.go b/lightning/backend/local.go
index 192242a43..76fa9c772 100644
--- a/lightning/backend/local.go
+++ b/lightning/backend/local.go
@@ -56,6 +56,7 @@ import (
     "github.com/pingcap/tidb-lightning/lightning/glue"
     "github.com/pingcap/tidb-lightning/lightning/log"
     "github.com/pingcap/tidb-lightning/lightning/manual"
+    "github.com/pingcap/tidb-lightning/lightning/metric"
     "github.com/pingcap/tidb-lightning/lightning/worker"
 )
 
@@ -506,6 +507,11 @@ func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst
     return sst.NewImportSSTClient(conn), nil
 }
 
+type rangeStats struct {
+    count      int64
+    totalBytes int64
+}
+
 // WriteToTiKV writer engine key-value pairs to tikv and return the sst meta generated by tikv.
 // we don't need to do cleanup for the pairs written to tikv if encounters an error,
 // tikv will takes the responsibility to do so.
@@ -514,28 +520,30 @@ func (local *local) WriteToTiKV(
     engineFile *LocalFile,
     region *split.RegionInfo,
     start, end []byte,
-) ([]*sst.SSTMeta, *Range, error) {
+) ([]*sst.SSTMeta, *Range, rangeStats, error) {
     begin := time.Now()
     regionRange := intersectRange(region.Region, Range{start: start, end: end})
     opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end}
     iter := engineFile.db.NewIter(opt)
     defer iter.Close()
+
+    stats := rangeStats{}
+
     iter.First()
     if iter.Error() != nil {
-        return nil, nil, errors.Annotate(iter.Error(), "failed to read the first key")
+        return nil, nil, stats, errors.Annotate(iter.Error(), "failed to read the first key")
     }
-
     if !iter.Valid() {
         log.L().Info("keys within region is empty, skip ingest", log.ZapRedactBinary("start", start),
             log.ZapRedactBinary("regionStart", region.Region.StartKey), log.ZapRedactBinary("end", end),
             log.ZapRedactBinary("regionEnd", region.Region.EndKey))
-        return nil, nil, nil
+        return nil, nil, stats, nil
     }
     firstKey := codec.EncodeBytes([]byte{}, iter.Key())
     iter.Last()
     if iter.Error() != nil {
-        return nil, nil, errors.Annotate(iter.Error(), "failed to seek to the last key")
+        return nil, nil, stats, errors.Annotate(iter.Error(), "failed to seek to the last key")
     }
     lastKey := codec.EncodeBytes([]byte{}, iter.Key())
@@ -556,12 +564,12 @@ func (local *local) WriteToTiKV(
     for _, peer := range region.Region.GetPeers() {
         cli, err := local.getImportClient(ctx, peer)
         if err != nil {
-            return nil, nil, err
+            return nil, nil, stats, err
         }
 
         wstream, err := cli.Write(ctx)
         if err != nil {
-            return nil, nil, errors.Trace(err)
+            return nil, nil, stats, errors.Trace(err)
         }
 
         // Bind uuid for this write request
         req := &sst.WriteRequest{
             },
         }
         if err = wstream.Send(req); err != nil {
-            return nil, nil, errors.Trace(err)
+            return nil, nil, stats, errors.Trace(err)
         }
         req.Chunk = &sst.WriteRequest_Batch{
             Batch: &sst.WriteBatch{
@@ -587,7 +595,7 @@ func (local *local) WriteToTiKV(
     pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
     count := 0
     size := int64(0)
-    totalCount := 0
+    totalCount := int64(0)
     firstLoop := true
     regionMaxSize := local.regionSplitSize * 4 / 3
@@ -611,7 +619,7 @@ func (local *local) WriteToTiKV(
         for i := range clients {
             requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
             if err := clients[i].Send(requests[i]); err != nil {
-                return nil, nil, err
+                return nil, nil, stats, err
             }
         }
         count = 0
@@ -624,14 +632,14 @@ func (local *local) WriteToTiKV(
     }
 
     if iter.Error() != nil {
-        return nil, nil, errors.Trace(iter.Error())
+        return nil, nil, stats, errors.Trace(iter.Error())
     }
 
     if count > 0 {
         for i := range clients {
             requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
             if err := clients[i].Send(requests[i]); err != nil {
-                return nil, nil, err
+                return nil, nil, stats, err
             }
         }
     }
@@ -639,7 +647,7 @@ func (local *local) WriteToTiKV(
     var leaderPeerMetas []*sst.SSTMeta
     for i, wStream := range clients {
         if resp, closeErr := wStream.CloseAndRecv(); closeErr != nil {
-            return nil, nil, closeErr
+            return nil, nil, stats, closeErr
         } else {
             if leaderID == region.Region.Peers[i].GetId() {
                 leaderPeerMetas = resp.Metas
@@ -652,14 +660,14 @@ func (local *local) WriteToTiKV(
     if leaderPeerMetas == nil {
         log.L().Warn("write to tikv no leader", log.ZapRedactReflect("region", region),
             zap.Uint64("leader_id", leaderID), log.ZapRedactReflect("meta", meta),
-            zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size))
-        return nil, nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
+            zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size))
+        return nil, nil, stats, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
             region.Region.Id, leaderID)
     }
 
     log.L().Debug("write to kv", zap.Reflect("region", region), zap.Uint64("leader", leaderID),
         zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas),
-        zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size),
+        zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size),
         zap.Int64("buf_size", bytesBuf.totalSize()),
         zap.Stringer("takeTime", time.Since(begin)))
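Editorial aside, not part of the patch: the new rangeStats return value lets the caller (writeAndIngestPairs) add the bytes of a successfully written and ingested range to the new lightning_bytes counter. The sketch below is a minimal illustration of that pattern with hypothetical names (writeRange, bytesImported); it is not tidb-lightning's actual backend code, only the shape of the change.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// rangeStats mirrors the struct added in this patch: how many pairs and bytes
// one write call actually sent to TiKV.
type rangeStats struct {
	count      int64
	totalBytes int64
}

// bytesImported is a hypothetical counter, labelled by state like the patch's
// lightning_bytes metric.
var bytesImported = prometheus.NewCounterVec(
	prometheus.CounterOpts{Namespace: "lightning", Name: "bytes", Help: "count of total bytes"},
	[]string{"state"},
)

// writeRange is a stand-in for WriteToTiKV: it "writes" a key range and
// reports what it wrote alongside any error.
func writeRange(pairs, avgPairSize int64) (rangeStats, error) {
	return rangeStats{count: pairs, totalBytes: pairs * avgPairSize}, nil
}

func main() {
	prometheus.MustRegister(bytesImported)

	stats, err := writeRange(1000, 256)
	if err == nil {
		// Only count the bytes once the range was written (and later ingested)
		// successfully, as the patch does for the "imported" state.
		bytesImported.WithLabelValues("imported").Add(float64(stats.totalBytes))
	}
	fmt.Printf("wrote %d pairs, %d bytes\n", stats.count, stats.totalBytes)
}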
@@ -667,13 +675,15 @@ func (local *local) WriteToTiKV(
     if iter.Valid() && iter.Next() {
         firstKey := append([]byte{}, iter.Key()...)
         remainRange = &Range{start: firstKey, end: regionRange.end}
-        log.L().Info("write to tikv partial finish", zap.Int("count", totalCount),
+        log.L().Info("write to tikv partial finish", zap.Int64("count", totalCount),
             zap.Int64("size", size), log.ZapRedactBinary("startKey", regionRange.start),
             log.ZapRedactBinary("endKey", regionRange.end), log.ZapRedactBinary("remainStart", remainRange.start),
             log.ZapRedactBinary("remainEnd", remainRange.end), log.ZapRedactReflect("region", region))
     }
+    stats.count = totalCount
+    stats.totalBytes = size
 
-    return leaderPeerMetas, remainRange, nil
+    return leaderPeerMetas, remainRange, stats, nil
 }
 
 func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
@@ -976,10 +986,11 @@ func (local *local) writeAndIngestPairs(
 ) (*Range, error) {
     var err error
     var remainRange *Range
+    var rangeStats rangeStats
 loopWrite:
     for i := 0; i < maxRetryTimes; i++ {
         var metas []*sst.SSTMeta
-        metas, remainRange, err = local.WriteToTiKV(ctx, engineFile, region, start, end)
+        metas, remainRange, rangeStats, err = local.WriteToTiKV(ctx, engineFile, region, start, end)
         if err != nil {
             log.L().Warn("write to tikv failed", log.ShortError(err))
             return nil, err
@@ -1055,6 +1066,8 @@ loopWrite:
         log.L().Warn("write and ingest region, will retry import full range", log.ShortError(err),
             log.ZapRedactStringer("region", region.Region), log.ZapRedactBinary("start", start),
             log.ZapRedactBinary("end", end))
+    } else {
+        metric.BytesCounter.WithLabelValues(metric.TableStateImported).Add(float64(rangeStats.totalBytes))
     }
     return remainRange, errors.Trace(err)
 }
diff --git a/lightning/metric/metric.go b/lightning/metric/metric.go
index ff3e8cdfc..bce7ad4bb 100644
--- a/lightning/metric/metric.go
+++ b/lightning/metric/metric.go
@@ -86,6 +86,12 @@ var (
             Name: "chunks",
             Help: "count number of chunks processed",
         }, []string{"state"})
+    BytesCounter = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: "lightning",
+            Name: "bytes",
+            Help: "count of total bytes",
+        }, []string{"state"})
     // state can be one of:
     // - estimated (an estimation derived from the file size)
     // - pending
@@ -190,6 +196,7 @@ func init() {
     prometheus.MustRegister(TableCounter)
     prometheus.MustRegister(ProcessedEngineCounter)
     prometheus.MustRegister(ChunkCounter)
+    prometheus.MustRegister(BytesCounter)
     prometheus.MustRegister(ImportSecondsHistogram)
     prometheus.MustRegister(RowReadSecondsHistogram)
     prometheus.MustRegister(RowReadBytesHistogram)
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index 11b44ad3c..b1d354ac2 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -671,6 +671,8 @@ func verifyLocalFile(ctx context.Context, cpdb CheckpointsDB, dir string) error
 func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context) error {
     estimatedChunkCount := 0.0
+    estimatedEngineCnt := int64(0)
+    batchSize := int64(rc.cfg.Mydumper.BatchSize)
     for _, dbMeta := range rc.dbMetas {
         for _, tableMeta := range dbMeta.Tables {
             tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name)
@@ -678,8 +680,12 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context)
             if err != nil {
                 return errors.Trace(err)
             }
+            fileChunks := make(map[string]float64)
             for engineId, eCp := range dbCp.Engines {
+                if eCp.Status < CheckpointStatusImported {
+                    estimatedEngineCnt++
+                }
                 if engineId == indexEngineID {
                     continue
                 }
@@ -691,6 +697,10 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context)
                     fileChunks[c.Key.Path] += remainChunkCnt
                 }
             }
+            // estimate the engine count if the engine checkpoint is empty
+            if len(dbCp.Engines) == 0 {
+                estimatedEngineCnt += ((tableMeta.TotalSize + batchSize - 1) / batchSize) + 1
+            }
             for _, fileMeta := range tableMeta.DataFiles {
                 if cnt, ok := fileChunks[fileMeta.FileMeta.Path]; ok {
                     estimatedChunkCount += cnt
@@ -710,6 +720,8 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context)
         }
     }
     metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(estimatedChunkCount)
+    metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStateEstimated, metric.TableResultSuccess).
+        Add(float64(estimatedEngineCnt))
     rc.tidbGlue.Record(glue.RecordEstimatedChunk, uint64(estimatedChunkCount))
     return nil
 }
@@ -835,7 +847,6 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
     }
     start := time.Now()
-
     for {
         select {
         case <-ctx.Done():
@@ -852,32 +863,85 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
         case <-logProgressTicker.C:
             // log the current progress periodically, so OPS will know that we're still working
             nanoseconds := float64(time.Since(start).Nanoseconds())
+            // the estimated chunk count is not accurate (it is likely an underestimate), and the actual
+            // pending count is not accurate either until the last table has started, so use the bigger
+            // of the two as a workaround
             estimated := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated))
+            pending := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending))
+            if estimated < pending {
+                estimated = pending
+            }
             finished := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished))
             totalTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStatePending, metric.TableResultSuccess))
             completedTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStateCompleted, metric.TableResultSuccess))
             bytesRead := metric.ReadHistogramSum(metric.RowReadBytesHistogram)
+            engineEstimated := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStateEstimated, metric.TableResultSuccess))
+            enginePending := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStatePending, metric.TableResultSuccess))
+            if engineEstimated < enginePending {
+                engineEstimated = enginePending
+            }
+            engineFinished := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess))
+            bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateWritten))
+            bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateImported))
 
             var state string
             var remaining zap.Field
             if finished >= estimated {
-                state = "post-processing"
+                if engineFinished < engineEstimated {
+                    state = "importing"
+                } else {
+                    state = "post-processing"
+                }
                 remaining = zap.Skip()
             } else if finished > 0 {
-                remainNanoseconds := (estimated/finished - 1) * nanoseconds
+                state = "writing"
-                remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second))
+
             } else {
-                state = "writing"
-                remaining = zap.Skip()
+                state = "preparing"
+
             }
+
+            // since we can't accurately estimate the extra time cost of importing after all the writing
+            // is finished, we use estimatedWritingProgress * 0.8 + estimatedImportingProgress * 0.2 as
+            // the total progress.
+            remaining = zap.Skip()
+            totalPercent := 0.0
+            if finished > 0 {
+                writePercent := math.Min(finished/estimated, 1.0)
+                importPercent := 1.0
+                if bytesWritten > 0 {
+                    totalBytes := bytesWritten / writePercent
+                    importPercent = math.Min(bytesImported/totalBytes, 1.0)
+                }
+                totalPercent = writePercent*0.8 + importPercent*0.2
+                if totalPercent < 1.0 {
+                    remainNanoseconds := (1.0 - totalPercent) / totalPercent * nanoseconds
+                    remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second))
+                }
+            }
+
+            formatPercent := func(finish, estimate float64) string {
+                speed := ""
+                if estimated > 0 {
+                    speed = fmt.Sprintf(" (%.1f%%)", finish/estimate*100)
+                }
+                return speed
+            }
+
+            // avoid outputting the bytes speed if there are no unfinished chunks
+            chunkSpeed := zap.Skip()
+            if bytesRead > 0 {
+                chunkSpeed = zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds))
+            }
 
             // Note: a speed of 28 MiB/s roughly corresponds to 100 GiB/hour.
             log.L().Info("progress",
-                zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)),
-                zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", completedTables, totalTables, completedTables/totalTables*100)),
-                zap.String("chunks", fmt.Sprintf("%.0f/%.0f", finished, estimated)),
-                zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds)),
+                zap.String("total", fmt.Sprintf("%.1f%%", totalPercent*100)),
+                //zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)),
+                zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))),
+                zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))),
+                zap.String("engines", fmt.Sprintf("%.f/%.f%s", engineFinished, engineEstimated, formatPercent(engineFinished, engineEstimated))),
+                chunkSpeed,
                 zap.String("state", state),
                 remaining,
             )
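Editorial aside, not part of the patch: the 0.8/0.2 weighting above can be sanity-checked with concrete numbers. The standalone snippet below repeats the same arithmetic the new progress log performs; the metric values are invented purely for illustration.

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Hypothetical metric readings part-way through a run.
	finished, estimated := 300.0, 400.0 // chunks finished vs. estimated
	bytesWritten := 60.0                // GiB written into engines so far
	bytesImported := 20.0               // GiB already imported into TiKV
	elapsed := 2 * time.Hour

	writePercent := math.Min(finished/estimated, 1.0) // 0.75
	// Extrapolate the total bytes from the share of chunks already written.
	totalBytes := bytesWritten / writePercent                 // about 80 GiB
	importPercent := math.Min(bytesImported/totalBytes, 1.0) // 0.25

	// Same 80/20 split as the patch: writing dominates the wall-clock time.
	totalPercent := writePercent*0.8 + importPercent*0.2 // 0.65

	// Remaining time scales the elapsed time by the unfinished fraction.
	remaining := time.Duration((1.0 - totalPercent) / totalPercent * float64(elapsed.Nanoseconds()))
	fmt.Printf("total %.1f%%, remaining ~%s\n", totalPercent*100, remaining.Round(time.Minute))
}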
@@ -1220,7 +1284,7 @@ func (t *TableRestore) restoreEngines(ctx context.Context, rc *RestoreController
             defer wg.Done()
             engineLogTask := t.logger.With(zap.Int32("engineNumber", eid)).Begin(zap.InfoLevel, "restore engine")
-            dataClosedEngine, dataWorker, err := t.restoreEngine(ctx, rc, indexEngine, eid, ecp)
+            dataClosedEngine, err := t.restoreEngine(ctx, rc, indexEngine, eid, ecp)
             engineLogTask.End(zap.ErrorLevel, err)
             rc.tableWorkers.Recycle(w)
             if err != nil {
@@ -1232,6 +1296,7 @@ func (t *TableRestore) restoreEngines(ctx context.Context, rc *RestoreController
                 panic("forcing failure due to FailBeforeDataEngineImported")
             })
+            dataWorker := rc.closedEngineLimit.Apply()
             defer rc.closedEngineLimit.Recycle(dataWorker)
             if err := t.importEngine(ctx, dataClosedEngine, rc, eid, ecp); err != nil {
                 engineErr.Set(err)
             }
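Editorial aside, not part of the patch: moving closedEngineLimit.Apply() out of restoreEngine and next to importEngine means a scarce "closed engine" slot is held only while an engine is actually being imported, not while it waits around after being closed. Below is a minimal sketch of that acquire-around-the-import pattern, using a buffered channel as a stand-in for lightning's worker pool; all names here are hypothetical.

package main

import (
	"fmt"
	"sync"
	"time"
)

// semaphore is a stand-in for rc.closedEngineLimit: it caps how many closed
// engines may be imported concurrently.
type semaphore chan struct{}

func (s semaphore) Apply() struct{}      { return <-s }
func (s semaphore) Recycle(tok struct{}) { s <- tok }

func importEngine(id int) {
	time.Sleep(50 * time.Millisecond) // pretend to import
	fmt.Println("imported engine", id)
}

func main() {
	limit := make(semaphore, 2)
	for i := 0; i < 2; i++ {
		limit <- struct{}{}
	}

	var wg sync.WaitGroup
	for id := 0; id < 4; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// ... restore and close the engine here, without holding a slot ...

			// Acquire a slot only around the import step, as the patch now does
			// in restoreEngines right before importEngine.
			tok := limit.Apply()
			defer limit.Recycle(tok)
			importEngine(id)
		}(id)
	}
	wg.Wait()
}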
@@ -1292,23 +1357,21 @@ func (t *TableRestore) restoreEngine(
     indexEngine *kv.OpenedEngine,
     engineID int32,
     cp *EngineCheckpoint,
-) (*kv.ClosedEngine, *worker.Worker, error) {
+) (*kv.ClosedEngine, error) {
     if cp.Status >= CheckpointStatusClosed {
-        w := rc.closedEngineLimit.Apply()
         closedEngine, err := rc.backend.UnsafeCloseEngine(ctx, t.tableName, engineID)
         // If any error occurred, recycle worker immediately
         if err != nil {
-            rc.closedEngineLimit.Recycle(w)
-            return closedEngine, nil, errors.Trace(err)
+            return closedEngine, errors.Trace(err)
         }
-        return closedEngine, w, nil
+        return closedEngine, nil
     }
 
     logTask := t.logger.With(zap.Int32("engineNumber", engineID)).Begin(zap.InfoLevel, "encode kv data and write")
 
     dataEngine, err := rc.backend.OpenEngine(ctx, t.tableName, engineID)
     if err != nil {
-        return nil, nil, errors.Trace(err)
+        return nil, errors.Trace(err)
     }
 
     var wg sync.WaitGroup
@@ -1322,7 +1385,7 @@ func (t *TableRestore) restoreEngine(
         select {
         case <-ctx.Done():
-            return nil, nil, ctx.Err()
+            return nil, ctx.Err()
         default:
         }
@@ -1337,9 +1400,13 @@ func (t *TableRestore) restoreEngine(
         // 4. flush kvs data (into tikv node)
         cr, err := newChunkRestore(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, t.tableInfo)
         if err != nil {
-            return nil, nil, errors.Trace(err)
+            return nil, errors.Trace(err)
+        }
+        var remainChunkCnt float64
+        if chunk.Chunk.Offset < chunk.Chunk.EndOffset {
+            remainChunkCnt = float64(chunk.Chunk.EndOffset-chunk.Chunk.Offset) / float64(chunk.Chunk.EndOffset-chunk.Key.Offset)
+            metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Add(remainChunkCnt)
         }
-        metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()
 
         restoreWorker := rc.regionWorkers.Apply()
         wg.Add(1)
@@ -1350,12 +1417,16 @@ func (t *TableRestore) restoreEngine(
                 wg.Done()
                 rc.regionWorkers.Recycle(w)
             }()
-            metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Inc()
+            metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Add(remainChunkCnt)
             err := cr.restore(ctx, t, engineID, dataEngine, indexEngine, rc)
             if err == nil {
-                metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Inc()
-                return
+                metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt)
+                metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize()))
+            } else {
+                metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt)
+                chunkErr.Set(err)
             }
-            metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Inc()
-            chunkErr.Set(err)
         }(restoreWorker, cr)
@@ -1377,6 +1448,17 @@ func (t *TableRestore) restoreEngine(
         zap.Uint64("written", totalKVSize),
     )
 
+    flushAndSaveAllChunks := func() error {
+        if err = indexEngine.Flush(); err != nil {
+            return errors.Trace(err)
+        }
+        // Currently we write all the checkpoints after data&index engine are flushed.
+        for _, chunk := range cp.Chunks {
+            saveCheckpoint(rc, t, engineID, chunk)
+        }
+        return nil
+    }
+
     // in local mode, this check-point make no sense, because we don't do flush now,
     // so there may be data lose if exit at here. So we don't write this checkpoint
     // here like other mode.
     if !rc.isLocalBackend() {
         rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusAllWritten)
     }
     if err != nil {
-        return nil, nil, errors.Trace(err)
+        // if the process is canceled, flush all chunk checkpoints for the local backend
+        if rc.isLocalBackend() && common.IsContextCanceledError(err) {
+            // ctx is canceled, so use `context.Background()` here to avoid the engine Close call failing
+            if _, err2 := dataEngine.Close(context.Background()); err2 != nil {
+                log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2))
+                return nil, errors.Trace(err)
+            }
+            if err2 := flushAndSaveAllChunks(); err2 != nil {
+                log.L().Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2))
+            }
+        }
+        return nil, errors.Trace(err)
     }
 
-    dataWorker := rc.closedEngineLimit.Apply()
     closedDataEngine, err := dataEngine.Close(ctx)
     // For local backend, if checkpoint is enabled, we must flush index engine to avoid data loss.
     // this flush action impact up to 10% of the performance, so we only do it if necessary.
     if err == nil && rc.cfg.Checkpoint.Enable && rc.isLocalBackend() {
-        if err = indexEngine.Flush(); err != nil {
-            // If any error occurred, recycle worker immediately
-            rc.closedEngineLimit.Recycle(dataWorker)
-            return nil, nil, errors.Trace(err)
+        if err = flushAndSaveAllChunks(); err != nil {
+            return nil, errors.Trace(err)
         }
         // Currently we write all the checkpoints after data&index engine are flushed.
         for _, chunk := range cp.Chunks {
@@ -1405,10 +1495,9 @@
     rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusClosed)
     if err != nil {
         // If any error occurred, recycle worker immediately
-        rc.closedEngineLimit.Recycle(dataWorker)
-        return nil, nil, errors.Trace(err)
+        return nil, errors.Trace(err)
     }
-    return closedDataEngine, dataWorker, nil
+    return closedDataEngine, nil
 }
 
 func (t *TableRestore) importEngine(
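Editorial aside, not part of the patch: on a canceled context the local backend now closes the data engine with a fresh context.Background() (the original ctx is already canceled), then flushes the index engine and saves every chunk checkpoint, so a manually interrupted import can resume from the flushed offsets. The sketch below shows that shutdown order with simplified, hypothetical stand-ins for lightning's engine and checkpoint types; it is an illustration of the idea, not the project's code.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Minimal stand-ins for the engine and checkpoint machinery used by restoreEngine.
type engine struct{ name string }

func (e *engine) Close(ctx context.Context) error { fmt.Println("closed", e.name); return nil }
func (e *engine) Flush() error                    { fmt.Println("flushed", e.name); return nil }

type chunkCheckpoint struct {
	path   string
	offset int64
}

func saveCheckpoint(c chunkCheckpoint) { fmt.Printf("saved %s@%d\n", c.path, c.offset) }

// flushAndSaveAllChunks mirrors the helper added in the patch: flush the index
// engine, then persist every chunk checkpoint.
func flushAndSaveAllChunks(indexEngine *engine, chunks []chunkCheckpoint) error {
	if err := indexEngine.Flush(); err != nil {
		return err
	}
	for _, c := range chunks {
		saveCheckpoint(c)
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the user interrupting the import

	dataEngine := &engine{name: "data"}
	indexEngine := &engine{name: "index"}
	chunks := []chunkCheckpoint{{"a.sql", 4096}, {"b.sql", 0}}

	if err := ctx.Err(); errors.Is(err, context.Canceled) {
		// ctx is canceled, so close with a background context, then flush and
		// save the chunk checkpoints before exiting, as the patch does.
		if err2 := dataEngine.Close(context.Background()); err2 != nil {
			fmt.Println("close failed:", err2)
			return
		}
		if err2 := flushAndSaveAllChunks(indexEngine, chunks); err2 != nil {
			fmt.Println("flush chunk checkpoints failed:", err2)
		}
	}
}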