diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go
index 1a4c1256d..4a7431ce4 100644
--- a/pkg/restore/batcher.go
+++ b/pkg/restore/batcher.go
@@ -8,6 +8,8 @@ import (
     "sync/atomic"
     "time"
 
+    "github.com/pingcap/kvproto/pkg/backup"
+
     "github.com/pingcap/log"
     "go.uber.org/zap"
 
@@ -39,8 +41,8 @@ type Batcher struct {
     autoCommitJoiner chan<- struct{}
     // everythingIsDone is for waiting for worker done: that is, after we send a
     // signal to autoCommitJoiner, we must give it enough time to get things done.
-    // Then, it should notify us by this waitgroup.
-    // Use waitgroup instead of a trivial channel for further extension.
+    // Then, it should notify us by this wait group.
+    // Use wait group instead of a trivial channel for further extension.
     everythingIsDone *sync.WaitGroup
     // sendErr is for output error information.
     sendErr chan<- error
@@ -60,6 +62,41 @@ func (b *Batcher) Len() int {
     return int(atomic.LoadInt32(&b.size))
 }
 
+// contextCleaner is the worker goroutine that cleans the 'context'
+// (e.g. makes regions leave restore mode).
+func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) {
+    defer func() {
+        if ctx.Err() != nil {
+            timeout := 5 * time.Second
+            log.Info("restore canceled, cleaning in a context with timeout",
+                zap.Stringer("timeout", timeout))
+            limitedCtx, cancel := context.WithTimeout(context.Background(), timeout)
+            defer cancel()
+            b.manager.Close(limitedCtx)
+        } else {
+            b.manager.Close(ctx)
+        }
+    }()
+    defer b.everythingIsDone.Done()
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case tbls, ok := <-tables:
+            if !ok {
+                return
+            }
+            if err := b.manager.Leave(ctx, tbls); err != nil {
+                b.sendErr <- err
+                return
+            }
+            for _, tbl := range tbls {
+                b.outCh <- tbl
+            }
+        }
+    }
+}
+
 // NewBatcher creates a new batcher by a sender and a context manager.
 // the former defines how the 'restore' a batch(i.e. send, or 'push down' the task to where).
 // the context manager defines the 'lifetime' of restoring tables(i.e. how to enter 'restore' mode, and how to exit).
@@ -71,7 +108,7 @@ func NewBatcher(
     manager ContextManager,
     errCh chan<- error,
 ) (*Batcher, <-chan CreatedTable) {
-    output := make(chan CreatedTable, defaultBatcherOutputChannelSize)
+    output := make(chan CreatedTable, defaultChannelSize)
     sendChan := make(chan SendType, 2)
     b := &Batcher{
         rewriteRules: EmptyRewriteRule(),
@@ -84,8 +121,12 @@ func NewBatcher(
         everythingIsDone:   new(sync.WaitGroup),
         batchSizeThreshold: 1,
     }
-    b.everythingIsDone.Add(1)
+    b.everythingIsDone.Add(2)
     go b.sendWorker(ctx, sendChan)
+    restoredTables := make(chan []CreatedTable, defaultChannelSize)
+    go b.contextCleaner(ctx, restoredTables)
+    sink := chanTableSink{restoredTables, errCh}
+    sender.PutSink(sink)
     return b, output
 }
 
@@ -105,7 +146,7 @@ func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration) {
 // DisableAutoCommit blocks the current goroutine until the worker can gracefully stop,
 // and then disable auto commit.
 func (b *Batcher) DisableAutoCommit() {
-    b.joinWorker()
+    b.joinAutoCommitWorker()
     b.autoCommitJoiner = nil
 }
 
@@ -114,9 +155,9 @@ func (b *Batcher) waitUntilSendDone() {
     b.everythingIsDone.Wait()
 }
 
-// joinWorker blocks the current goroutine until the worker can gracefully stop.
+// joinAutoCommitWorker blocks the current goroutine until the worker can gracefully stop.
 // return immediately when auto commit disabled.
-func (b *Batcher) joinWorker() {
+func (b *Batcher) joinAutoCommitWorker() {
     if b.autoCommitJoiner != nil {
         log.Debug("gracefully stopping worker goroutine")
         b.autoCommitJoiner <- struct{}{}
@@ -126,17 +167,11 @@ func (b *Batcher) joinWorker() {
 }
 
 // sendWorker is the 'worker' that send all ranges to TiKV.
+// TODO since all operations are asynchronous now, it's possible to remove this worker.
 func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) {
     sendUntil := func(lessOrEqual int) {
         for b.Len() > lessOrEqual {
-            tbls, err := b.Send(ctx)
-            if err != nil {
-                b.sendErr <- err
-                return
-            }
-            for _, t := range tbls {
-                b.outCh <- t
-            }
+            b.Send(ctx)
         }
     }
 
@@ -148,6 +183,7 @@ func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) {
             sendUntil(0)
         case SendAllThenClose:
             sendUntil(0)
+            b.sender.Close()
             b.everythingIsDone.Done()
             return
         }
@@ -181,7 +217,8 @@ func (b *Batcher) asyncSend(t SendType) {
     }
 }
 
-type drainResult struct {
+// DrainResult is the collection of some ranges and their metadata.
+type DrainResult struct {
     // TablesToSend are tables that would be send at this batch.
     TablesToSend []CreatedTable
     // BlankTablesAfterSend are tables that will be full-restored after this batch send.
@@ -190,8 +227,17 @@ type drainResult struct {
     Ranges []rtree.Range
 }
 
-func newDrainResult() drainResult {
-    return drainResult{
+// Files returns all files of this drain result.
+func (result DrainResult) Files() []*backup.File {
+    var files = make([]*backup.File, 0, len(result.Ranges)*2)
+    for _, fs := range result.Ranges {
+        files = append(files, fs.Files...)
+    }
+    return files
+}
+
+func newDrainResult() DrainResult {
+    return DrainResult{
         TablesToSend:         make([]CreatedTable, 0),
         BlankTablesAfterSend: make([]CreatedTable, 0),
         RewriteRules:         EmptyRewriteRule(),
@@ -217,7 +263,7 @@ func newDrainResult() drainResult {
 // |--|-------|
 // |t2|t3 |
 // as you can see, all restored ranges would be removed.
-func (b *Batcher) drainRanges() drainResult {
+func (b *Batcher) drainRanges() DrainResult {
     result := newDrainResult()
 
     b.cachedTablesMu.Lock()
@@ -271,42 +317,20 @@ func (b *Batcher) drainRanges() drainResult {
 
 // Send sends all pending requests in the batcher.
 // returns tables sent FULLY in the current batch.
-func (b *Batcher) Send(ctx context.Context) ([]CreatedTable, error) {
+func (b *Batcher) Send(ctx context.Context) {
     drainResult := b.drainRanges()
     tbs := drainResult.TablesToSend
     ranges := drainResult.Ranges
     log.Info("restore batch start",
-        append(
-            ZapRanges(ranges),
-            ZapTables(tbs),
-        )...,
+        ZapRanges(ranges),
+        ZapTables(tbs),
     )
-
+    // Leave is called at b.contextCleaner
     if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil {
-        return nil, err
-    }
-    defer func() {
-        if err := b.manager.Leave(ctx, drainResult.BlankTablesAfterSend); err != nil {
-            log.Error("encountering error when leaving recover mode, we can go on but some regions may stick on restore mode",
-                append(
-                    ZapRanges(ranges),
-                    ZapTables(tbs),
-                    zap.Error(err))...,
-            )
-        }
-        if len(drainResult.BlankTablesAfterSend) > 0 {
-            log.Debug("table fully restored",
-                ZapTables(drainResult.BlankTablesAfterSend),
-                zap.Int("ranges", len(ranges)),
-            )
-        }
-    }()
-
-    if err := b.sender.RestoreBatch(ctx, ranges, drainResult.RewriteRules); err != nil {
-        return nil, err
+        b.sendErr <- err
+        return
     }
-    return drainResult.BlankTablesAfterSend, nil
+    b.sender.RestoreBatch(drainResult)
 }
 
 func (b *Batcher) sendIfFull() {
@@ -342,7 +366,6 @@ func (b *Batcher) Close() {
     b.waitUntilSendDone()
     close(b.outCh)
     close(b.sendCh)
-    b.sender.Close()
 }
 
 // SetThreshold sets the threshold that how big the batch size reaching need to send batch.
diff --git a/pkg/restore/batcher_test.go b/pkg/restore/batcher_test.go
index 53a9fbbaa..1f7b4816b 100644
--- a/pkg/restore/batcher_test.go
+++ b/pkg/restore/batcher_test.go
@@ -30,30 +30,34 @@ type drySender struct {
     rewriteRules *restore.RewriteRules
     ranges       []rtree.Range
     nBatch       int
+
+    sink restore.TableSink
 }
 
-func (d *drySender) RestoreBatch(
-    _ctx context.Context,
-    ranges []rtree.Range,
-    rewriteRules *restore.RewriteRules,
-) error {
-    d.mu.Lock()
-    defer d.mu.Unlock()
-    log.Info("fake restore range", restore.ZapRanges(ranges)...)
-    d.nBatch++
-    d.rewriteRules.Append(*rewriteRules)
-    d.ranges = append(d.ranges, ranges...)
-    return nil
+func (sender *drySender) PutSink(sink restore.TableSink) {
+    sender.sink = sink
+}
+
+func (sender *drySender) RestoreBatch(ranges restore.DrainResult) {
+    sender.mu.Lock()
+    defer sender.mu.Unlock()
+    log.Info("fake restore range", restore.ZapRanges(ranges.Ranges))
+    sender.nBatch++
+    sender.rewriteRules.Append(*ranges.RewriteRules)
+    sender.ranges = append(sender.ranges, ranges.Ranges...)
+    sender.sink.EmitTables(ranges.BlankTablesAfterSend...)
 }
 
-func (d *drySender) Close() {}
+func (sender *drySender) Close() {
+    sender.sink.Close()
+}
 
 func waitForSend() {
     time.Sleep(10 * time.Millisecond)
 }
 
-func (d *drySender) Ranges() []rtree.Range {
-    return d.ranges
+func (sender *drySender) Ranges() []rtree.Range {
+    return sender.ranges
 }
 
 func newDrySender() *drySender {
@@ -66,6 +70,13 @@ type recordCurrentTableManager map[int64]bool
 
+func (manager recordCurrentTableManager) Close(ctx context.Context) {
+    if len(manager) > 0 {
+        log.Panic("When closing, there are still some tables that haven't been sent",
+            zap.Any("tables", manager))
+    }
+}
+
 func newMockManager() recordCurrentTableManager {
     return make(recordCurrentTableManager)
 }
@@ -84,7 +95,7 @@ func (manager recordCurrentTableManager) Leave(_ context.Context, tables []resto
             return errors.Errorf("Table %d is removed before added", t.Table.ID)
         }
         log.Info("leaving", zap.Int64("table ID", t.Table.ID))
-        manager[t.Table.ID] = false
+        delete(manager, t.Table.ID)
     }
     return nil
 }
@@ -109,8 +120,8 @@ func (manager recordCurrentTableManager) Has(tables ...restore.TableWithRange) b
     return true
 }
 
-func (d *drySender) HasRewriteRuleOfKey(prefix string) bool {
-    for _, rule := range d.rewriteRules.Table {
+func (sender *drySender) HasRewriteRuleOfKey(prefix string) bool {
+    for _, rule := range sender.rewriteRules.Table {
         if bytes.Equal([]byte(prefix), rule.OldKeyPrefix) {
             return true
         }
@@ -118,12 +129,12 @@ func (d *drySender) HasRewriteRuleOfKey(prefix string) bool {
     return false
 }
 
-func (d *drySender) RangeLen() int {
-    return len(d.ranges)
+func (sender *drySender) RangeLen() int {
+    return len(sender.ranges)
 }
 
-func (d *drySender) BatchCount() int {
-    return d.nBatch
+func (sender *drySender) BatchCount() int {
+    return sender.nBatch
 }
 
 var (
diff --git a/pkg/restore/client.go b/pkg/restore/client.go
index 3918cba68..f866493ab 100644
--- a/pkg/restore/client.go
+++ b/pkg/restore/client.go
@@ -60,12 +60,12 @@ type Client struct {
     // TODO Remove this field or replace it with a []*DB,
     // since https://github.com/pingcap/br/pull/377 needs more DBs to speed up DDL execution.
     // And for now, we must inject a pool of DBs to `Client.GoCreateTables`, otherwise there would be a race condition.
-    // Which is dirty: why we need DBs from different sources?
+    // This is dirty: why do we need DBs from different sources?
     // By replace it with a []*DB, we can remove the dirty parameter of `Client.GoCreateTable`,
     // along with them in some private functions.
     // Before you do it, you can firstly read discussions at
     // https://github.com/pingcap/br/pull/377#discussion_r446594501,
-    // this probably isn't as easy and it seems like (however, not hard, too :D)
+    // this probably isn't as easy as it seems like (however, not hard, too :D)
     db        *DB
     rateLimit uint64
     isOnline  bool
@@ -511,8 +511,9 @@ func (rc *Client) RestoreFiles(
     defer func() {
         elapsed := time.Since(start)
         if err == nil {
-            log.Info("Restore Files",
-                zap.Int("files", len(files)), zap.Duration("take", elapsed))
+            log.Info("Restore files",
+                zap.Duration("take", elapsed),
+                utils.ZapFiles(files))
             summary.CollectSuccessUnit("files", len(files), elapsed)
         }
     }()
@@ -530,7 +531,12 @@ func (rc *Client) RestoreFiles(
         fileReplica := file
         rc.workerPool.ApplyOnErrorGroup(eg,
             func() error {
-                defer updateCh.Inc()
+                fileStart := time.Now()
+                defer func() {
+                    log.Info("import file done", utils.ZapFile(fileReplica),
+                        zap.Duration("take", time.Since(fileStart)))
+                    updateCh.Inc()
+                }()
                 return rc.fileImporter.Import(ectx, fileReplica, rewriteRules)
             })
     }
diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go
index aca8c1efd..7dcc02792 100644
--- a/pkg/restore/pipeline_items.go
+++ b/pkg/restore/pipeline_items.go
@@ -4,9 +4,9 @@ package restore
 
 import (
     "context"
+    "sync"
 
     "github.com/pingcap/errors"
-    "github.com/pingcap/kvproto/pkg/backup"
     "github.com/pingcap/log"
     "github.com/pingcap/parser/model"
     "go.uber.org/zap"
@@ -17,9 +17,34 @@ import (
 )
 
 const (
-    defaultBatcherOutputChannelSize = 1024
+    defaultChannelSize = 1024
 )
 
+// TableSink is the 'sink' of restored data by a sender.
+type TableSink interface {
+    EmitTables(tables ...CreatedTable)
+    EmitError(error)
+    Close()
+}
+
+type chanTableSink struct {
+    outCh chan<- []CreatedTable
+    errCh chan<- error
+}
+
+func (sink chanTableSink) EmitTables(tables ...CreatedTable) {
+    sink.outCh <- tables
+}
+
+func (sink chanTableSink) EmitError(err error) {
+    sink.errCh <- err
+}
+
+func (sink chanTableSink) Close() {
+    // errCh may have multiple senders, so don't close it here.
+    close(sink.outCh)
+}
+
 // ContextManager is the struct to manage a TiKV 'context' for restore.
 // Batcher will call Enter when any table should be restore on batch,
 // so you can do some prepare work here(e.g. set placement rules for online restore).
@@ -28,6 +53,9 @@ type ContextManager interface {
     Enter(ctx context.Context, tables []CreatedTable) error
     // Leave make some tables 'leave' this context(a.k.a., restore is done, do some post-works).
     Leave(ctx context.Context, tables []CreatedTable) error
+    // Close closes the context manager; it is called when the manager is 'killed'
+    // (e.g. the restore is canceled) and some cleanup should be done.
+    Close(ctx context.Context)
 }
 
 // NewBRContextManager makes a BR context manager, that is,
@@ -37,7 +65,7 @@ func NewBRContextManager(client *Client) ContextManager {
     return &brContextManager{
         client: client,
 
-        hasTable: make(map[int64]bool),
+        hasTable: make(map[int64]CreatedTable),
     }
 }
 
@@ -45,17 +73,25 @@ type brContextManager struct {
     client *Client
 
     // This 'set' of table ID allow us handle each table just once.
-    hasTable map[int64]bool
+    hasTable map[int64]CreatedTable
+}
+
+func (manager *brContextManager) Close(ctx context.Context) {
+    tbls := make([]*model.TableInfo, 0, len(manager.hasTable))
+    for _, tbl := range manager.hasTable {
+        tbls = append(tbls, tbl.Table)
+    }
+    splitPostWork(ctx, manager.client, tbls)
 }
 
 func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error {
     placementRuleTables := make([]*model.TableInfo, 0, len(tables))
 
     for _, tbl := range tables {
-        if !manager.hasTable[tbl.Table.ID] {
+        if _, ok := manager.hasTable[tbl.Table.ID]; !ok {
             placementRuleTables = append(placementRuleTables, tbl.Table)
         }
-        manager.hasTable[tbl.Table.ID] = true
+        manager.hasTable[tbl.Table.ID] = tbl
     }
 
     return splitPrepareWork(ctx, manager.client, placementRuleTables)
@@ -70,6 +106,9 @@ func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTabl
 
     splitPostWork(ctx, manager.client, placementRuleTables)
     log.Info("restore table done", ZapTables(tables))
+    for _, tbl := range placementRuleTables {
+        delete(manager.hasTable, tbl.ID)
+    }
     return nil
 }
 
@@ -128,14 +167,32 @@ func Exhaust(ec <-chan error) []error {
 // BatchSender is the abstract of how the batcher send a batch.
 type BatchSender interface {
+    // PutSink sets the sink of this sender; users of this interface promise to
+    // call this at least once before the first call to `RestoreBatch`.
+    PutSink(sink TableSink)
     // RestoreBatch will send the restore request.
-    RestoreBatch(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules) error
+    RestoreBatch(ranges DrainResult)
     Close()
 }
 
 type tikvSender struct {
     client   *Client
     updateCh glue.Progress
+
+    sink TableSink
+    inCh chan<- DrainResult
+
+    wg *sync.WaitGroup
+}
+
+func (b *tikvSender) PutSink(sink TableSink) {
+    // don't worry about visibility, since we will call this before the first call to
+    // RestoreBatch, which is a sync point.
+    b.sink = sink
+}
+
+func (b *tikvSender) RestoreBatch(ranges DrainResult) {
+    b.inCh <- ranges
 }
 
 // NewTiKVSender make a sender that send restore requests to TiKV.
@@ -144,40 +201,80 @@ func NewTiKVSender(
     cli *Client,
     updateCh glue.Progress,
 ) (BatchSender, error) {
-    return &tikvSender{
+    inCh := make(chan DrainResult, defaultChannelSize)
+    midCh := make(chan DrainResult, defaultChannelSize)
+
+    sender := &tikvSender{
         client:   cli,
         updateCh: updateCh,
-    }, nil
-}
-
-func (b *tikvSender) RestoreBatch(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules) error {
-    if err := SplitRanges(ctx, b.client, ranges, rewriteRules, b.updateCh); err != nil {
-        log.Error("failed on split range",
-            zap.Any("ranges", ranges),
-            zap.Error(err),
-        )
-        return err
+        inCh:     inCh,
+        wg:       new(sync.WaitGroup),
     }
 
-    files := []*backup.File{}
-    for _, fs := range ranges {
-        files = append(files, fs.Files...)
-    }
+    sender.wg.Add(2)
+    go sender.splitWorker(ctx, inCh, midCh)
+    go sender.restoreWorker(ctx, midCh)
+    return sender, nil
+}
 
-    if err := b.client.RestoreFiles(ctx, files, rewriteRules, b.updateCh); err != nil {
-        return err
+func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) {
+    defer log.Debug("split worker closed")
+    defer func() {
+        b.wg.Done()
+        close(next)
+    }()
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case result, ok := <-ranges:
+            if !ok {
+                return
+            }
+            if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil {
+                log.Error("failed on split range",
+                    zap.Any("ranges", result.Ranges),
+                    zap.Error(err),
+                )
+                b.sink.EmitError(err)
+                return
+            }
+            next <- result
+        }
     }
+}
 
-    log.Info("restore batch done",
-        append(
-            ZapRanges(ranges),
-            zap.Int("file count", len(files)),
-        )...,
-    )
-
-    return nil
+func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) {
+    defer func() {
+        log.Debug("restore worker closed")
+        b.wg.Done()
+        b.sink.Close()
+    }()
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case result, ok := <-ranges:
+            if !ok {
+                return
+            }
+            files := result.Files()
+            if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil {
+                b.sink.EmitError(err)
+                return
+            }
+
+            log.Info("restore batch done",
+                ZapRanges(result.Ranges),
+                zap.Int("file count", len(files)),
+            )
+            b.sink.EmitTables(result.BlankTablesAfterSend...)
+        }
+    }
 }
 
 func (b *tikvSender) Close() {
-    // don't close update channel here, since we may need it then.
+    close(b.inCh)
+    b.wg.Wait()
+    log.Debug("tikv sender closed")
 }
diff --git a/pkg/restore/util.go b/pkg/restore/util.go
index e2c393afe..03e2d240d 100644
--- a/pkg/restore/util.go
+++ b/pkg/restore/util.go
@@ -504,29 +504,42 @@ func PaginateScanRegion(
 
 // ZapTables make zap field of table for debuging, including table names.
 func ZapTables(tables []CreatedTable) zapcore.Field {
-    tableNames := make([]string, 0, len(tables))
-    for _, t := range tables {
-        tableNames = append(tableNames, fmt.Sprintf("%s.%s", t.OldTable.Db.Name, t.OldTable.Info.Name))
-    }
-    return zap.Strings("tables", tableNames)
+    return zap.Array("tables", tableSliceArrayMixIn(tables))
 }
 
 // ZapRanges make zap fields for debuging, which contains kv, size and count of ranges.
-func ZapRanges(ranges []rtree.Range) []zapcore.Field {
+// TODO make it a lazy zap object.
+func ZapRanges(ranges []rtree.Range) zapcore.Field {
+    return zap.Object("rangeInfo", rangesSliceObjectMixin(ranges))
+}
+
+type tableSliceArrayMixIn []CreatedTable
+
+func (ss tableSliceArrayMixIn) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
+    for _, s := range ss {
+        encoder.AppendString(fmt.Sprintf("%s.%s",
+            utils.EncloseName(s.OldTable.Db.Name.String()),
+            utils.EncloseName(s.OldTable.Info.Name.String())))
+    }
+    return nil
+}
+
+type rangesSliceObjectMixin []rtree.Range
+
+func (rs rangesSliceObjectMixin) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
     totalKV := uint64(0)
     totalSize := uint64(0)
-    for _, r := range ranges {
+    for _, r := range rs {
         for _, f := range r.Files {
             totalKV += f.GetTotalKvs()
             totalSize += f.GetTotalBytes()
         }
     }
-    return []zap.Field{
-        zap.Int("ranges", len(ranges)),
-        zap.Uint64("total kv", totalKV),
-        zap.Uint64("total size", totalSize),
-    }
+    encoder.AddInt("ranges", len(rs))
+    encoder.AddUint64("total kv", totalKV)
+    encoder.AddUint64("total size", totalSize)
+    return nil
 }
 
 // ParseQuoteName parse the quote `db`.`table` name, and split it.
diff --git a/pkg/utils/logging.go b/pkg/utils/logging.go
index aa68c7b1e..18491c63a 100644
--- a/pkg/utils/logging.go
+++ b/pkg/utils/logging.go
@@ -85,6 +85,21 @@ func (keys zapArrayMarshalKeysMixIn) MarshalLogArray(enc zapcore.ArrayEncoder) e
     return nil
 }
 
+type files []*backup.File
+
+func (fs files) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
+    totalKVs := uint64(0)
+    totalSize := uint64(0)
+    for _, file := range fs {
+        totalKVs += file.GetTotalKvs()
+        totalSize += file.GetTotalBytes()
+    }
+    encoder.AddUint64("totalKVs", totalKVs)
+    encoder.AddUint64("totalBytes", totalSize)
+    encoder.AddInt("totalFileCount", len(fs))
+    return nil
+}
+
 // WrapKey wrap a key as a Stringer that can print proper upper hex format.
 func WrapKey(key []byte) fmt.Stringer {
     return kv.Key(key)
@@ -114,3 +129,8 @@ func ZapFile(file *backup.File) zapcore.Field {
 func ZapSSTMeta(sstMeta *import_sstpb.SSTMeta) zapcore.Field {
     return zap.Object("sstMeta", zapMarshalSSTMetaMixIn{sstMeta})
 }
+
+// ZapFiles makes the zap field for a set of files.
+func ZapFiles(fs []*backup.File) zapcore.Field {
+    return zap.Object("fs", files(fs))
+}
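
The PutSink contract introduced in the patch inverts the old call-and-return flow: the Batcher owns the output channels, wraps them in a TableSink, and hands that sink to the sender before the first RestoreBatch call; the sender then reports restored tables and errors asynchronously. Below is a minimal, self-contained sketch of that contract, using simplified stand-in types (CreatedTable here carries only a name, and fakeSender is hypothetical) rather than the real BR ones.

```go
// Sketch of the TableSink/PutSink contract (simplified types, not the real BR APIs).
package main

import (
	"fmt"
	"sync"
)

type CreatedTable struct{ Name string }

type TableSink interface {
	EmitTables(tables ...CreatedTable)
	EmitError(error)
	Close()
}

// chanTableSink adapts the batcher-owned channels to the TableSink interface.
type chanTableSink struct {
	outCh chan<- []CreatedTable
	errCh chan<- error
}

func (s chanTableSink) EmitTables(tables ...CreatedTable) { s.outCh <- tables }
func (s chanTableSink) EmitError(err error)               { s.errCh <- err }
func (s chanTableSink) Close()                            { close(s.outCh) } // errCh has other senders, keep it open

// fakeSender restores batches asynchronously and reports results via the sink.
type fakeSender struct {
	sink TableSink
	wg   sync.WaitGroup
}

func (f *fakeSender) PutSink(sink TableSink) { f.sink = sink }

func (f *fakeSender) RestoreBatch(tables []CreatedTable) {
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		f.sink.EmitTables(tables...) // pretend the whole batch restored successfully
	}()
}

func (f *fakeSender) Close() {
	f.wg.Wait() // wait for in-flight batches
	f.sink.Close()
}

func main() {
	restored := make(chan []CreatedTable, 16)
	errCh := make(chan error, 16)

	sender := &fakeSender{}
	sender.PutSink(chanTableSink{outCh: restored, errCh: errCh}) // must happen before RestoreBatch

	sender.RestoreBatch([]CreatedTable{{Name: "db.t1"}, {Name: "db.t2"}})
	sender.RestoreBatch([]CreatedTable{{Name: "db.t3"}})
	sender.Close()

	for batch := range restored {
		fmt.Println("fully restored:", batch)
	}
}
```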
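The new tikvSender is a two-stage pipeline: a split worker forwards each drained batch to a restore worker over an intermediate channel, and Close shuts everything down by closing the input channel and waiting on a WaitGroup. A sketch of that shape under the same shutdown rules, with plain integers standing in for DrainResult and print statements standing in for SplitRanges and RestoreFiles:

```go
// Sketch of the split-worker / restore-worker pipeline and its shutdown protocol.
package main

import (
	"context"
	"fmt"
	"sync"
)

type pipeline struct {
	inCh chan<- int
	wg   *sync.WaitGroup
}

func newPipeline(ctx context.Context) *pipeline {
	inCh := make(chan int, 4)
	midCh := make(chan int, 4)
	p := &pipeline{inCh: inCh, wg: new(sync.WaitGroup)}
	p.wg.Add(2)

	go func() { // split worker
		defer func() {
			p.wg.Done()
			close(midCh) // let the downstream worker drain and exit
		}()
		for {
			select {
			case <-ctx.Done():
				return
			case item, ok := <-inCh:
				if !ok {
					return
				}
				midCh <- item * 10 // pretend this is SplitRanges
			}
		}
	}()

	go func() { // restore worker
		defer p.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case item, ok := <-midCh:
				if !ok {
					return
				}
				fmt.Println("restored batch", item) // pretend this is RestoreFiles
			}
		}
	}()
	return p
}

func (p *pipeline) Send(item int) { p.inCh <- item }

// Close mirrors tikvSender.Close: close the input, then wait for both workers.
func (p *pipeline) Close() {
	close(p.inCh)
	p.wg.Wait()
}

func main() {
	p := newPipeline(context.Background())
	for i := 1; i <= 3; i++ {
		p.Send(i)
	}
	p.Close()
}
```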
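contextCleaner still has to make regions leave restore mode even when the restore context has already been canceled, so it builds a fresh, bounded context from context.Background() instead of reusing the dead parent. A small runnable sketch of that pattern, with a stub cleanup function in place of ContextManager.Close:

```go
// Sketch of the "clean up under a fresh timeout when the parent is already canceled" pattern.
package main

import (
	"context"
	"fmt"
	"time"
)

func cleanup(ctx context.Context) {
	// Pretend this makes regions leave restore mode; it honors ctx.
	select {
	case <-time.After(50 * time.Millisecond):
		fmt.Println("cleanup finished")
	case <-ctx.Done():
		fmt.Println("cleanup aborted:", ctx.Err())
	}
}

func closeWith(ctx context.Context) {
	if ctx.Err() != nil {
		// Parent is already canceled: give cleanup its own bounded lifetime.
		timeout := 5 * time.Second
		limitedCtx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		cleanup(limitedCtx)
		return
	}
	cleanup(ctx)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the user interrupting the restore
	closeWith(ctx)
}
```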
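brContextManager now stores the CreatedTable value per table ID, deletes entries in Leave, and can therefore clean up exactly the tables that never left the context when Close is called. A simplified sketch of that bookkeeping, again with stand-in types and print statements in place of the split prepare/post work:

```go
// Sketch of the map-as-set bookkeeping change in the context manager.
package main

import "fmt"

type CreatedTable struct {
	ID   int64
	Name string
}

type manager struct {
	hasTable map[int64]CreatedTable
}

func (m *manager) Enter(tables ...CreatedTable) {
	for _, t := range tables {
		if _, ok := m.hasTable[t.ID]; !ok {
			fmt.Println("entering restore mode:", t.Name)
		}
		m.hasTable[t.ID] = t
	}
}

func (m *manager) Leave(tables ...CreatedTable) {
	for _, t := range tables {
		fmt.Println("leaving restore mode:", t.Name)
		delete(m.hasTable, t.ID) // was `m.hasTable[id] = false` before the patch
	}
}

func (m *manager) Close() {
	// Only tables that never left the context are still recorded here.
	for _, t := range m.hasTable {
		fmt.Println("cleanup on close:", t.Name)
	}
	m.hasTable = map[int64]CreatedTable{}
}

func main() {
	m := &manager{hasTable: map[int64]CreatedTable{}}
	m.Enter(CreatedTable{ID: 1, Name: "db.t1"}, CreatedTable{ID: 2, Name: "db.t2"})
	m.Leave(CreatedTable{ID: 1, Name: "db.t1"})
	m.Close() // cleans up db.t2, which was interrupted mid-restore
}
```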
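ZapRanges, ZapTables, and the new ZapFiles now return a single lazily-marshaled field instead of a []zap.Field that callers had to spread with append(...)... . A runnable sketch of the underlying zapcore.ObjectMarshaler mix-in pattern; fileMeta here is an illustrative stand-in for backup.File:

```go
// Sketch of the lazy zap "mix-in" pattern used by ZapRanges/ZapFiles.
package main

import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

type fileMeta struct {
	TotalKvs   uint64
	TotalBytes uint64
}

type filesMixin []fileMeta

// MarshalLogObject aggregates the slice only when the entry is actually encoded.
func (fs filesMixin) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	totalKVs, totalBytes := uint64(0), uint64(0)
	for _, f := range fs {
		totalKVs += f.TotalKvs
		totalBytes += f.TotalBytes
	}
	enc.AddUint64("totalKVs", totalKVs)
	enc.AddUint64("totalBytes", totalBytes)
	enc.AddInt("totalFileCount", len(fs))
	return nil
}

// zapFiles mirrors the shape of utils.ZapFiles: one field, passed straight to log.Info.
func zapFiles(fs []fileMeta) zapcore.Field {
	return zap.Object("files", filesMixin(fs))
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	fs := []fileMeta{{TotalKvs: 100, TotalBytes: 4096}, {TotalKvs: 50, TotalBytes: 2048}}
	logger.Info("restore files", zapFiles(fs))
}
```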