This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

restore: make split and restore pipelined. #427

Merged
merged 36 commits on Oct 9, 2020
36 commits
bc6224c
restore: make split and restore pipelined.
Jul 16, 2020
fb6880c
restore: adapt the unit test.
Jul 16, 2020
781e634
restore: disable concurrent split for now
Jul 16, 2020
d63e437
restore: remove concurrency limit, fixed checksum time summary
Jul 17, 2020
5f1449a
restore: handle an error that was omitted in the past
Jul 17, 2020
32197cd
restore: abstract sink type
Jul 17, 2020
f24b5ab
restore: change some log level
Jul 17, 2020
e350877
Merge branch 'master' into pipelined-split
YuJuncen Jul 17, 2020
683dbdf
restore: add log when file backed up
Jul 20, 2020
9946b3e
restore: unexport some private fields
Jul 20, 2020
7b34f2b
restore: grow the restore channel size
Jul 20, 2020
b457a96
restore: grow all channel size
Jul 20, 2020
c0fe56c
restore: move some synchronous logic to async, call leave on tables w…
Jul 20, 2020
e4c9073
Merge branch 'master' into pipelined-split
YuJuncen Jul 21, 2020
55fc7f4
Merge branch 'master' into pipelined-split
YuJuncen Jul 22, 2020
6df57bd
restore: fix conflicted merge
Jul 24, 2020
41095e7
Merge branch 'master' into pipelined-split
3pointer Aug 13, 2020
2353785
Merge branch 'master' into pipelined-split
YuJuncen Aug 31, 2020
64c9c88
Merge branch 'master' into pipelined-split
YuJuncen Sep 2, 2020
d9bf89d
*: Merge master
Sep 3, 2020
8de1774
restore: do clean work in a not closed context
Sep 3, 2020
79a249e
restore: fix CI
Sep 3, 2020
d51c923
Merge branch 'master' into pipelined-split
YuJuncen Sep 18, 2020
6a29f7f
restore: remove tiflash-relative code
Sep 18, 2020
8aee5d6
fix CI
Sep 18, 2020
461f40c
restore: fix a malformed printf
Sep 18, 2020
0f7af92
Merge branch 'master' into pipelined-split
YuJuncen Sep 18, 2020
a8f0f26
Merge branch 'master' into pipelined-split
YuJuncen Sep 18, 2020
7308cba
Merge branch 'master' of https://github.com/pingcap/br into pipelined…
Sep 21, 2020
cd40878
Merge branch 'pipelined-split' of https://github.com/YuJuncen/br into…
Sep 21, 2020
038e37d
restore: pass context to RestoreFiles
Sep 21, 2020
b2c01fc
Merge branch 'master' into pipelined-split
YuJuncen Sep 27, 2020
273fca1
Merge branch 'master' into pipelined-split
YuJuncen Sep 27, 2020
c5fe188
Merge branch 'master' into pipelined-split
YuJuncen Oct 9, 2020
1cb094a
restore: reset unexpectedly moved to outside
Oct 9, 2020
baa2c7e
Merge branch 'master' into pipelined-split
3pointer Oct 9, 2020
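
Taken together, these commits turn the old "drain a batch, restore it, return the finished tables" loop into a pipeline: the batcher drains ranges into a DrainResult, the sender restores the batch asynchronously and emits fully restored tables into a TableSink, and a new contextCleaner goroutine calls Leave on the context manager before forwarding tables to the output channel. The sketch below is a simplified, self-contained model of that flow rather than the actual BR code: the names TableSink, PutSink, RestoreBatch, EmitTables, DrainResult, and chanTableSink come from the diff, but the struct fields, the fakeSender, and everything in main are assumptions made for illustration.

```go
// Simplified, self-contained model of the pipelined restore flow in this PR.
// Names that appear in the diff are reused; struct fields and the fake sender
// are assumptions made for illustration only.
package main

import (
	"fmt"
	"sync"
)

// CreatedTable stands in for restore.CreatedTable; only a name is kept here.
type CreatedTable struct{ Name string }

// DrainResult mirrors the shape used by the diff: the tables of this batch,
// plus the subset that is fully restored once the batch lands.
type DrainResult struct {
	TablesToSend         []CreatedTable
	BlankTablesAfterSend []CreatedTable
}

// TableSink is what a sender reports finished tables through.
type TableSink interface {
	EmitTables(tables ...CreatedTable)
	Close()
}

// chanTableSink is a guess at the channel-backed sink built by
// `chanTableSink{restoredTables, errCh}`; field names are assumed and the
// error path is omitted.
type chanTableSink struct {
	tables chan<- []CreatedTable
	errCh  chan<- error
}

func (s chanTableSink) EmitTables(tables ...CreatedTable) { s.tables <- tables }
func (s chanTableSink) Close()                            { close(s.tables) }

// fakeSender restores a batch "instantly" and emits the finished tables,
// playing the role of the real TiKV-backed sender.
type fakeSender struct{ sink TableSink }

func (f *fakeSender) PutSink(sink TableSink)       { f.sink = sink }
func (f *fakeSender) RestoreBatch(res DrainResult) { f.sink.EmitTables(res.BlankTablesAfterSend...) }
func (f *fakeSender) Close()                       { f.sink.Close() }

func main() {
	restored := make(chan []CreatedTable, 16)
	errCh := make(chan error, 1)
	out := make(chan CreatedTable, 16)

	sender := &fakeSender{}
	sender.PutSink(chanTableSink{restored, errCh})

	// contextCleaner stand-in: drain the sink, then forward downstream.
	// In BR this is also where manager.Leave(ctx, tbls) is called.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for tbls := range restored {
			for _, t := range tbls {
				out <- t
			}
		}
		close(out)
	}()

	sender.RestoreBatch(DrainResult{
		TablesToSend:         []CreatedTable{{Name: "t1"}},
		BlankTablesAfterSend: []CreatedTable{{Name: "t1"}},
	})
	sender.Close()
	wg.Wait()

	for t := range out {
		fmt.Println("fully restored:", t.Name)
	}
}
```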
121 changes: 72 additions & 49 deletions pkg/restore/batcher.go
@@ -8,6 +8,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/backup"

"github.com/pingcap/log"
"go.uber.org/zap"

@@ -39,8 +41,8 @@ type Batcher struct {
autoCommitJoiner chan<- struct{}
// everythingIsDone is for waiting for worker done: that is, after we send a
// signal to autoCommitJoiner, we must give it enough time to get things done.
// Then, it should notify us by this waitgroup.
// Use waitgroup instead of a trivial channel for further extension.
// Then, it should notify us by this wait group.
// Use wait group instead of a trivial channel for further extension.
everythingIsDone *sync.WaitGroup
// sendErr is for output error information.
sendErr chan<- error
@@ -60,6 +62,41 @@ func (b *Batcher) Len() int {
return int(atomic.LoadInt32(&b.size))
}

// contextCleaner is the worker goroutine that cleans the 'context'
// (e.g. makes regions leave restore mode).
func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) {
defer func() {
if ctx.Err() != nil {
timeout := 5 * time.Second
log.Info("restore canceled, cleaning in a context with timeout",
zap.Stringer("timeout", timeout))
limitedCtx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
b.manager.Close(limitedCtx)
} else {
b.manager.Close(ctx)
}
}()
defer b.everythingIsDone.Done()
for {
select {
case <-ctx.Done():
return
case tbls, ok := <-tables:
if !ok {
return
}
if err := b.manager.Leave(ctx, tbls); err != nil {
b.sendErr <- err
return
}
for _, tbl := range tbls {
b.outCh <- tbl
}
}
}
}

// NewBatcher creates a new batcher from a sender and a context manager.
// The former defines how to 'restore' a batch (i.e. where to send, or 'push down', the task).
// The context manager defines the 'lifetime' of restoring tables (i.e. how to enter 'restore' mode, and how to exit it).
@@ -71,7 +108,7 @@ func NewBatcher(
manager ContextManager,
errCh chan<- error,
) (*Batcher, <-chan CreatedTable) {
output := make(chan CreatedTable, defaultBatcherOutputChannelSize)
output := make(chan CreatedTable, defaultChannelSize)
sendChan := make(chan SendType, 2)
b := &Batcher{
rewriteRules: EmptyRewriteRule(),
@@ -84,8 +121,12 @@
everythingIsDone: new(sync.WaitGroup),
batchSizeThreshold: 1,
}
b.everythingIsDone.Add(1)
b.everythingIsDone.Add(2)
go b.sendWorker(ctx, sendChan)
restoredTables := make(chan []CreatedTable, defaultChannelSize)
go b.contextCleaner(ctx, restoredTables)
sink := chanTableSink{restoredTables, errCh}
sender.PutSink(sink)
return b, output
}

@@ -105,7 +146,7 @@ func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration) {
// DisableAutoCommit blocks the current goroutine until the worker can gracefully stop,
// and then disable auto commit.
func (b *Batcher) DisableAutoCommit() {
b.joinWorker()
b.joinAutoCommitWorker()
b.autoCommitJoiner = nil
}

@@ -114,9 +155,9 @@ func (b *Batcher) waitUntilSendDone() {
b.everythingIsDone.Wait()
}

// joinWorker blocks the current goroutine until the worker can gracefully stop.
// joinAutoCommitWorker blocks the current goroutine until the worker can gracefully stop.
// return immediately when auto commit disabled.
func (b *Batcher) joinWorker() {
func (b *Batcher) joinAutoCommitWorker() {
if b.autoCommitJoiner != nil {
log.Debug("gracefully stopping worker goroutine")
b.autoCommitJoiner <- struct{}{}
@@ -126,17 +167,11 @@ func (b *Batcher) joinWorker() {
}

// sendWorker is the 'worker' that send all ranges to TiKV.
// TODO since all operations are asynchronous now, it's possible to remove this worker.
func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) {
sendUntil := func(lessOrEqual int) {
for b.Len() > lessOrEqual {
tbls, err := b.Send(ctx)
if err != nil {
b.sendErr <- err
return
}
for _, t := range tbls {
b.outCh <- t
}
b.Send(ctx)
}
}

@@ -148,6 +183,7 @@ func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) {
sendUntil(0)
case SendAllThenClose:
sendUntil(0)
b.sender.Close()
b.everythingIsDone.Done()
return
}
@@ -181,7 +217,8 @@ func (b *Batcher) asyncSend(t SendType) {
}
}

type drainResult struct {
// DrainResult is the collection of some ranges and their metadata.
type DrainResult struct {
// TablesToSend are tables that would be sent in this batch.
TablesToSend []CreatedTable
// BlankTablesAfterSend are tables that will be fully restored after this batch is sent.
@@ -190,8 +227,17 @@ type drainResult struct {
Ranges []rtree.Range
}

func newDrainResult() drainResult {
return drainResult{
// Files returns all files of this drain result.
func (result DrainResult) Files() []*backup.File {
var files = make([]*backup.File, 0, len(result.Ranges)*2)
for _, fs := range result.Ranges {
files = append(files, fs.Files...)
}
return files
}

func newDrainResult() DrainResult {
return DrainResult{
TablesToSend: make([]CreatedTable, 0),
BlankTablesAfterSend: make([]CreatedTable, 0),
RewriteRules: EmptyRewriteRule(),
@@ -217,7 +263,7 @@ func newDrainResult() drainResult {
// |--|-------|
// |t2|t3 |
// as you can see, all restored ranges would be removed.
func (b *Batcher) drainRanges() drainResult {
func (b *Batcher) drainRanges() DrainResult {
result := newDrainResult()

b.cachedTablesMu.Lock()
@@ -271,42 +317,20 @@

// Send sends all pending requests in the batcher.
// returns tables sent FULLY in the current batch.
func (b *Batcher) Send(ctx context.Context) ([]CreatedTable, error) {
func (b *Batcher) Send(ctx context.Context) {
drainResult := b.drainRanges()
tbs := drainResult.TablesToSend
ranges := drainResult.Ranges

log.Info("restore batch start",
append(
ZapRanges(ranges),
ZapTables(tbs),
)...,
ZapRanges(ranges),
ZapTables(tbs),
)

// Leave is called at b.contextCleaner
if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil {
return nil, err
}
defer func() {
if err := b.manager.Leave(ctx, drainResult.BlankTablesAfterSend); err != nil {
log.Error("encountering error when leaving recover mode, we can go on but some regions may stick on restore mode",
append(
ZapRanges(ranges),
ZapTables(tbs),
zap.Error(err))...,
)
}
if len(drainResult.BlankTablesAfterSend) > 0 {
log.Debug("table fully restored",
ZapTables(drainResult.BlankTablesAfterSend),
zap.Int("ranges", len(ranges)),
)
}
}()

if err := b.sender.RestoreBatch(ctx, ranges, drainResult.RewriteRules); err != nil {
return nil, err
b.sendErr <- err
return
}
return drainResult.BlankTablesAfterSend, nil
b.sender.RestoreBatch(drainResult)
}

func (b *Batcher) sendIfFull() {
@@ -342,7 +366,6 @@ func (b *Batcher) Close() {
b.waitUntilSendDone()
close(b.outCh)
close(b.sendCh)
b.sender.Close()
}

// SetThreshold sets how large the batch must grow before it is sent.
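
One detail of the new contextCleaner above deserves a note: its deferred cleanup may run after the restore context has already been canceled, so in that case it switches to a fresh context.WithTimeout(context.Background(), 5*time.Second) before calling manager.Close. The snippet below is a minimal illustration of that same pattern; the cleanup and worker functions are placeholders invented for the example, not BR code.

```go
// Illustration of cleaning up under a fresh, bounded context when the parent
// context has already been canceled, the pattern used by Batcher.contextCleaner.
package main

import (
	"context"
	"fmt"
	"time"
)

// cleanup is a placeholder for work like manager.Close(ctx): it should still
// get a short window to run even if the surrounding restore was canceled.
func cleanup(ctx context.Context) {
	select {
	case <-time.After(50 * time.Millisecond): // pretend to release resources
		fmt.Println("cleanup finished")
	case <-ctx.Done():
		fmt.Println("cleanup gave up:", ctx.Err())
	}
}

func worker(ctx context.Context) {
	defer func() {
		if ctx.Err() != nil {
			// The parent is canceled: give cleanup its own bounded context,
			// mirroring the 5-second timeout in contextCleaner.
			limitedCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			cleanup(limitedCtx)
			return
		}
		cleanup(ctx)
	}()

	<-ctx.Done() // simulate the worker loop exiting on cancellation
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel immediately so the fresh-context branch runs
	worker(ctx)
}
```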
55 changes: 33 additions & 22 deletions pkg/restore/batcher_test.go
@@ -30,30 +30,34 @@ type drySender struct {
rewriteRules *restore.RewriteRules
ranges []rtree.Range
nBatch int

sink restore.TableSink
}

func (d *drySender) RestoreBatch(
_ctx context.Context,
ranges []rtree.Range,
rewriteRules *restore.RewriteRules,
) error {
d.mu.Lock()
defer d.mu.Unlock()
log.Info("fake restore range", restore.ZapRanges(ranges)...)
d.nBatch++
d.rewriteRules.Append(*rewriteRules)
d.ranges = append(d.ranges, ranges...)
return nil
func (sender *drySender) PutSink(sink restore.TableSink) {
sender.sink = sink
}

func (sender *drySender) RestoreBatch(ranges restore.DrainResult) {
sender.mu.Lock()
defer sender.mu.Unlock()
log.Info("fake restore range", restore.ZapRanges(ranges.Ranges))
sender.nBatch++
sender.rewriteRules.Append(*ranges.RewriteRules)
sender.ranges = append(sender.ranges, ranges.Ranges...)
sender.sink.EmitTables(ranges.BlankTablesAfterSend...)
}

func (d *drySender) Close() {}
func (sender *drySender) Close() {
sender.sink.Close()
}

func waitForSend() {
time.Sleep(10 * time.Millisecond)
}

func (d *drySender) Ranges() []rtree.Range {
return d.ranges
func (sender *drySender) Ranges() []rtree.Range {
return sender.ranges
}

func newDrySender() *drySender {
@@ -66,6 +70,13 @@ func newDrySender() *drySender {

type recordCurrentTableManager map[int64]bool

func (manager recordCurrentTableManager) Close(ctx context.Context) {
if len(manager) > 0 {
log.Panic("When closing, there are still some tables doesn't be sent",
zap.Any("tables", manager))
}
}

func newMockManager() recordCurrentTableManager {
return make(recordCurrentTableManager)
}
@@ -84,7 +95,7 @@ func (manager recordCurrentTableManager) Leave(_ context.Context, tables []resto
return errors.Errorf("Table %d is removed before added", t.Table.ID)
}
log.Info("leaving", zap.Int64("table ID", t.Table.ID))
manager[t.Table.ID] = false
delete(manager, t.Table.ID)
}
return nil
}
@@ -109,21 +120,21 @@ func (manager recordCurrentTableManager) Has(tables ...restore.TableWithRange) b
return true
}

func (d *drySender) HasRewriteRuleOfKey(prefix string) bool {
for _, rule := range d.rewriteRules.Table {
func (sender *drySender) HasRewriteRuleOfKey(prefix string) bool {
for _, rule := range sender.rewriteRules.Table {
if bytes.Equal([]byte(prefix), rule.OldKeyPrefix) {
return true
}
}
return false
}

func (d *drySender) RangeLen() int {
return len(d.ranges)
func (sender *drySender) RangeLen() int {
return len(sender.ranges)
}

func (d *drySender) BatchCount() int {
return d.nBatch
func (sender *drySender) BatchCount() int {
return sender.nBatch
}

var (
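
The reworked mock above also changes Leave to delete the table from the map rather than flagging it false, which is what allows the new Close to detect tables that never left restore mode. A stripped-down sketch of that bookkeeping pattern follows; tableTracker and its method signatures are hypothetical simplifications, not the signatures of recordCurrentTableManager.

```go
// Minimal table-lifecycle tracker in the spirit of recordCurrentTableManager:
// Enter records a table, Leave removes it, Close complains about leftovers.
package main

import (
	"context"
	"fmt"
)

type tableTracker map[int64]struct{}

func (t tableTracker) Enter(_ context.Context, ids ...int64) {
	for _, id := range ids {
		t[id] = struct{}{}
	}
}

func (t tableTracker) Leave(_ context.Context, ids ...int64) error {
	for _, id := range ids {
		if _, ok := t[id]; !ok {
			return fmt.Errorf("table %d is removed before added", id)
		}
		delete(t, id) // removing (not flagging) is what makes Close's check meaningful
	}
	return nil
}

func (t tableTracker) Close(_ context.Context) {
	if len(t) > 0 {
		// The real mock panics here; printing keeps the example tame.
		fmt.Println("still in restore mode at close:", t)
	}
}

func main() {
	ctx := context.Background()
	tr := make(tableTracker)
	tr.Enter(ctx, 1, 2)
	_ = tr.Leave(ctx, 1)
	tr.Close(ctx) // reports table 2, which never left restore mode
}
```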
16 changes: 11 additions & 5 deletions pkg/restore/client.go
@@ -60,12 +60,12 @@ type Client struct {
// TODO Remove this field or replace it with a []*DB,
// since https://github.com/pingcap/br/pull/377 needs more DBs to speed up DDL execution.
// And for now, we must inject a pool of DBs to `Client.GoCreateTables`, otherwise there would be a race condition.
// Which is dirty: why we need DBs from different sources?
// This is dirty: why do we need DBs from different sources?
// By replace it with a []*DB, we can remove the dirty parameter of `Client.GoCreateTable`,
// along with them in some private functions.
// Before you do it, you can firstly read discussions at
// https://github.com/pingcap/br/pull/377#discussion_r446594501,
// this probably isn't as easy and it seems like (however, not hard, too :D)
// this probably isn't as easy as it seems like (however, not hard, too :D)
db *DB
rateLimit uint64
isOnline bool
@@ -511,8 +512,8 @@ func (rc *Client) RestoreFiles(
defer func() {
elapsed := time.Since(start)
if err == nil {
log.Info("Restore Files",
zap.Int("files", len(files)), zap.Duration("take", elapsed))
log.Info("Restore files",
zap.Duration("take", elapsed),
utils.ZapFiles(files))
summary.CollectSuccessUnit("files", len(files), elapsed)
}
}()
@@ -530,7 +531,12 @@
fileReplica := file
rc.workerPool.ApplyOnErrorGroup(eg,
func() error {
defer updateCh.Inc()
fileStart := time.Now()
defer func() {
log.Info("import file done", utils.ZapFile(fileReplica),
zap.Duration("take", time.Since(fileStart)))
updateCh.Inc()
}()
return rc.fileImporter.Import(ectx, fileReplica, rewriteRules)
})
}
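
In client.go the per-file worker now wraps its bookkeeping in a deferred closure, so each file's import duration is logged and the progress counter is bumped even if the import returns early. A tiny, self-contained sketch of that deferral pattern follows; importOne and the log wording are invented for the example and are not BR code.

```go
// Per-task timing via a deferred closure, as RestoreFiles now does per file.
package main

import (
	"fmt"
	"time"
)

// importOne stands in for fileImporter.Import; it just sleeps a little.
func importOne(name string) error {
	time.Sleep(20 * time.Millisecond)
	return nil
}

func restoreFile(name string) error {
	start := time.Now()
	defer func() {
		// Runs whether importOne succeeds or fails, mirroring the defer
		// that logs "import file done" and calls updateCh.Inc().
		fmt.Printf("import file done: %s, take %s\n", name, time.Since(start))
	}()
	return importOne(name)
}

func main() {
	for _, f := range []string{"a.sst", "b.sst"} {
		if err := restoreFile(f); err != nil {
			fmt.Println("restore failed:", err)
		}
	}
}
```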