This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Pipelined restore. #266

Merged: 69 commits, Jun 15, 2020
Commits (69)
0b07b64
restore: add pipelined CreateTable.
Apr 26, 2020
061b669
restore: add pipelined ValidateFileRanges.
Apr 26, 2020
d711ded
restore: pipelining restore process.
Apr 27, 2020
f258d99
restore, task: use batching when pipelining.
Apr 27, 2020
cefb696
restore: batcher split by range(instead of table).
Apr 28, 2020
7846324
restore,task: new way to for polling errCh.
Apr 28, 2020
c78a24b
restore, task: pipelining checksum.
Apr 28, 2020
93b5942
restore, task: cancel parallel DDL request.
Apr 29, 2020
f3ec5ee
restore: restore will now send batch periodly.
Apr 29, 2020
bb31ec0
restore: refactor batcher.
May 8, 2020
d54b99c
restore: add tests on batcher.
May 8, 2020
4c4a3d8
restore, task: make linter happy.
May 8, 2020
3cffa5d
*: add dep to multierr.
May 8, 2020
965779a
Merge branch 'master' of https://github.com/pingcap/br into pipelined…
May 8, 2020
59e33b0
task: adjust to new function sig.
May 8, 2020
44d52be
task, restore: close updateCh until all task finish.
May 9, 2020
a49e1f0
task, restore: pipelined restore supports parition.
May 9, 2020
84e267c
backup: always wait worker to finish.
May 9, 2020
7fab3c3
backup, task: skip checksum when needed.
May 9, 2020
39d2312
Merge branch 'master' of https://github.com/pingcap/br into pipelined…
May 9, 2020
ac6f5be
*: make linter happy.
May 9, 2020
25a9a3a
restore: move batcher test to restore_test package.
May 11, 2020
7f8251e
Apply suggestions from code review
YuJuncen May 12, 2020
4bf507e
restore, task: remove context on struct types.
May 12, 2020
c45c772
restore: batcher auto commit can be disabled now.
May 12, 2020
afea125
restore, task: fix typos.
May 12, 2020
3c7530c
recover: fix a bug about removing tiflash.
May 12, 2020
e164c4f
restore: MapTableToFiles issues Error log when key range not match.
May 13, 2020
2af022a
*: merge master.
May 13, 2020
2bc34d9
restore: fix test to match new change of master.
May 13, 2020
cee0fc3
Merge branch 'master' into pipelined-restore
YuJuncen May 13, 2020
acbb2fa
Merge branch 'master' into pipelined-restore
YuJuncen May 14, 2020
b96de5b
Apply suggestions from code review
YuJuncen May 17, 2020
8c9bf99
Merge branch 'master' into pipelined-restore
YuJuncen May 18, 2020
deb8848
restore: merge two progresses.
May 22, 2020
879b553
Merge branch 'pipelined-restore' of https://github.com/YuJuncen/br in…
May 22, 2020
30d22b4
Merge branch 'master' into pipelined-restore
YuJuncen May 22, 2020
4060eec
restore: fix a bug.
May 25, 2020
1027d28
Merge branch 'pipelined-restore' of https://github.com/YuJuncen/br in…
May 25, 2020
c1d2064
restore: extract batcher to another file
May 25, 2020
557e4e9
Merge branch 'master' into pipelined-restore
YuJuncen May 29, 2020
b0dd355
task: don't return imediately when files is empty.
May 29, 2020
7bca7ad
Merge branch 'master' into pipelined-restore
YuJuncen Jun 2, 2020
b53526e
restore,task: do some refactor
Jun 2, 2020
dd6af60
restore: fix a shaming bug... :|
Jun 2, 2020
f758994
Merge branch 'master' of https://github.com/pingcap/br into pipelined…
Jun 5, 2020
3303cef
task,restore: panic on file broken
Jun 5, 2020
a31e44c
restore: record tiflash count to disk when removed
Jun 5, 2020
88c8117
restore,task: simplify some code,
Jun 9, 2020
cc43d9d
task,restore: fix a bug.
Jun 9, 2020
312039d
restore: some factory and fix
Jun 9, 2020
bbdbecd
tests: try to fix CI
Jun 9, 2020
2c269dc
tests: try to fix CI, again
Jun 9, 2020
22cf64c
Merge branch 'master' into pipelined-restore
YuJuncen Jun 9, 2020
4705a55
Merge branch 'master' of https://github.com/pingcap/br into pipelined…
Jun 12, 2020
55d22e4
Apply suggestions from code review
YuJuncen Jun 12, 2020
c94435d
restore: change some log levels
Jun 12, 2020
fe59bc1
restore: merge joiner of sendWorker into messagebox
Jun 12, 2020
4d67e54
restore,task: run RemoveRestoreLabels at restore post work
Jun 12, 2020
dc1d293
task: adapt the remove-tiflash flag
Jun 12, 2020
5ac8cfe
restore,task: fetch new placement rules each time
Jun 12, 2020
4cbbff0
Apply suggestions from code review
YuJuncen Jun 15, 2020
460331f
restore,task: run Leave always, and modify some log level
Jun 15, 2020
b450532
Merge branch 'master' into pipelined-restore
YuJuncen Jun 15, 2020
0ee5223
restore: fix a bug that may cause checksum time incorrect
Jun 15, 2020
7437950
Merge branch 'pipelined-restore' of https://github.com/Yujuncen/br in…
Jun 15, 2020
8f96c30
Merge branch 'master' into pipelined-restore
3pointer Jun 15, 2020
77ab77f
restore: don't Leave if never Enter
Jun 15, 2020
0ecbcd6
Merge branch 'pipelined-restore' of https://github.com/Yujuncen/br in…
Jun 15, 2020
1 change: 1 addition & 0 deletions .gitignore
@@ -10,3 +10,4 @@ coverage.txt
docker/data/
docker/logs/
*.swp
.DS_Store
353 changes: 353 additions & 0 deletions pkg/restore/batcher.go
@@ -0,0 +1,353 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/rtree"
)

// SendType is the 'type' of a send.
// When we issue a 'send' command to the worker, we may want to flush all pending ranges (when auto commit is enabled),
// or we may just want to drain the overflowing ranges (when a table has just been added to the batcher).
type SendType int

const (
// SendUntilLessThanBatch will make the batcher send batch until
// its remaining range is less than its batchSizeThreshold.
SendUntilLessThanBatch SendType = iota
// SendAll will make the batcher send all pending ranges.
SendAll
// SendAllThenClose will make the batcher send all pending ranges and then close itself.
SendAllThenClose
)

// Batcher collects ranges to restore and sends batched split/ingest requests.
type Batcher struct {
cachedTables []TableWithRange
cachedTablesMu *sync.Mutex
rewriteRules *RewriteRules

// autoCommitJoiner is for joining the background batch sender.
autoCommitJoiner chan<- struct{}
// everythingIsDone is for waiting for the worker to finish: that is, after we send a
// signal to autoCommitJoiner, we must give it enough time to get things done.
// Then, it notifies us via this wait group.
// We use a wait group instead of a trivial channel to allow further extension.
everythingIsDone *sync.WaitGroup
// sendErr is for reporting errors.
sendErr chan<- error
// sendCh is for communicating with sendWorker.
sendCh chan<- SendType
// outCh is for emitting restored tables, so they can be sent on for further work such as checksum.
outCh chan<- CreatedTable

sender BatchSender
manager ContextManager
batchSizeThreshold int
size int32
}

// Len calculates the current size of this batcher.
func (b *Batcher) Len() int {
return int(atomic.LoadInt32(&b.size))
}

// NewBatcher creates a new batcher from a sender and a context manager.
// The sender defines how to 'restore' a batch (i.e. how, and where, to push the task down).
// The context manager defines the 'lifetime' of restoring tables (i.e. how to enter 'restore' mode, and how to exit it).
// The batcher works in the background, sending a batch whenever the auto commit delay elapses or the batch size reaches the limit,
// and it emits fully-restored tables to the returned output channel.
func NewBatcher(
ctx context.Context,
sender BatchSender,
manager ContextManager,
errCh chan<- error,
) (*Batcher, <-chan CreatedTable) {
output := make(chan CreatedTable, defaultBatcherOutputChannelSize)
sendChan := make(chan SendType, 2)
b := &Batcher{
rewriteRules: EmptyRewriteRule(),
sendErr: errCh,
outCh: output,
sender: sender,
manager: manager,
sendCh: sendChan,
cachedTablesMu: new(sync.Mutex),
everythingIsDone: new(sync.WaitGroup),
batchSizeThreshold: 1,
}
b.everythingIsDone.Add(1)
go b.sendWorker(ctx, sendChan)
return b, output
}

// EnableAutoCommit makes the batcher commit batches periodically, even when the batch size hasn't reached the threshold.
// This is a separate function so that auto commit can be disabled in some cases.
func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration) {
if b.autoCommitJoiner != nil {
// Spawning two auto commit goroutines wouldn't be a good idea.
// If needed (e.g. to change the auto commit period), please disable auto commit first.
log.L().DPanic("enabling auto commit on a batcher whose auto commit is already enabled, which isn't allowed")
}
joiner := make(chan struct{})
go b.autoCommitWorker(ctx, joiner, delay)
b.autoCommitJoiner = joiner
}

// DisableAutoCommit blocks the current goroutine until the worker can gracefully stop,
// and then disables auto commit.
func (b *Batcher) DisableAutoCommit() {
b.joinWorker()
b.autoCommitJoiner = nil
}

func (b *Batcher) waitUntilSendDone() {
b.sendCh <- SendAllThenClose
b.everythingIsDone.Wait()
}

// joinWorker blocks the current goroutine until the worker can gracefully stop.
// It returns immediately when auto commit is disabled.
func (b *Batcher) joinWorker() {
if b.autoCommitJoiner != nil {
log.Debug("gracefully stopping worker goroutine")
b.autoCommitJoiner <- struct{}{}
close(b.autoCommitJoiner)
log.Debug("gracefully stopped worker goroutine")
}
}

// sendWorker is the 'worker' that sends all ranges to TiKV.
func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) {
sendUntil := func(lessOrEqual int) {
for b.Len() > lessOrEqual {
tbls, err := b.Send(ctx)
if err != nil {
b.sendErr <- err
return
}
for _, t := range tbls {
b.outCh <- t
}
}
}

for sendType := range send {
switch sendType {
case SendUntilLessThanBatch:
sendUntil(b.batchSizeThreshold)
case SendAll:
sendUntil(0)
case SendAllThenClose:
sendUntil(0)
b.everythingIsDone.Done()
return
}
}
}

func (b *Batcher) autoCommitWorker(ctx context.Context, joiner <-chan struct{}, delay time.Duration) {
tick := time.NewTicker(delay)
defer tick.Stop()
for {
select {
case <-joiner:
log.Debug("graceful stop signal received")
return
case <-ctx.Done():
b.sendErr <- ctx.Err()
return
case <-tick.C:
if b.Len() > 0 {
log.Debug("sending batch because time limit exceed", zap.Int("size", b.Len()))
b.asyncSend(SendAll)
}
}
}
}

func (b *Batcher) asyncSend(t SendType) {
// add a check here so we don't send duplicates.
if len(b.sendCh) == 0 {
b.sendCh <- t
}
}

type drainResult struct {
// TablesToSend are the tables that will be sent in this batch.
TablesToSend []CreatedTable
// BlankTablesAfterSend are the tables that will be fully restored after this batch is sent.
BlankTablesAfterSend []CreatedTable
RewriteRules *RewriteRules
Ranges []rtree.Range
}

func newDrainResult() drainResult {
return drainResult{
TablesToSend: make([]CreatedTable, 0),
BlankTablesAfterSend: make([]CreatedTable, 0),
RewriteRules: EmptyRewriteRule(),
Ranges: make([]rtree.Range, 0),
}
}

// drainRanges 'drains' ranges from current tables.
// for example, let a '-' character be a range, assume we have:
// |---|-----|-------|
// |t1 |t2 |t3 |
// after we run drainRanges() with batchSizeThreshold = 6, let '*' mark the ranges that will be sent in this batch:
// |***|***--|-------|
// |t1 |t2 |-------|
//
// drainRanges() will return:
// TablesToSend: [t1, t2] (so we can make them enter restore mode)
// BlankTablesAfterSend: [t1] (so we can make them leave restore mode after restoring this batch)
// RewriteRules: rewrite rules for [t1, t2] (so we can restore them)
// Ranges: the starred ranges (so we can restore them)
//
// then, it will leave the batcher's cachedTables like this:
// |--|-------|
// |t2|t3 |
// as you can see, all drained ranges have been removed.
func (b *Batcher) drainRanges() drainResult {
result := newDrainResult()

b.cachedTablesMu.Lock()
defer b.cachedTablesMu.Unlock()

for offset, thisTable := range b.cachedTables {
thisTableLen := len(thisTable.Range)
collected := len(result.Ranges)

result.RewriteRules.Append(*thisTable.RewriteRule)
result.TablesToSend = append(result.TablesToSend, thisTable.CreatedTable)

// The batch is full, so we should stop here.
// We use strictly-greater-than because, if we sent a batch when the sizes were equal,
// the offset would have to be increased by one (the last table would be fully sent and
// would belong in BlankTablesAfterSend), which would introduce extra complexity.
if thisTableLen+collected > b.batchSizeThreshold {
drainSize := b.batchSizeThreshold - collected
thisTableRanges := thisTable.Range

var drained []rtree.Range
drained, b.cachedTables[offset].Range = thisTableRanges[:drainSize], thisTableRanges[drainSize:]
log.Debug("draining partial table to batch",
zap.Stringer("db", thisTable.OldTable.Db.Name),
zap.Stringer("table", thisTable.Table.Name),
zap.Int("size", thisTableLen),
zap.Int("drained", drainSize),
)
result.Ranges = append(result.Ranges, drained...)
b.cachedTables = b.cachedTables[offset:]
atomic.AddInt32(&b.size, -int32(len(drained)))
return result
}

result.BlankTablesAfterSend = append(result.BlankTablesAfterSend, thisTable.CreatedTable)
// let's 'drain' the ranges of the current table. This op must not make the batch full.
result.Ranges = append(result.Ranges, thisTable.Range...)
atomic.AddInt32(&b.size, -int32(len(thisTable.Range)))
// clear the table length.
b.cachedTables[offset].Range = []rtree.Range{}
log.Debug("draining table to batch",
zap.Stringer("db", thisTable.OldTable.Db.Name),
zap.Stringer("table", thisTable.Table.Name),
zap.Int("size", thisTableLen),
)
}

// all tables are drained.
b.cachedTables = []TableWithRange{}
return result
}

// Send sends all pending requests in the batcher.
// It returns the tables that are fully sent in the current batch.
func (b *Batcher) Send(ctx context.Context) ([]CreatedTable, error) {
drainResult := b.drainRanges()
tbs := drainResult.TablesToSend
ranges := drainResult.Ranges

log.Info("restore batch start",
append(
ZapRanges(ranges),
ZapTables(tbs),
)...,
)

if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil {
return nil, err
}
defer func() {
if err := b.manager.Leave(ctx, drainResult.BlankTablesAfterSend); err != nil {
log.Error("encountering error when leaving recover mode, we can go on but some regions may stick on restore mode",
append(
ZapRanges(ranges),
ZapTables(tbs),
zap.Error(err))...,
)
}
Review comment on lines +290 to +297: move this defer after Enter() is called
if len(drainResult.BlankTablesAfterSend) > 0 {
log.Debug("table fully restored",
ZapTables(drainResult.BlankTablesAfterSend),
zap.Int("ranges", len(ranges)),
)
}
}()

if err := b.sender.RestoreBatch(ctx, ranges, drainResult.RewriteRules); err != nil {
return nil, err
}
return drainResult.BlankTablesAfterSend, nil
}

func (b *Batcher) sendIfFull() {
if b.Len() >= b.batchSizeThreshold {
log.Debug("sending batch because batcher is full", zap.Int("size", b.Len()))
b.asyncSend(SendUntilLessThanBatch)
}
}

// Add adds a task to the Batcher.
func (b *Batcher) Add(tbs TableWithRange) {
b.cachedTablesMu.Lock()
log.Debug("adding table to batch",
zap.Stringer("db", tbs.OldTable.Db.Name),
zap.Stringer("table", tbs.Table.Name),
zap.Int64("old id", tbs.OldTable.Info.ID),
zap.Int64("new id", tbs.Table.ID),
zap.Int("table size", len(tbs.Range)),
zap.Int("batch size", b.Len()),
)
b.cachedTables = append(b.cachedTables, tbs)
b.rewriteRules.Append(*tbs.RewriteRule)
atomic.AddInt32(&b.size, int32(len(tbs.Range)))
b.cachedTablesMu.Unlock()

b.sendIfFull()
}

// Close closes the batcher, sends all pending requests, and closes the output channel.
func (b *Batcher) Close() {
log.Info("sending batch lastly on close", zap.Int("size", b.Len()))
b.DisableAutoCommit()
b.waitUntilSendDone()
close(b.outCh)
close(b.sendCh)
b.sender.Close()
}

// SetThreshold sets the batch size threshold at which the batcher sends a batch.
// Note this function isn't goroutine-safe yet,
// so please set the threshold before anything starts (e.g. before EnableAutoCommit).
func (b *Batcher) SetThreshold(newThreshold int) {
b.batchSizeThreshold = newThreshold
}
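
For reference, here is a minimal sketch of how a caller might drive the Batcher above end to end. It is only an illustration built against the API added in this PR: the example package name, the restoreTablesPipelined function, the threshold of 128 ranges, and the one-second auto commit delay are assumptions made for this sketch, and the sender and manager parameters stand in for whatever concrete BatchSender and ContextManager the restore task actually constructs.

// Package example is a hypothetical caller of the pipelined restore batcher.
package example

import (
	"context"
	"time"

	"github.com/pingcap/br/pkg/restore"
)

// restoreTablesPipelined feeds tables into a Batcher and consumes the
// fully-restored tables it emits. Error handling is simplified: on error the
// producing goroutine may be left blocked.
func restoreTablesPipelined(
	ctx context.Context,
	sender restore.BatchSender,
	manager restore.ContextManager,
	tables []restore.TableWithRange,
) error {
	errCh := make(chan error, 1)
	batcher, outCh := restore.NewBatcher(ctx, sender, manager, errCh)
	batcher.SetThreshold(128)                  // placeholder value; set before anything starts, it isn't goroutine-safe.
	batcher.EnableAutoCommit(ctx, time.Second) // placeholder delay; flushes pending ranges even when the batch isn't full.

	go func() {
		for _, t := range tables {
			batcher.Add(t) // triggers an async send whenever the threshold is reached.
		}
		batcher.Close() // sends whatever is left, then closes outCh.
	}()

	for {
		select {
		case tbl, ok := <-outCh:
			if !ok {
				return nil // outCh closed: every table has been fully restored.
			}
			_ = tbl // a fully-restored table; the real task would queue it for checksum here.
		case err := <-errCh:
			return err
		}
	}
}

The order of calls is what matters: set the threshold before anything starts, enable auto commit at most once, and let Close flush the remaining ranges and close the output channel.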