This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: add importing progress and optimize the accuracy of restore progress #506

Merged
merged 9 commits on Jan 27, 2021
51 changes: 32 additions & 19 deletions lightning/backend/local.go
@@ -58,6 +58,7 @@ import (
"github.com/pingcap/tidb-lightning/lightning/glue"
"github.com/pingcap/tidb-lightning/lightning/log"
"github.com/pingcap/tidb-lightning/lightning/manual"
"github.com/pingcap/tidb-lightning/lightning/metric"
"github.com/pingcap/tidb-lightning/lightning/worker"
)

@@ -561,6 +562,11 @@ func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst
return sst.NewImportSSTClient(conn), nil
}

type rangeStats struct {
count int64
Contributor: seems this member is not used
Contributor Author: yes, currently only totalBytes is used

totalBytes int64
}

// WriteToTiKV writes engine key-value pairs to tikv and returns the sst metas generated by tikv.
// we don't need to do cleanup for the pairs written to tikv if an error is encountered,
// tikv will take the responsibility to do so.
@@ -569,28 +575,30 @@ func (local *local) WriteToTiKV(
engineFile *LocalFile,
region *split.RegionInfo,
start, end []byte,
) ([]*sst.SSTMeta, *Range, error) {
) ([]*sst.SSTMeta, *Range, rangeStats, error) {
begin := time.Now()
regionRange := intersectRange(region.Region, Range{start: start, end: end})
opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end}
iter := engineFile.db.NewIter(opt)
defer iter.Close()

stats := rangeStats{}

iter.First()
if iter.Error() != nil {
return nil, nil, errors.Annotate(iter.Error(), "failed to read the first key")
return nil, nil, stats, errors.Annotate(iter.Error(), "failed to read the first key")
}

if !iter.Valid() {
log.L().Info("keys within region is empty, skip ingest", log.ZapRedactBinary("start", start),
log.ZapRedactBinary("regionStart", region.Region.StartKey), log.ZapRedactBinary("end", end),
log.ZapRedactBinary("regionEnd", region.Region.EndKey))
return nil, nil, nil
return nil, nil, stats, nil
}

firstKey := codec.EncodeBytes([]byte{}, iter.Key())
iter.Last()
if iter.Error() != nil {
return nil, nil, errors.Annotate(iter.Error(), "failed to seek to the last key")
return nil, nil, stats, errors.Annotate(iter.Error(), "failed to seek to the last key")
}
lastKey := codec.EncodeBytes([]byte{}, iter.Key())

@@ -611,12 +619,12 @@ func (local *local) WriteToTiKV(
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer)
if err != nil {
return nil, nil, err
return nil, nil, stats, err
}

wstream, err := cli.Write(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, stats, errors.Trace(err)
}

// Bind uuid for this write request
@@ -626,7 +634,7 @@ func (local *local) WriteToTiKV(
},
}
if err = wstream.Send(req); err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, stats, errors.Trace(err)
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
@@ -642,7 +650,7 @@ func (local *local) WriteToTiKV(
pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
count := 0
size := int64(0)
totalCount := 0
totalCount := int64(0)
firstLoop := true
regionMaxSize := local.regionSplitSize * 4 / 3

@@ -666,7 +674,7 @@ func (local *local) WriteToTiKV(
for i := range clients {
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return nil, nil, err
return nil, nil, stats, err
}
}
count = 0
@@ -679,22 +687,22 @@ func (local *local) WriteToTiKV(
}

if iter.Error() != nil {
return nil, nil, errors.Trace(iter.Error())
return nil, nil, stats, errors.Trace(iter.Error())
}

if count > 0 {
for i := range clients {
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return nil, nil, err
return nil, nil, stats, err
}
}
}

var leaderPeerMetas []*sst.SSTMeta
for i, wStream := range clients {
if resp, closeErr := wStream.CloseAndRecv(); closeErr != nil {
return nil, nil, closeErr
return nil, nil, stats, closeErr
} else {
if leaderID == region.Region.Peers[i].GetId() {
leaderPeerMetas = resp.Metas
@@ -707,28 +715,30 @@ func (local *local) WriteToTiKV(
if leaderPeerMetas == nil {
log.L().Warn("write to tikv no leader", log.ZapRedactReflect("region", region),
zap.Uint64("leader_id", leaderID), log.ZapRedactReflect("meta", meta),
zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size))
return nil, nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size))
return nil, nil, stats, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
region.Region.Id, leaderID)
}

log.L().Debug("write to kv", zap.Reflect("region", region), zap.Uint64("leader", leaderID),
zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas),
zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size),
zap.Int64("kv_pairs", totalCount), zap.Int64("total_bytes", size),
zap.Int64("buf_size", bytesBuf.totalSize()),
zap.Stringer("takeTime", time.Since(begin)))

var remainRange *Range
if iter.Valid() && iter.Next() {
firstKey := append([]byte{}, iter.Key()...)
remainRange = &Range{start: firstKey, end: regionRange.end}
log.L().Info("write to tikv partial finish", zap.Int("count", totalCount),
log.L().Info("write to tikv partial finish", zap.Int64("count", totalCount),
zap.Int64("size", size), log.ZapRedactBinary("startKey", regionRange.start), log.ZapRedactBinary("endKey", regionRange.end),
log.ZapRedactBinary("remainStart", remainRange.start), log.ZapRedactBinary("remainEnd", remainRange.end),
log.ZapRedactReflect("region", region))
}
stats.count = totalCount
stats.totalBytes = size

return leaderPeerMetas, remainRange, nil
return leaderPeerMetas, remainRange, stats, nil
}

func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split.RegionInfo) (*sst.IngestResponse, error) {
@@ -1031,10 +1041,11 @@ func (local *local) writeAndIngestPairs(
) (*Range, error) {
var err error
var remainRange *Range
var rangeStats rangeStats
loopWrite:
for i := 0; i < maxRetryTimes; i++ {
var metas []*sst.SSTMeta
metas, remainRange, err = local.WriteToTiKV(ctx, engineFile, region, start, end)
metas, remainRange, rangeStats, err = local.WriteToTiKV(ctx, engineFile, region, start, end)
if err != nil {
log.L().Warn("write to tikv failed", log.ShortError(err))
return nil, err
@@ -1110,6 +1121,8 @@ loopWrite:
log.L().Warn("write and ingest region, will retry import full range", log.ShortError(err),
log.ZapRedactStringer("region", region.Region), log.ZapRedactBinary("start", start),
log.ZapRedactBinary("end", end))
} else {
metric.BytesCounter.WithLabelValues(metric.TableStateImported).Add(float64(rangeStats.totalBytes))
}
return remainRange, errors.Trace(err)
}
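Taken together, the local.go changes thread per-range write statistics out of WriteToTiKV so that writeAndIngestPairs can credit a range's bytes to the progress metric only in the non-error path. Below is a minimal, self-contained sketch of that flow; writeRange, ingest, and reportImported are hypothetical stand-ins for local.WriteToTiKV, local.Ingest, and metric.BytesCounter.WithLabelValues(metric.TableStateImported).Add, and the byte sizes are made up for illustration.

```go
package main

import "fmt"

// rangeStats mirrors the struct added in this diff; only totalBytes is
// consumed for now (count is kept but unused, as noted in the review).
type rangeStats struct {
	count      int64
	totalBytes int64
}

// writeRange is a hypothetical stand-in for local.WriteToTiKV: it reports the
// statistics of what was written alongside any error.
func writeRange(pairs, bytes int64) (rangeStats, error) {
	return rangeStats{count: pairs, totalBytes: bytes}, nil
}

// ingest is a hypothetical stand-in for local.Ingest.
func ingest() error { return nil }

// importedBytes and reportImported stand in for
// metric.BytesCounter.WithLabelValues(metric.TableStateImported).Add.
var importedBytes float64

func reportImported(b int64) { importedBytes += float64(b) }

func main() {
	ranges := []struct{ pairs, bytes int64 }{{1000, 4 << 20}, {500, 2 << 20}}
	for _, r := range ranges {
		stats, err := writeRange(r.pairs, r.bytes)
		if err != nil {
			continue // nothing landed in TiKV, so nothing is counted
		}
		if err := ingest(); err != nil {
			continue // write succeeded but ingest failed: still not counted
		}
		// Only a fully written-and-ingested range advances the progress metric,
		// which keeps the reported restore progress close to the data that
		// actually landed in TiKV.
		reportImported(stats.totalBytes)
	}
	fmt.Printf("imported %.0f bytes\n", importedBytes)
}
```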
7 changes: 7 additions & 0 deletions lightning/metric/metric.go
@@ -86,6 +86,12 @@ var (
Name: "chunks",
Help: "count number of chunks processed",
}, []string{"state"})
BytesCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "lightning",
Name: "bytes",
Help: "count of total bytes",
}, []string{"state"})
// state can be one of:
// - estimated (an estimation derived from the file size)
// - pending
@@ -190,6 +196,7 @@ func init() {
prometheus.MustRegister(TableCounter)
prometheus.MustRegister(ProcessedEngineCounter)
prometheus.MustRegister(ChunkCounter)
prometheus.MustRegister(BytesCounter)
prometheus.MustRegister(ImportSecondsHistogram)
prometheus.MustRegister(RowReadSecondsHistogram)
prometheus.MustRegister(RowReadBytesHistogram)
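The new BytesCounter is a standard Prometheus CounterVec, so its externally visible name is lightning_bytes with a state label. The snippet below is a standalone sketch (not Lightning's actual wiring) of registering the counter, adding imported bytes, and reading the value back; the literal "imported" label stands in for metric.TableStateImported. In a real deployment the value is scraped from the /metrics endpoint and can be charted, for example with a PromQL expression such as rate(lightning_bytes{state="imported"}[1m]) to follow import throughput.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// bytesCounter mirrors the BytesCounter added in metric.go: a CounterVec
// named lightning_bytes, keyed by a "state" label.
var bytesCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "lightning",
		Name:      "bytes",
		Help:      "count of total bytes",
	}, []string{"state"})

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(bytesCounter)

	// Each successfully imported range credits its byte size to the counter,
	// mirroring metric.BytesCounter.WithLabelValues(metric.TableStateImported).Add(...).
	bytesCounter.WithLabelValues("imported").Add(64 << 20)
	bytesCounter.WithLabelValues("imported").Add(32 << 20)

	// Read the value back directly; a real deployment would scrape /metrics instead.
	var m dto.Metric
	if err := bytesCounter.WithLabelValues("imported").Write(&m); err != nil {
		panic(err)
	}
	fmt.Printf("lightning_bytes{state=\"imported\"} = %.0f bytes\n", m.Counter.GetValue())
}
```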