Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
add total progress
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Dec 7, 2020
1 parent a937eb7 commit c2f60f6
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 21 deletions.
44 changes: 28 additions & 16 deletions lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/log"
"github.com/pingcap/tidb-lightning/lightning/manual"
"github.com/pingcap/tidb-lightning/lightning/metric"
"github.com/pingcap/tidb-lightning/lightning/worker"
)

Expand Down Expand Up @@ -486,6 +487,11 @@ func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst
return sst.NewImportSSTClient(conn), nil
}

type rangeStats struct {
count int64
totalBytes int64
}

// WriteToTiKV writer engine key-value pairs to tikv and return the sst meta generated by tikv.
// we don't need to do cleanup for the pairs written to tikv if encounters an error,
// tikv will takes the responsibility to do so.
Expand All @@ -494,18 +500,19 @@ func (local *local) WriteToTiKV(
engineFile *LocalFile,
region *split.RegionInfo,
start, end []byte,
) ([]*sst.SSTMeta, *Range, error) {
) ([]*sst.SSTMeta, *Range, rangeStats, error) {
begin := time.Now()
regionRange := intersectRange(region.Region, Range{start: start, end: end})
opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end}
iter := engineFile.db.NewIter(opt)
defer iter.Close()

stats := rangeStats{}
if !iter.First() {
log.L().Info("keys within region is empty, skip ingest", zap.Binary("start", start),
zap.Binary("regionStart", region.Region.StartKey), zap.Binary("end", end),
zap.Binary("regionEnd", region.Region.EndKey))
return nil, nil, nil
return nil, nil, stats, nil
}

firstKey := codec.EncodeBytes([]byte{}, iter.Key())
Expand All @@ -529,12 +536,12 @@ func (local *local) WriteToTiKV(
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer)
if err != nil {
return nil, nil, err
return nil, nil, stats, err
}

wstream, err := cli.Write(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, stats, errors.Trace(err)
}

// Bind uuid for this write request
Expand All @@ -544,7 +551,7 @@ func (local *local) WriteToTiKV(
},
}
if err = wstream.Send(req); err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, stats, errors.Trace(err)
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
Expand All @@ -560,7 +567,7 @@ func (local *local) WriteToTiKV(
pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
count := 0
size := int64(0)
totalCount := 0
totalCount := int64(0)
firstLoop := true
regionMaxSize := local.regionSplitSize * 4 / 3

Expand All @@ -584,7 +591,7 @@ func (local *local) WriteToTiKV(
for i := range clients {
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return nil, nil, err
return nil, nil, stats, err
}
}
count = 0
Expand All @@ -600,19 +607,19 @@ func (local *local) WriteToTiKV(
for i := range clients {
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return nil, nil, err
return nil, nil, stats, err
}
}
}

if iter.Error() != nil {
return nil, nil, errors.Trace(iter.Error())
return nil, nil, stats, errors.Trace(iter.Error())
}

var leaderPeerMetas []*sst.SSTMeta
for i, wStream := range clients {
if resp, closeErr := wStream.CloseAndRecv(); closeErr != nil {
return nil, nil, closeErr
return nil, nil, stats, closeErr
} else {
if leaderID == region.Region.Peers[i].GetId() {
leaderPeerMetas = resp.Metas
Expand All @@ -625,28 +632,30 @@ func (local *local) WriteToTiKV(
if leaderPeerMetas == nil {
log.L().Warn("write to tikv no leader", zap.Reflect("region", region),
zap.Uint64("leader_id", leaderID), zap.Reflect("meta", meta),
zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size))
return nil, nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size))
return nil, nil, stats, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
region.Region.Id, leaderID)
}

log.L().Debug("write to kv", zap.Reflect("region", region), zap.Uint64("leader", leaderID),
zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas),
zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size),
zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size),
zap.Int64("buf_size", bytesBuf.totalSize()),
zap.Stringer("takeTime", time.Since(begin)))

var remainRange *Range
if iter.Valid() && iter.Next() {
firstKey := append([]byte{}, iter.Key()...)
remainRange = &Range{start: firstKey, end: regionRange.end}
log.L().Info("write to tikv partial finish", zap.Int("count", totalCount),
log.L().Info("write to tikv partial finish", zap.Int64("count", totalCount),
zap.Int64("size", size), zap.Binary("startKey", regionRange.start), zap.Binary("endKey", regionRange.end),
zap.Binary("remainStart", remainRange.start), zap.Binary("remainEnd", remainRange.end),
zap.Reflect("region", region))
}
stats.count = totalCount
stats.totalBytes = size

return leaderPeerMetas, remainRange, nil
return leaderPeerMetas, remainRange, stats, nil
}

func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
Expand Down Expand Up @@ -935,10 +944,11 @@ func (local *local) writeAndIngestPairs(
) (*Range, error) {
var err error
var remainRange *Range
var rangeStats rangeStats
loopWrite:
for i := 0; i < maxRetryTimes; i++ {
var metas []*sst.SSTMeta
metas, remainRange, err = local.WriteToTiKV(ctx, engineFile, region, start, end)
metas, remainRange, rangeStats, err = local.WriteToTiKV(ctx, engineFile, region, start, end)
if err != nil {
log.L().Warn("write to tikv failed", log.ShortError(err))
return nil, err
Expand Down Expand Up @@ -992,6 +1002,8 @@ loopWrite:
if err != nil {
log.L().Warn("write and ingest region, will retry import full range", log.ShortError(err),
zap.Stringer("region", region.Region), zap.Binary("start", start), zap.Binary("end", end))
} else {
metric.BytesCounter.WithLabelValues(metric.TableStateImported).Add(float64(rangeStats.totalBytes))
}
return remainRange, errors.Trace(err)
}
Expand Down
7 changes: 7 additions & 0 deletions lightning/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ var (
Name: "chunks",
Help: "count number of chunks processed",
}, []string{"state"})
BytesCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "lightning",
Name: "bytes",
Help: "count of total bytes",
}, []string{"state"})
// state can be one of:
// - estimated (an estimation derived from the file size)
// - pending
Expand Down Expand Up @@ -190,6 +196,7 @@ func init() {
prometheus.MustRegister(TableCounter)
prometheus.MustRegister(ProcessedEngineCounter)
prometheus.MustRegister(ChunkCounter)
prometheus.MustRegister(BytesCounter)
prometheus.MustRegister(ImportSecondsHistogram)
prometheus.MustRegister(RowReadSecondsHistogram)
prometheus.MustRegister(RowReadBytesHistogram)
Expand Down
31 changes: 26 additions & 5 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,6 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
}

start := time.Now()

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -637,24 +636,44 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
engineEstimated = enginePending
}
engineFinished := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess))
bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateWritten))
bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateImported))

var state string
var remaining zap.Field
if finished >= estimated {
if engineFinished < engineEstimated {
state = "importing"
// TODO: how to estimate time remaining for importing
} else {
state = "post-processing"
}
remaining = zap.Skip()
} else if finished > 0 {
remainNanoseconds := (estimated/finished - 1) * nanoseconds

state = "writing"
remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second))

} else {
state = "writing"
remaining = zap.Skip()

}

// since we can't accurately estimate the extra time cost by import after all writing are finished,
// so here we use estimatedWritingProgress * 0.8 + estimatedImportingProgress * 0.2 as the total
// progress.
remaining = zap.Skip()
totalPercent := 0.0
if finished > 0 {
writePercent := math.Min(finished/estimated, 1.0)
importPercent := 1.0
if bytesWritten > 0 {
totalBytes := 1.0 / writePercent * bytesWritten
importPercent = math.Min(bytesImported/totalBytes, 1.0)
}
totalPercent = writePercent*0.8 + importPercent*0.2
if totalPercent < 1.0 {
remainNanoseconds := (1.0 - totalPercent) / totalPercent * nanoseconds
remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second))
}
}

formatPercent := func(finish, estimate float64) string {
Expand All @@ -673,6 +692,7 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan

// Note: a speed of 28 MiB/s roughly corresponds to 100 GiB/hour.
log.L().Info("progress",
zap.String("total", fmt.Sprintf("%.1f%%", totalPercent*100)),
//zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)),
zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))),
zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))),
Expand Down Expand Up @@ -1119,6 +1139,7 @@ func (t *TableRestore) restoreEngine(
err := cr.restore(ctx, t, engineID, dataEngine, indexEngine, rc)
if err == nil {
metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt)
metric.BytesCounter.WithLabelValues(metric.TableStateWritten).Add(float64(cr.chunk.Checksum.SumSize()))
return
}
metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt)
Expand Down

0 comments on commit c2f60f6

Please sign in to comment.