diff --git a/Makefile b/Makefile
index 7cb5c95fb..389f2567f 100644
--- a/Makefile
+++ b/Makefile
@@ -123,7 +123,9 @@ lint: tools
 tidy: prepare
	@echo "go mod tidy"
	GO111MODULE=on go mod tidy
-	git diff --quiet go.mod go.sum
+	# tidy isn't a read-only task for go.mod: always run FINISH_MOD,
+	# so our go.mod won't be left in a stale state.
+	git diff --quiet go.mod go.sum || ("$(FINISH_MOD)" && exit 1)
	$(FINISH_MOD)

 failpoint-enable: tools
diff --git a/main.go b/main.go
index 21e27c105..7c6221ace 100644
--- a/main.go
+++ b/main.go
@@ -30,14 +30,12 @@ func main() {
 		sig := <-sc
 		fmt.Printf("\nGot signal [%v] to exit.\n", sig)
 		log.Warn("received signal to exit", zap.Stringer("signal", sig))
-		switch sig {
-		case syscall.SIGTERM:
-			cancel()
-			os.Exit(0)
-		default:
-			cancel()
-			os.Exit(1)
-		}
+		cancel()
+		fmt.Fprintln(os.Stderr, "gracefully shutting down, press ^C again to force exit")
+		<-sc
+		// Even if the user exits with SIGTERM, there isn't any checkpoint for resuming,
+		// hence we return a failure exit code.
+		os.Exit(1)
 	}()

 	rootCmd := &cobra.Command{
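The signal handler above is the classic double-signal idiom: the first signal cancels the root context and lets the program drain, the second one forces exit. A minimal, self-contained sketch of the same pattern (the `run` helper and the exact signal set are illustrative, not BR's actual wiring):

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Buffer of 2 so the second signal isn't dropped while the first is handled.
	sc := make(chan os.Signal, 2)
	signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sc // first signal: start graceful shutdown
		cancel()
		fmt.Fprintln(os.Stderr, "gracefully shutting down, press ^C again to force exit")
		<-sc // second signal: give up immediately
		os.Exit(1)
	}()

	run(ctx) // the long-running work watches ctx.Done()
}

func run(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("cleaned up, exiting")
}
```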
diff --git a/pkg/backup/client.go b/pkg/backup/client.go
index f95ba7583..4419aacfb 100644
--- a/pkg/backup/client.go
+++ b/pkg/backup/client.go
@@ -18,7 +18,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
 	"github.com/pingcap/parser/model"
-	"github.com/pingcap/tidb-tools/pkg/table-filter"
+	filter "github.com/pingcap/tidb-tools/pkg/table-filter"
 	"github.com/pingcap/tidb/distsql"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/kv"
@@ -406,8 +406,6 @@ func (bc *Client) BackupRanges(
 	updateCh glue.Progress,
 ) ([]*kvproto.File, error) {
 	errCh := make(chan error)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()

 	// we collect all files in a single goroutine to avoid thread safety issues.
 	filesCh := make(chan []*kvproto.File, concurrency)
@@ -483,8 +481,6 @@ func (bc *Client) BackupRange(
 		zap.Stringer("EndKey", utils.WrapKey(endKey)),
 		zap.Uint64("RateLimit", req.RateLimit),
 		zap.Uint32("Concurrency", req.Concurrency))
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()

 	var allStores []*metapb.Store
 	allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
@@ -497,10 +493,10 @@ func (bc *Client) BackupRange(
 	req.EndKey = endKey
 	req.StorageBackend = bc.backend

-	push := newPushDown(ctx, bc.mgr, len(allStores))
+	push := newPushDown(bc.mgr, len(allStores))

 	var results rtree.RangeTree
-	results, err = push.pushBackup(req, allStores, updateCh)
+	results, err = push.pushBackup(ctx, req, allStores, updateCh)
 	if err != nil {
 		return nil, err
 	}
@@ -801,8 +797,6 @@ func SendBackup(
 	respFn func(*kvproto.BackupResponse) error,
 	resetFn func() (kvproto.BackupClient, error),
 ) error {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
 	var errReset error
 backupLoop:
 	for retry := 0; retry < backupRetryTimes; retry++ {
diff --git a/pkg/backup/push.go b/pkg/backup/push.go
index 75036ed07..f0784be34 100644
--- a/pkg/backup/push.go
+++ b/pkg/backup/push.go
@@ -19,17 +19,14 @@ import (

 // pushDown warps a backup task.
 type pushDown struct {
-	ctx    context.Context
 	mgr    ClientMgr
 	respCh chan *backup.BackupResponse
 	errCh  chan error
 }

 // newPushDown creates a push down backup.
-func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown {
-	log.Info("new backup client")
+func newPushDown(mgr ClientMgr, cap int) *pushDown {
 	return &pushDown{
-		ctx:    ctx,
 		mgr:    mgr,
 		respCh: make(chan *backup.BackupResponse, cap),
 		errCh:  make(chan error, cap),
@@ -38,6 +35,7 @@ func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown {

 // FullBackup make a full backup of a tikv cluster.
 func (push *pushDown) pushBackup(
+	ctx context.Context,
 	req backup.BackupRequest,
 	stores []*metapb.Store,
 	updateCh glue.Progress,
@@ -51,7 +49,7 @@ func (push *pushDown) pushBackup(
 			log.Warn("skip store", zap.Uint64("StoreID", storeID), zap.Stringer("State", s.GetState()))
 			continue
 		}
-		client, err := push.mgr.GetBackupClient(push.ctx, storeID)
+		client, err := push.mgr.GetBackupClient(ctx, storeID)
 		if err != nil {
 			log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
 			return res, errors.Trace(err)
@@ -60,7 +58,7 @@ func (push *pushDown) pushBackup(
 		go func() {
 			defer wg.Done()
 			err := SendBackup(
-				push.ctx, storeID, client, req,
+				ctx, storeID, client, req,
 				func(resp *backup.BackupResponse) error {
 					// Forward all responses (including error).
 					push.respCh <- resp
@@ -68,7 +66,7 @@ func (push *pushDown) pushBackup(
 				},
 				func() (backup.BackupClient, error) {
 					log.Warn("reset the connection in push", zap.Uint64("storeID", storeID))
-					return push.mgr.ResetBackupClient(push.ctx, storeID)
+					return push.mgr.ResetBackupClient(ctx, storeID)
 				})
 			if err != nil {
 				push.errCh <- err
diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go
index f28b2732e..79f2e6393 100644
--- a/pkg/gluetikv/glue.go
+++ b/pkg/gluetikv/glue.go
@@ -4,11 +4,13 @@ package gluetikv

 import (
 	"context"
+	"sync/atomic"

 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv"
+	"github.com/pingcap/log"
 	pd "github.com/tikv/pd/client"

 	"github.com/pingcap/br/pkg/glue"
@@ -48,7 +50,7 @@ func (Glue) OwnsStorage() bool {

 // StartProgress implements glue.Glue.
 func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
-	return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog)}
+	return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: new(int32)}
 }

 // Record implements glue.Glue.
@@ -57,15 +59,28 @@ func (Glue) Record(name string, val uint64) {
 }

 type progress struct {
-	ch chan<- struct{}
+	ch chan<- struct{}
+	// closed is a pointer: progress has value receivers, so an inline int32
+	// would only ever be updated inside a copy of the struct.
+	closed *int32
 }

 // Inc implements glue.Progress.
 func (p progress) Inc() {
+	if atomic.LoadInt32(p.closed) != 0 {
+		log.Warn("Inc() called on a closed progress")
+		return
+	}
+	// There is still a tiny race window if this goroutine is preempted right
+	// here: Close may run between the check above and the send below. Such a
+	// reschedule is rare in practice, so the send is probably (though not
+	// totally) safe; an extra lock would be costly, so we stay optimistic.
+	// (Maybe a spinlock here would be better?)
 	p.ch <- struct{}{}
 }

 // Close implements glue.Progress.
 func (p progress) Close() {
+	// Set the closed flag before closing the channel, so nobody can observe
+	// the channel closed while the flag still reads 0.
+	atomic.StoreInt32(p.closed, 1)
 	close(p.ch)
 }
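The optimistic check above still leaves a theoretical send-on-closed-channel window. A lock-free structure that closes a separate `done` channel removes the race entirely — a sketch under the assumption that dropping late ticks is acceptable (none of these names are in the patch):

```go
package main

import "log"

// progress is a hypothetical variant that can never send on a closed channel:
// Close closes `done` instead of the data channel, and Inc selects over both.
// (If both cases are ready, select picks one at random, which is still safe:
// a tick sent after Close is simply never consumed.)
type progress struct {
	ch   chan struct{}
	done chan struct{}
}

func newProgress(cap int) *progress {
	return &progress{ch: make(chan struct{}, cap), done: make(chan struct{})}
}

func (p *progress) Inc() {
	select {
	case p.ch <- struct{}{}:
	case <-p.done:
		log.Println("Inc() called on a closed progress, dropping the tick")
	}
}

func (p *progress) Close() { close(p.done) }

func main() {
	p := newProgress(1)
	p.Inc()
	p.Close()
	p.Inc() // safely dropped instead of panicking
}
```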
diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go
index 2b921cd16..6a9d133e6 100644
--- a/pkg/pdutil/pd.go
+++ b/pkg/pdutil/pd.go
@@ -61,6 +61,7 @@ var (
 		"shuffle-region-scheduler":     {},
 		"shuffle-hot-region-scheduler": {},
 	}
+	// TODO remove this, see https://github.com/pingcap/br/pull/555#discussion_r509855972
 	pdRegionMergeCfg = []string{
 		"max-merge-region-keys",
 		"max-merge-region-size",
@@ -73,11 +74,12 @@ var (

 	// DefaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml.
 	DefaultPDCfg = map[string]interface{}{
-		"max-merge-region-keys": 200000,
-		"max-merge-region-size": 20,
-		"leader-schedule-limit": 4,
-		"region-schedule-limit": 2048,
-		"max-snapshot-count":    3,
+		"max-merge-region-keys":       200000,
+		"max-merge-region-size":       20,
+		"leader-schedule-limit":       4,
+		"region-schedule-limit":       2048,
+		"max-snapshot-count":          3,
+		"enable-location-replacement": "true",
 	}
 )

@@ -175,10 +177,12 @@ func NewPdController(
 	}

 	return &PdController{
-		addrs:            processedAddrs,
-		cli:              cli,
-		pdClient:         pdClient,
-		schedulerPauseCh: make(chan struct{}),
+		addrs:    processedAddrs,
+		cli:      cli,
+		pdClient: pdClient,
+		// This channel must be buffered: otherwise, when the context is
+		// canceled, graceful shutdown would get stuck at resuming schedulers.
+		schedulerPauseCh: make(chan struct{}, 1),
 	}, nil
 }

@@ -408,6 +412,7 @@ func (p *PdController) UpdatePDScheduleConfig(
 		if e == nil {
 			return nil
 		}
+		log.Warn("failed to update PD config, will try the next PD address", zap.Error(e), zap.String("pd", addr))
 	}
 	return errors.Annotate(berrors.ErrPDUpdateFailed, "failed to update PD schedule config")
 }

@@ -416,6 +421,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
 	if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil {
 		return errors.Annotate(err, "fail to add PD schedulers")
 	}
+	log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg))
 	mergeCfg := make(map[string]interface{})
 	for _, cfgKey := range pdRegionMergeCfg {
 		value := clusterCfg.scheduleCfg[cfgKey]
@@ -441,6 +447,12 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
 	if err := pd.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
 		return errors.Annotate(err, "fail to update PD schedule config")
 	}
+	if locationReplacement, ok := clusterCfg.scheduleCfg["enable-location-replacement"]; ok {
+		log.Debug("restoring config enable-location-replacement", zap.Any("enable-location-replacement", locationReplacement))
+		if err := pd.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": locationReplacement}); err != nil {
+			return err
+		}
+	}
 	return nil
 }

@@ -482,6 +494,7 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
 	}
 	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})
+	log.Debug("saved PD config", zap.Any("config", scheduleCfg))

 	disableMergeCfg := make(map[string]interface{})
 	for _, cfgKey := range pdRegionMergeCfg {
@@ -512,7 +525,10 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
 		limit := int(value.(float64))
 		scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
 	}
-	return undo, p.UpdatePDScheduleConfig(ctx, scheduleLimitCfg)
+	if err := p.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
+		return undo, err
+	}
+	return undo, p.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": "false"})
 }

 // Close close the connection to pd.
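`RemoveSchedulers` and `restoreSchedulers` follow a save-mutate-undo pattern: snapshot the current PD values for the keys about to change, flip them, and hand back a closure that restores the snapshot. A toy sketch of that shape (the `updateFn` type and function names are illustrative, not BR's API):

```go
package main

import (
	"context"
	"fmt"
)

// updateFn stands in for PdController.UpdatePDScheduleConfig.
type updateFn func(ctx context.Context, cfg map[string]interface{}) error

// disableWithUndo saves the current value of the key it changes and returns
// an undo closure, mirroring RemoveSchedulers / restoreSchedulers above.
func disableWithUndo(ctx context.Context, current map[string]interface{}, update updateFn) (func(context.Context) error, error) {
	saved := map[string]interface{}{
		"enable-location-replacement": current["enable-location-replacement"],
	}
	undo := func(ctx context.Context) error { return update(ctx, saved) }
	if err := update(ctx, map[string]interface{}{"enable-location-replacement": "false"}); err != nil {
		return undo, err
	}
	return undo, nil
}

func main() {
	cfg := map[string]interface{}{"enable-location-replacement": "true"}
	update := func(_ context.Context, delta map[string]interface{}) error {
		for k, v := range delta {
			cfg[k] = v
		}
		return nil
	}
	ctx := context.Background()
	undo, _ := disableWithUndo(ctx, cfg, update)
	fmt.Println("during backup:", cfg["enable-location-replacement"])
	_ = undo(ctx)
	fmt.Println("after undo:   ", cfg["enable-location-replacement"])
}
```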
diff --git a/pkg/restore/batcher.go b/pkg/restore/batcher.go
index 1a4c1256d..71f28ca87 100644
--- a/pkg/restore/batcher.go
+++ b/pkg/restore/batcher.go
@@ -8,6 +8,8 @@ import (
 	"sync/atomic"
 	"time"

+	"github.com/pingcap/kvproto/pkg/backup"
+
 	"github.com/pingcap/log"
 	"go.uber.org/zap"

@@ -39,8 +41,8 @@ type Batcher struct {
 	autoCommitJoiner chan<- struct{}
 	// everythingIsDone is for waiting for worker done: that is, after we send a
 	// signal to autoCommitJoiner, we must give it enough time to get things done.
-	// Then, it should notify us by this waitgroup.
-	// Use waitgroup instead of a trivial channel for further extension.
+	// Then, it should notify us by this wait group.
+	// Use a wait group instead of a trivial channel for further extension.
 	everythingIsDone *sync.WaitGroup
 	// sendErr is for output error information.
 	sendErr chan<- error
@@ -60,6 +62,37 @@ func (b *Batcher) Len() int {
 	return int(atomic.LoadInt32(&b.size))
 }

+// contextCleaner is the worker goroutine that cleans up the 'context'
+// (e.g. makes regions leave restore mode).
+func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) {
+	defer func() {
+		if ctx.Err() != nil {
+			log.Info("restore canceled, cleaning in background context")
+			b.manager.Close(context.Background())
+		} else {
+			b.manager.Close(ctx)
+		}
+	}()
+	defer b.everythingIsDone.Done()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case tbls, ok := <-tables:
+			if !ok {
+				return
+			}
+			if err := b.manager.Leave(ctx, tbls); err != nil {
+				b.sendErr <- err
+				return
+			}
+			for _, tbl := range tbls {
+				b.outCh <- tbl
+			}
+		}
+	}
+}
+
 // NewBatcher creates a new batcher by a sender and a context manager.
 // the former defines how the 'restore' a batch(i.e. send, or 'push down' the task to where).
 // the context manager defines the 'lifetime' of restoring tables(i.e. how to enter 'restore' mode, and how to exit).
@@ -71,7 +104,7 @@ func NewBatcher(
 	manager ContextManager,
 	errCh chan<- error,
 ) (*Batcher, <-chan CreatedTable) {
-	output := make(chan CreatedTable, defaultBatcherOutputChannelSize)
+	output := make(chan CreatedTable, defaultChannelSize)
 	sendChan := make(chan SendType, 2)
 	b := &Batcher{
 		rewriteRules:       EmptyRewriteRule(),
@@ -84,8 +117,12 @@ func NewBatcher(
 		everythingIsDone:   new(sync.WaitGroup),
 		batchSizeThreshold: 1,
 	}
-	b.everythingIsDone.Add(1)
+	b.everythingIsDone.Add(2)
 	go b.sendWorker(ctx, sendChan)
+	restoredTables := make(chan []CreatedTable, defaultChannelSize)
+	go b.contextCleaner(ctx, restoredTables)
+	sink := chanTableSink{restoredTables, errCh}
+	sender.PutSink(sink)
 	return b, output
 }

@@ -105,7 +142,7 @@ func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration) {
 // DisableAutoCommit blocks the current goroutine until the worker can gracefully stop,
 // and then disable auto commit.
 func (b *Batcher) DisableAutoCommit() {
-	b.joinWorker()
+	b.joinAutoCommitWorker()
 	b.autoCommitJoiner = nil
 }

@@ -114,9 +151,9 @@ func (b *Batcher) waitUntilSendDone() {
 	b.everythingIsDone.Wait()
 }

-// joinWorker blocks the current goroutine until the worker can gracefully stop.
+// joinAutoCommitWorker blocks the current goroutine until the worker can gracefully stop.
 // return immediately when auto commit disabled.
-func (b *Batcher) joinWorker() {
+func (b *Batcher) joinAutoCommitWorker() {
 	if b.autoCommitJoiner != nil {
 		log.Debug("gracefully stopping worker goroutine")
 		b.autoCommitJoiner <- struct{}{}
@@ -126,17 +163,11 @@ func (b *Batcher) joinWorker() {
 }

 // sendWorker is the 'worker' that send all ranges to TiKV.
+// TODO since all operations are asynchronous now, it's possible to remove this worker.
 func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) {
 	sendUntil := func(lessOrEqual int) {
 		for b.Len() > lessOrEqual {
-			tbls, err := b.Send(ctx)
-			if err != nil {
-				b.sendErr <- err
-				return
-			}
-			for _, t := range tbls {
-				b.outCh <- t
-			}
+			b.Send(ctx)
 		}
 	}

@@ -148,6 +179,7 @@ func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) {
 			sendUntil(0)
 		case SendAllThenClose:
 			sendUntil(0)
+			b.sender.Close()
 			b.everythingIsDone.Done()
 			return
 		}
@@ -181,7 +213,8 @@ func (b *Batcher) asyncSend(t SendType) {
 	}
 }

-type drainResult struct {
+// DrainResult is the collection of some ranges and their metadata.
+type DrainResult struct {
 	// TablesToSend are tables that would be send at this batch.
 	TablesToSend []CreatedTable
 	// BlankTablesAfterSend are tables that will be full-restored after this batch send.
 	Ranges []rtree.Range
 }

-func newDrainResult() drainResult {
-	return drainResult{
+// Files returns all files of this drain result.
+func (result DrainResult) Files() []*backup.File {
+	var files = make([]*backup.File, 0, len(result.Ranges)*2)
+	for _, fs := range result.Ranges {
+		files = append(files, fs.Files...)
+	}
+	return files
+}
+
+func newDrainResult() DrainResult {
+	return DrainResult{
 		TablesToSend:         make([]CreatedTable, 0),
 		BlankTablesAfterSend: make([]CreatedTable, 0),
 		RewriteRules:         EmptyRewriteRule(),
@@ -217,7 +259,7 @@ func newDrainResult() drainResult {
 // |--|-------|
 // |t2|t3    |
 // as you can see, all restored ranges would be removed.
-func (b *Batcher) drainRanges() drainResult {
+func (b *Batcher) drainRanges() DrainResult {
 	result := newDrainResult()

 	b.cachedTablesMu.Lock()
@@ -271,42 +313,20 @@ func (b *Batcher) drainRanges() drainResult {

 // Send sends all pending requests in the batcher.
 // returns tables sent FULLY in the current batch.
-func (b *Batcher) Send(ctx context.Context) ([]CreatedTable, error) {
+func (b *Batcher) Send(ctx context.Context) {
 	drainResult := b.drainRanges()
 	tbs := drainResult.TablesToSend
 	ranges := drainResult.Ranges
 	log.Info("restore batch start",
-		append(
-			ZapRanges(ranges),
-			ZapTables(tbs),
-		)...,
+		ZapRanges(ranges),
+		ZapTables(tbs),
 	)
-
+	// Leave is called in b.contextCleaner
 	if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil {
-		return nil, err
-	}
-	defer func() {
-		if err := b.manager.Leave(ctx, drainResult.BlankTablesAfterSend); err != nil {
-			log.Error("encountering error when leaving recover mode, we can go on but some regions may stick on restore mode",
-				append(
-					ZapRanges(ranges),
-					ZapTables(tbs),
-					zap.Error(err))...,
-			)
-		}
-		if len(drainResult.BlankTablesAfterSend) > 0 {
-			log.Debug("table fully restored",
-				ZapTables(drainResult.BlankTablesAfterSend),
-				zap.Int("ranges", len(ranges)),
-			)
-		}
-	}()
-
-	if err := b.sender.RestoreBatch(ctx, ranges, drainResult.RewriteRules); err != nil {
-		return nil, err
+		b.sendErr <- err
+		return
 	}
-	return drainResult.BlankTablesAfterSend, nil
+	b.sender.RestoreBatch(drainResult)
 }

 func (b *Batcher) sendIfFull() {
@@ -342,7 +362,6 @@ func (b *Batcher) Close() {
 	b.waitUntilSendDone()
 	close(b.outCh)
 	close(b.sendCh)
-	b.sender.Close()
 }

 // SetThreshold sets the threshold that how big the batch size reaching need to send batch.
diff --git a/pkg/restore/batcher_test.go b/pkg/restore/batcher_test.go
index 53a9fbbaa..1f7b4816b 100644
--- a/pkg/restore/batcher_test.go
+++ b/pkg/restore/batcher_test.go
@@ -30,30 +30,34 @@ type drySender struct {
 	rewriteRules *restore.RewriteRules
 	ranges       []rtree.Range
 	nBatch       int
+
+	sink restore.TableSink
 }

-func (d *drySender) RestoreBatch(
-	_ctx context.Context,
-	ranges []rtree.Range,
-	rewriteRules *restore.RewriteRules,
-) error {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	log.Info("fake restore range", restore.ZapRanges(ranges)...)
-	d.nBatch++
-	d.rewriteRules.Append(*rewriteRules)
-	d.ranges = append(d.ranges, ranges...)
-	return nil
+func (sender *drySender) PutSink(sink restore.TableSink) {
+	sender.sink = sink
+}
+
+func (sender *drySender) RestoreBatch(ranges restore.DrainResult) {
+	sender.mu.Lock()
+	defer sender.mu.Unlock()
+	log.Info("fake restore range", restore.ZapRanges(ranges.Ranges))
+	sender.nBatch++
+	sender.rewriteRules.Append(*ranges.RewriteRules)
+	sender.ranges = append(sender.ranges, ranges.Ranges...)
+	sender.sink.EmitTables(ranges.BlankTablesAfterSend...)
 }

-func (d *drySender) Close() {}
+func (sender *drySender) Close() {
+	sender.sink.Close()
+}

 func waitForSend() {
 	time.Sleep(10 * time.Millisecond)
 }

-func (d *drySender) Ranges() []rtree.Range {
-	return d.ranges
+func (sender *drySender) Ranges() []rtree.Range {
+	return sender.ranges
 }

 func newDrySender() *drySender {
@@ -66,6 +70,13 @@ func newDrySender() *drySender {

 type recordCurrentTableManager map[int64]bool

+func (manager recordCurrentTableManager) Close(ctx context.Context) {
+	if len(manager) > 0 {
+		log.Panic("when closing, there are still some tables that haven't been sent",
+			zap.Any("tables", manager))
+	}
+}
+
 func newMockManager() recordCurrentTableManager {
 	return make(recordCurrentTableManager)
 }
@@ -84,7 +95,7 @@ func (manager recordCurrentTableManager) Leave(_ context.Context, tables []resto
 			return errors.Errorf("Table %d is removed before added", t.Table.ID)
 		}
 		log.Info("leaving", zap.Int64("table ID", t.Table.ID))
-		manager[t.Table.ID] = false
+		delete(manager, t.Table.ID)
 	}
 	return nil
 }
@@ -109,8 +120,8 @@ func (manager recordCurrentTableManager) Has(tables ...restore.TableWithRange) b
 	return true
 }

-func (d *drySender) HasRewriteRuleOfKey(prefix string) bool {
-	for _, rule := range d.rewriteRules.Table {
+func (sender *drySender) HasRewriteRuleOfKey(prefix string) bool {
+	for _, rule := range sender.rewriteRules.Table {
 		if bytes.Equal([]byte(prefix), rule.OldKeyPrefix) {
 			return true
 		}
@@ -118,12 +129,12 @@ func (d *drySender) HasRewriteRuleOfKey(prefix string) bool {
 	return false
 }

-func (d *drySender) RangeLen() int {
-	return len(d.ranges)
+func (sender *drySender) RangeLen() int {
+	return len(sender.ranges)
 }

-func (d *drySender) BatchCount() int {
-	return d.nBatch
+func (sender *drySender) BatchCount() int {
+	return sender.nBatch
 }

 var (
diff --git a/pkg/restore/client.go b/pkg/restore/client.go
index c1f249391..f866493ab 100644
--- a/pkg/restore/client.go
+++ b/pkg/restore/client.go
@@ -13,7 +13,6 @@ import (
 	"strconv"
 	"time"

-	"github.com/gogo/protobuf/proto"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/backup"
 	"github.com/pingcap/kvproto/pkg/import_sstpb"
@@ -27,7 +26,6 @@ import (
 	"github.com/pingcap/tidb/util/codec"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/server/schedule/placement"
-	"go.uber.org/multierr"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
@@ -50,9 +48,6 @@ const defaultChecksumConcurrency = 64

 // Client sends requests to restore files.
 type Client struct {
-	ctx    context.Context
-	cancel context.CancelFunc
-
 	pdClient     pd.Client
 	toolClient   SplitClient
 	fileImporter FileImporter
@@ -65,27 +60,20 @@ type Client struct {
 	// TODO Remove this field or replace it with a []*DB,
 	// since https://github.com/pingcap/br/pull/377 needs more DBs to speed up DDL execution.
 	// And for now, we must inject a pool of DBs to `Client.GoCreateTables`, otherwise there would be a race condition.
-	// Which is dirty: why we need DBs from different sources?
+	// This is dirty: why do we need DBs from different sources?
 	// By replace it with a []*DB, we can remove the dirty parameter of `Client.GoCreateTable`,
 	// along with them in some private functions.
 	// Before you do it, you can firstly read discussions at
 	// https://github.com/pingcap/br/pull/377#discussion_r446594501,
-	// this probably isn't as easy and it seems like (however, not hard, too :D)
+	// this probably isn't as easy as it seems (however, not that hard either :D)
 	db              *DB
 	rateLimit       uint64
 	isOnline        bool
 	noSchema        bool
 	hasSpeedLimited bool
-	// Those fields should be removed after we have FULLY supportted TiFlash.
-	// we place this field here to make a 'good' memory align, but mainly make golang-ci happy :)
-	tiFlashRecordUpdated bool

 	restoreStores []uint64

-	// tables that has TiFlash and those TiFlash have been removed, should be written to disk.
-	// Those fields should be removed after we have FULLY supportted TiFlash.
-	tablesRemovedTiFlash []*backup.Schema
-
 	storage            storage.ExternalStorage
 	backend            *backup.StorageBackend
 	switchModeInterval time.Duration
@@ -94,22 +82,17 @@ type Client struct {

 // NewRestoreClient returns a new RestoreClient.
 func NewRestoreClient(
-	ctx context.Context,
 	g glue.Glue,
 	pdClient pd.Client,
 	store kv.Storage,
 	tlsConf *tls.Config,
 ) (*Client, error) {
-	ctx, cancel := context.WithCancel(ctx)
 	db, err := NewDB(g, store)
 	if err != nil {
-		cancel()
 		return nil, errors.Trace(err)
 	}

 	return &Client{
-		ctx:        ctx,
-		cancel:     cancel,
 		pdClient:   pdClient,
 		toolClient: NewSplitClient(pdClient, tlsConf),
 		db:         db,
@@ -155,7 +138,6 @@ func (rc *Client) Close() {
 	if rc.db != nil {
 		rc.db.Close()
 	}
-	rc.cancel()
 	log.Info("Restore client closed")
 }

@@ -268,11 +250,11 @@ func (rc *Client) GetTS(ctx context.Context) (uint64, error) {
 }

 // ResetTS resets the timestamp of PD to a bigger value.
-func (rc *Client) ResetTS(pdAddrs []string) error {
+func (rc *Client) ResetTS(ctx context.Context, pdAddrs []string) error {
 	restoreTS := rc.backupMeta.GetEndVersion()
 	log.Info("reset pd timestamp", zap.Uint64("ts", restoreTS))
 	i := 0
-	return utils.WithRetry(rc.ctx, func() error {
+	return utils.WithRetry(ctx, func() error {
 		idx := i % len(pdAddrs)
 		i++
 		return utils.ResetTS(pdAddrs[idx], restoreTS, rc.tlsConf)
@@ -280,10 +262,10 @@ func (rc *Client) ResetTS(pdAddrs []string) error {
 }

 // GetPlacementRules return the current placement rules.
-func (rc *Client) GetPlacementRules(pdAddrs []string) ([]placement.Rule, error) {
+func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]placement.Rule, error) {
 	var placementRules []placement.Rule
 	i := 0
-	errRetry := utils.WithRetry(rc.ctx, func() error {
+	errRetry := utils.WithRetry(ctx, func() error {
 		var err error
 		idx := i % len(pdAddrs)
 		i++
@@ -327,12 +309,12 @@ func (rc *Client) GetTableSchema(
 }

 // CreateDatabase creates a database.
-func (rc *Client) CreateDatabase(db *model.DBInfo) error {
+func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error {
 	if rc.IsSkipCreateSQL() {
 		log.Info("skip create database", zap.Stringer("database", db.Name))
 		return nil
 	}
-	return rc.db.CreateDatabase(rc.ctx, db)
+	return rc.db.CreateDatabase(ctx, db)
 }

 // CreateTables creates multiple tables, and returns their rewrite rules.
@@ -481,116 +463,15 @@ func (rc *Client) createTablesWithDBPool(ctx context.Context,
 	return eg.Wait()
 }

-// makeTiFlashOfTableRecord make a 'record' repsenting TiFlash of a table that has been removed.
-// We doesn't record table ID here because when restore TiFlash replicas,
-// we use `ALTER TABLE db.tbl SET TIFLASH_REPLICA = xxx` DDL, instead of use some internal TiDB API.
-func makeTiFlashOfTableRecord(table *utils.Table, replica int) (*backup.Schema, error) {
-	tableData, err := json.Marshal(table.Info)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	dbData, err := json.Marshal(table.Db)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	result := &backup.Schema{
-		Db:              dbData,
-		Table:           tableData,
-		Crc64Xor:        table.Crc64Xor,
-		TotalKvs:        table.TotalKvs,
-		TotalBytes:      table.TotalBytes,
-		TiflashReplicas: uint32(replica),
-	}
-	return result, nil
-}
-
-// RemoveTiFlashOfTable removes TiFlash replica of some table,
-// returns the removed count of TiFlash nodes.
-// TODO: save the removed TiFlash information into disk.
-// TODO: remove this after tiflash supports restore.
-func (rc *Client) RemoveTiFlashOfTable(table CreatedTable, rule []placement.Rule) (int, error) {
-	if rule := utils.SearchPlacementRule(table.Table.ID, rule, placement.Learner); rule != nil {
-		if rule.Count > 0 {
-			log.Info("remove TiFlash of table", zap.Int64("table ID", table.Table.ID), zap.Int("count", rule.Count))
-			err := multierr.Combine(
-				rc.db.AlterTiflashReplica(rc.ctx, table.OldTable, 0),
-				rc.removeTiFlashOf(table.OldTable, rule.Count),
-				rc.flushTiFlashRecord(),
-			)
-			if err != nil {
-				return 0, errors.Trace(err)
-			}
-			return rule.Count, nil
-		}
-	}
-	return 0, nil
-}
-
-func (rc *Client) removeTiFlashOf(table *utils.Table, replica int) error {
-	tableRecord, err := makeTiFlashOfTableRecord(table, replica)
-	if err != nil {
-		return err
-	}
-	rc.tablesRemovedTiFlash = append(rc.tablesRemovedTiFlash, tableRecord)
-	rc.tiFlashRecordUpdated = true
-	return nil
-}
-
-func (rc *Client) flushTiFlashRecord() error {
-	// Today nothing to do :D
-	if !rc.tiFlashRecordUpdated {
-		return nil
-	}
-
-	// should we make a deep copy here?
-	// currently, write things directly to backup meta is OK since there seems nobody uses it.
-	// But would it be better if we don't do it?
-	rc.backupMeta.Schemas = rc.tablesRemovedTiFlash
-	backupMetaData, err := proto.Marshal(rc.backupMeta)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	backendURL := storage.FormatBackendURL(rc.backend)
-	log.Info("update backup meta", zap.Stringer("path", &backendURL))
-	err = rc.storage.Write(rc.ctx, utils.SavedMetaFile, backupMetaData)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	return nil
-}
-
-// RecoverTiFlashOfTable recovers TiFlash replica of some table.
-// TODO: remove this after tiflash supports restore.
-func (rc *Client) RecoverTiFlashOfTable(table *utils.Table) error {
-	if table.TiFlashReplicas > 0 {
-		err := rc.db.AlterTiflashReplica(rc.ctx, table, table.TiFlashReplicas)
-		if err != nil {
-			return errors.Trace(err)
-		}
-	}
-	return nil
-}
-
-// RecoverTiFlashReplica recovers all the tiflash replicas of a table
-// TODO: remove this after tiflash supports restore.
-func (rc *Client) RecoverTiFlashReplica(tables []*utils.Table) error {
-	for _, table := range tables {
-		if err := rc.RecoverTiFlashOfTable(table); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 // ExecDDLs executes the queries of the ddl jobs.
-func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
+func (rc *Client) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error {
 	// Sort the ddl jobs by schema version in ascending order.
 	sort.Slice(ddlJobs, func(i, j int) bool {
 		return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion
 	})

 	for _, job := range ddlJobs {
-		err := rc.db.ExecDDL(rc.ctx, job)
+		err := rc.db.ExecDDL(ctx, job)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -602,14 +483,14 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
 	return nil
 }

-func (rc *Client) setSpeedLimit() error {
+func (rc *Client) setSpeedLimit(ctx context.Context) error {
 	if !rc.hasSpeedLimited && rc.rateLimit != 0 {
-		stores, err := conn.GetAllTiKVStores(rc.ctx, rc.pdClient, conn.SkipTiFlash)
+		stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.SkipTiFlash)
 		if err != nil {
 			return err
 		}
 		for _, store := range stores {
-			err = rc.fileImporter.setDownloadSpeedLimit(rc.ctx, store.GetId())
+			err = rc.fileImporter.setDownloadSpeedLimit(ctx, store.GetId())
 			if err != nil {
 				return err
 			}
@@ -621,6 +502,7 @@ func (rc *Client) setSpeedLimit() error {

 // RestoreFiles tries to restore the files.
 func (rc *Client) RestoreFiles(
+	ctx context.Context,
 	files []*backup.File,
 	rewriteRules *RewriteRules,
 	updateCh glue.Progress,
@@ -629,8 +511,9 @@ func (rc *Client) RestoreFiles(
 	defer func() {
 		elapsed := time.Since(start)
 		if err == nil {
-			log.Info("Restore Files",
-				zap.Int("files", len(files)), zap.Duration("take", elapsed))
+			log.Info("Restore files",
+				zap.Duration("take", elapsed),
+				utils.ZapFiles(files))
 			summary.CollectSuccessUnit("files", len(files), elapsed)
 		}
 	}()
@@ -638,8 +521,8 @@ func (rc *Client) RestoreFiles(
 	log.Debug("start to restore files",
 		zap.Int("files", len(files)),
 	)
-	eg, ectx := errgroup.WithContext(rc.ctx)
-	err = rc.setSpeedLimit()
+	eg, ectx := errgroup.WithContext(ctx)
+	err = rc.setSpeedLimit(ctx)
 	if err != nil {
 		return err
 	}
@@ -648,7 +531,12 @@ func (rc *Client) RestoreFiles(
 		fileReplica := file
 		rc.workerPool.ApplyOnErrorGroup(eg,
 			func() error {
-				defer updateCh.Inc()
+				fileStart := time.Now()
+				defer func() {
+					log.Info("import file done", utils.ZapFile(fileReplica),
+						zap.Duration("take", time.Since(fileStart)))
+					updateCh.Inc()
+				}()
 				return rc.fileImporter.Import(ectx, fileReplica, rewriteRules)
 			})
 	}
@@ -664,7 +552,9 @@ func (rc *Client) RestoreFiles(
 }

 // RestoreRaw tries to restore raw keys in the specified range.
-func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh glue.Progress) error {
+func (rc *Client) RestoreRaw(
+	ctx context.Context, startKey []byte, endKey []byte, files []*backup.File, updateCh glue.Progress,
+) error {
 	start := time.Now()
 	defer func() {
 		elapsed := time.Since(start)
@@ -674,7 +564,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil
 			zap.Duration("take", elapsed))
 	}()
 	errCh := make(chan error, len(files))
-	eg, ectx := errgroup.WithContext(rc.ctx)
+	eg, ectx := errgroup.WithContext(ctx)
 	defer close(errCh)

 	err := rc.fileImporter.SetRawRange(startKey, endKey)
diff --git a/pkg/restore/client_test.go b/pkg/restore/client_test.go
index a8c37a70b..4a52ba937 100644
--- a/pkg/restore/client_test.go
+++ b/pkg/restore/client_test.go
@@ -3,7 +3,6 @@
 package restore_test

 import (
-	"context"
 	"math"
 	"strconv"

@@ -40,7 +39,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {
 	c.Assert(s.mock.Start(), IsNil)
 	defer s.mock.Stop()

-	client, err := restore.NewRestoreClient(context.Background(), gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil)
+	client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil)
 	c.Assert(err, IsNil)

 	info, err := s.mock.Domain.GetSnapshotInfoSchema(math.MaxUint64)
@@ -98,7 +97,7 @@ func (s *testRestoreClientSuite) TestIsOnline(c *C) {
 	c.Assert(s.mock.Start(), IsNil)
 	defer s.mock.Stop()

-	client, err := restore.NewRestoreClient(context.Background(), gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil)
+	client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil)
 	c.Assert(err, IsNil)

 	c.Assert(client.IsOnline(), IsFalse)
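The client changes above all follow one rule: drop the context stored in the struct and thread a `context.Context` through every call, per the standard library guideline "do not store Contexts inside a struct type". A before/after sketch of why that matters (names here are illustrative):

```go
package main

import "context"

// Before: the context is captured at construction time, so a cancellation
// meant for one operation tears down every future call, and callers cannot
// attach per-operation deadlines.
type clientBefore struct {
	ctx context.Context
}

func (c *clientBefore) ResetTS() error {
	return doRequest(c.ctx)
}

// After: each method takes the caller's context explicitly.
type clientAfter struct{}

func (c *clientAfter) ResetTS(ctx context.Context) error {
	return doRequest(ctx)
}

func doRequest(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

func main() {
	_ = (&clientBefore{ctx: context.Background()}).ResetTS()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	_ = (&clientAfter{}).ResetTS(ctx) // this call, and only this call, obeys ctx
}
```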
diff --git a/pkg/restore/pipeline_items.go b/pkg/restore/pipeline_items.go
index 83242cb2b..7dcc02792 100644
--- a/pkg/restore/pipeline_items.go
+++ b/pkg/restore/pipeline_items.go
@@ -4,9 +4,9 @@ package restore

 import (
 	"context"
+	"sync"

 	"github.com/pingcap/errors"
-	"github.com/pingcap/kvproto/pkg/backup"
 	"github.com/pingcap/log"
 	"github.com/pingcap/parser/model"
 	"go.uber.org/zap"
@@ -17,9 +17,34 @@ import (
 )

 const (
-	defaultBatcherOutputChannelSize = 1024
+	defaultChannelSize = 1024
 )

+// TableSink is the 'sink' of restored data by a sender.
+type TableSink interface {
+	EmitTables(tables ...CreatedTable)
+	EmitError(error)
+	Close()
+}
+
+type chanTableSink struct {
+	outCh chan<- []CreatedTable
+	errCh chan<- error
+}
+
+func (sink chanTableSink) EmitTables(tables ...CreatedTable) {
+	sink.outCh <- tables
+}
+
+func (sink chanTableSink) EmitError(err error) {
+	sink.errCh <- err
+}
+
+func (sink chanTableSink) Close() {
+	// errCh may have multiple senders; don't close it here.
+	close(sink.outCh)
+}
+
 // ContextManager is the struct to manage a TiKV 'context' for restore.
 // Batcher will call Enter when any table should be restore on batch,
 // so you can do some prepare work here(e.g. set placement rules for online restore).
@@ -28,6 +53,9 @@ type ContextManager interface {
 	Enter(ctx context.Context, tables []CreatedTable) error
 	// Leave make some tables 'leave' this context(a.k.a., restore is done, do some post-works).
 	Leave(ctx context.Context, tables []CreatedTable) error
+	// Close closes the context manager; it is called when the manager is
+	// 'killed' and should do some cleanup.
+	Close(ctx context.Context)
 }

 // NewBRContextManager makes a BR context manager, that is,
@@ -37,7 +65,7 @@ func NewBRContextManager(client *Client) ContextManager {
 	return &brContextManager{
 		client: client,

-		hasTable: make(map[int64]bool),
+		hasTable: make(map[int64]CreatedTable),
 	}
 }

@@ -45,17 +73,25 @@ type brContextManager struct {
 	client *Client

 	// This 'set' of table ID allow us handle each table just once.
-	hasTable map[int64]bool
+	hasTable map[int64]CreatedTable
+}
+
+func (manager *brContextManager) Close(ctx context.Context) {
+	tbls := make([]*model.TableInfo, 0, len(manager.hasTable))
+	for _, tbl := range manager.hasTable {
+		tbls = append(tbls, tbl.Table)
+	}
+	splitPostWork(ctx, manager.client, tbls)
 }

 func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error {
 	placementRuleTables := make([]*model.TableInfo, 0, len(tables))

 	for _, tbl := range tables {
-		if !manager.hasTable[tbl.Table.ID] {
+		if _, ok := manager.hasTable[tbl.Table.ID]; !ok {
 			placementRuleTables = append(placementRuleTables, tbl.Table)
 		}
-		manager.hasTable[tbl.Table.ID] = true
+		manager.hasTable[tbl.Table.ID] = tbl
 	}

 	return splitPrepareWork(ctx, manager.client, placementRuleTables)
@@ -70,6 +106,9 @@ func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTabl
 	splitPostWork(ctx, manager.client, placementRuleTables)
 	log.Info("restore table done", ZapTables(tables))
+	for _, tbl := range placementRuleTables {
+		delete(manager.hasTable, tbl.ID)
+	}
 	return nil
 }

@@ -128,14 +167,32 @@ func Exhaust(ec <-chan error) []error {

 // BatchSender is the abstract of how the batcher send a batch.
 type BatchSender interface {
+	// PutSink sets the sink of this sender: users of this interface promise
+	// to call it at least once before the first call to `RestoreBatch`.
+	PutSink(sink TableSink)
 	// RestoreBatch will send the restore request.
-	RestoreBatch(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules) error
+	RestoreBatch(ranges DrainResult)
 	Close()
 }

 type tikvSender struct {
 	client   *Client
 	updateCh glue.Progress

+	sink TableSink
+	inCh chan<- DrainResult
+
+	wg *sync.WaitGroup
+}
+
+func (b *tikvSender) PutSink(sink TableSink) {
+	// don't worry about visibility, since we will call this before the first
+	// call to RestoreBatch, which is a sync point.
+	b.sink = sink
+}
+
+func (b *tikvSender) RestoreBatch(ranges DrainResult) {
+	b.inCh <- ranges
 }

 // NewTiKVSender make a sender that send restore requests to TiKV.
@@ -144,40 +201,80 @@ func NewTiKVSender(
 	cli *Client,
 	updateCh glue.Progress,
 ) (BatchSender, error) {
-	return &tikvSender{
+	inCh := make(chan DrainResult, defaultChannelSize)
+	midCh := make(chan DrainResult, defaultChannelSize)
+
+	sender := &tikvSender{
 		client:   cli,
 		updateCh: updateCh,
-	}, nil
-}
-
-func (b *tikvSender) RestoreBatch(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules) error {
-	if err := SplitRanges(ctx, b.client, ranges, rewriteRules, b.updateCh); err != nil {
-		log.Error("failed on split range",
-			zap.Any("ranges", ranges),
-			zap.Error(err),
-		)
-		return err
+		inCh:     inCh,
+		wg:       new(sync.WaitGroup),
 	}
-	files := []*backup.File{}
-	for _, fs := range ranges {
-		files = append(files, fs.Files...)
-	}
+	sender.wg.Add(2)
+	go sender.splitWorker(ctx, inCh, midCh)
+	go sender.restoreWorker(ctx, midCh)
+	return sender, nil
+}

-	if err := b.client.RestoreFiles(files, rewriteRules, b.updateCh); err != nil {
-		return err
+func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) {
+	defer log.Debug("split worker closed")
+	defer func() {
+		b.wg.Done()
+		close(next)
+	}()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case result, ok := <-ranges:
+			if !ok {
+				return
+			}
+			if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil {
+				log.Error("failed on split range",
+					ZapRanges(result.Ranges),
+					zap.Error(err),
+				)
+				b.sink.EmitError(err)
+				return
+			}
+			next <- result
+		}
 	}
+}

-	log.Info("restore batch done",
-		append(
-			ZapRanges(ranges),
-			zap.Int("file count", len(files)),
-		)...,
-	)
-
-	return nil
+func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) {
+	defer func() {
+		log.Debug("restore worker closed")
+		b.wg.Done()
+		b.sink.Close()
+	}()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case result, ok := <-ranges:
+			if !ok {
+				return
+			}
+			files := result.Files()
+			if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil {
+				b.sink.EmitError(err)
+				return
+			}
+
+			log.Info("restore batch done",
+				ZapRanges(result.Ranges),
+				zap.Int("file count", len(files)),
+			)
+			b.sink.EmitTables(result.BlankTablesAfterSend...)
+		}
+	}
 }

 func (b *tikvSender) Close() {
-	// don't close update channel here, since we may need it then.
+	close(b.inCh)
+	b.wg.Wait()
+	log.Debug("tikv sender closed")
 }
diff --git a/pkg/restore/util.go b/pkg/restore/util.go
index b960f5bb8..3c84b0054 100644
--- a/pkg/restore/util.go
+++ b/pkg/restore/util.go
@@ -503,29 +503,42 @@ func PaginateScanRegion(

 // ZapTables make zap field of table for debuging, including table names.
 func ZapTables(tables []CreatedTable) zapcore.Field {
-	tableNames := make([]string, 0, len(tables))
-	for _, t := range tables {
-		tableNames = append(tableNames, fmt.Sprintf("%s.%s", t.OldTable.Db.Name, t.OldTable.Info.Name))
-	}
-	return zap.Strings("tables", tableNames)
+	return zap.Array("tables", tableSliceArrayMixIn(tables))
 }

 // ZapRanges make zap fields for debuging, which contains kv, size and count of ranges.
-func ZapRanges(ranges []rtree.Range) []zapcore.Field {
+// TODO make it a lazy zap object.
+func ZapRanges(ranges []rtree.Range) zapcore.Field {
+	return zap.Object("rangeInfo", rangesSliceObjectMixin(ranges))
+}
+
+type tableSliceArrayMixIn []CreatedTable
+
+func (ss tableSliceArrayMixIn) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
+	for _, s := range ss {
+		encoder.AppendString(fmt.Sprintf("%s.%s",
+			utils.EncloseName(s.OldTable.Db.Name.String()),
+			utils.EncloseName(s.OldTable.Info.Name.String())))
+	}
+	return nil
+}
+
+type rangesSliceObjectMixin []rtree.Range
+
+func (rs rangesSliceObjectMixin) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
 	totalKV := uint64(0)
 	totalSize := uint64(0)
-	for _, r := range ranges {
+	for _, r := range rs {
 		for _, f := range r.Files {
 			totalKV += f.GetTotalKvs()
 			totalSize += f.GetTotalBytes()
 		}
 	}
-	return []zap.Field{
-		zap.Int("ranges", len(ranges)),
-		zap.Uint64("total kv", totalKV),
-		zap.Uint64("total size", totalSize),
-	}
+	encoder.AddInt("ranges", len(rs))
+	encoder.AddUint64("total kv", totalKV)
+	encoder.AddUint64("total size", totalSize)
+	return nil
 }

 // ParseQuoteName parse the quote `db`.`table` name, and split it.
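The `util.go` rework above replaces eagerly-built `[]zap.Field` slices with `zap.Object`/`zap.Array` wrappers whose `Marshal*` methods only run if the log entry actually passes the level check. A standalone sketch of the same technique (the `span` type is a stand-in for `rtree.Range`):

```go
package main

import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

type span struct{ kvs, bytes uint64 }

// spans aggregates its statistics inside MarshalLogObject, so the loop below
// is skipped entirely for entries the logger drops (same idea as
// rangesSliceObjectMixin above).
type spans []span

func (ss spans) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	totalKV, totalSize := uint64(0), uint64(0)
	for _, s := range ss {
		totalKV += s.kvs
		totalSize += s.bytes
	}
	enc.AddInt("ranges", len(ss))
	enc.AddUint64("total kv", totalKV)
	enc.AddUint64("total size", totalSize)
	return nil
}

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()
	// Debug is below Production's level, so MarshalLogObject never runs here.
	logger.Debug("restore batch done", zap.Object("rangeInfo", spans{{100, 4096}}))
	logger.Info("restore batch done", zap.Object("rangeInfo", spans{{100, 4096}}))
}
```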
+func ZapRanges(ranges []rtree.Range) zapcore.Field { + return zap.Object("rangeInfo", rangesSliceObjectMixin(ranges)) +} + +type tableSliceArrayMixIn []CreatedTable + +func (ss tableSliceArrayMixIn) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + for _, s := range ss { + encoder.AppendString(fmt.Sprintf("%s.%s", + utils.EncloseName(s.OldTable.Db.Name.String()), + utils.EncloseName(s.OldTable.Info.Name.String()))) + } + return nil +} + +type rangesSliceObjectMixin []rtree.Range + +func (rs rangesSliceObjectMixin) MarshalLogObject(encoder zapcore.ObjectEncoder) error { totalKV := uint64(0) totalSize := uint64(0) - for _, r := range ranges { + for _, r := range rs { for _, f := range r.Files { totalKV += f.GetTotalKvs() totalSize += f.GetTotalBytes() } } - return []zap.Field{ - zap.Int("ranges", len(ranges)), - zap.Uint64("total kv", totalKV), - zap.Uint64("total size", totalSize), - } + encoder.AddInt("ranges", len(rs)) + encoder.AddUint64("total kv", totalKV) + encoder.AddUint64("total size", totalSize) + return nil } // ParseQuoteName parse the quote `db`.`table` name, and split it. diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 11697187a..47dd7cc2c 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -221,6 +221,10 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig log.Debug("removing some PD schedulers") restore, e := mgr.RemoveSchedulers(ctx) defer func() { + if ctx.Err() != nil { + log.Warn("context canceled, doing clean work with background context") + ctx = context.Background() + } if restoreE := restore(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 0047fadca..1fd806a7d 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -147,6 +147,10 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf if cfg.RemoveSchedulers { restore, e := mgr.RemoveSchedulers(ctx) defer func() { + if ctx.Err() != nil { + log.Warn("context canceled, try shutdown") + ctx = context.Background() + } if restoreE := restore(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 0075e2ce5..eae7e9244 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -102,7 +102,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } defer mgr.Close() - client, err := restore.NewRestoreClient(ctx, g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) + client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) if err != nil { return err } @@ -173,7 +173,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf defer restoreDBConfig() // execute DDL first - err = client.ExecDDLs(ddlJobs) + err = client.ExecDDLs(ctx, ddlJobs) if err != nil { return errors.Trace(err) } @@ -187,7 +187,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } for _, db := range dbs { - err = client.CreateDatabase(db.Info) + err = client.CreateDatabase(ctx, db.Info) if err != nil { return err } @@ -241,7 +241,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // Do not reset timestamp if we are doing incremental restore, because // we are not allowed to decrease timestamp. 
if !client.IsIncremental() { - if err = client.ResetTS(cfg.PD); err != nil { + if err = client.ResetTS(ctx, cfg.PD); err != nil { log.Error("reset pd TS failed", zap.Error(err)) return err } @@ -367,6 +367,10 @@ func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) func restorePostWork( ctx context.Context, client *restore.Client, restoreSchedulers utils.UndoFunc, ) { + if ctx.Err() != nil { + log.Warn("context canceled, try shutdown") + ctx = context.Background() + } if client.IsOnline() { return } diff --git a/pkg/task/restore_log.go b/pkg/task/restore_log.go index e75981244..839a60a7d 100644 --- a/pkg/task/restore_log.go +++ b/pkg/task/restore_log.go @@ -109,7 +109,7 @@ func RunLogRestore(c context.Context, g glue.Glue, cfg *LogRestoreConfig) error if err != nil { return err } - client, err := restore.NewRestoreClient(ctx, g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) + client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) if err != nil { return err } diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index 4596bb7c7..0aabb01a4 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -68,7 +68,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR } defer mgr.Close() - client, err := restore.NewRestoreClient(ctx, g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) + client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetTiKV(), mgr.GetTLSConfig()) if err != nil { return err } @@ -129,7 +129,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR } defer restorePostWork(ctx, client, restoreSchedulers) - err = client.RestoreRaw(cfg.StartKey, cfg.EndKey, files, updateCh) + err = client.RestoreRaw(ctx, cfg.StartKey, cfg.EndKey, files, updateCh) if err != nil { return errors.Trace(err) } diff --git a/pkg/utils/logging.go b/pkg/utils/logging.go index aa68c7b1e..18491c63a 100644 --- a/pkg/utils/logging.go +++ b/pkg/utils/logging.go @@ -85,6 +85,21 @@ func (keys zapArrayMarshalKeysMixIn) MarshalLogArray(enc zapcore.ArrayEncoder) e return nil } +type files []*backup.File + +func (fs files) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + totalKVs := uint64(0) + totalSize := uint64(0) + for _, file := range fs { + totalKVs += file.GetTotalKvs() + totalSize += file.GetTotalBytes() + } + encoder.AddUint64("totalKVs", totalKVs) + encoder.AddUint64("totalBytes", totalSize) + encoder.AddInt("totalFileCount", len(fs)) + return nil +} + // WrapKey wrap a key as a Stringer that can print proper upper hex format. func WrapKey(key []byte) fmt.Stringer { return kv.Key(key) @@ -114,3 +129,8 @@ func ZapFile(file *backup.File) zapcore.Field { func ZapSSTMeta(sstMeta *import_sstpb.SSTMeta) zapcore.Field { return zap.Object("sstMeta", zapMarshalSSTMetaMixIn{sstMeta}) } + +// ZapFiles make the zap field for a set of file. 
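`RunBackup`, `RunBackupRaw` and `restorePostWork` above all repeat the same "fall back to a background context once canceled" dance before running deferred cleanup. That could be factored into a tiny helper — a suggestion, not part of this patch:

```go
package main

import (
	"context"
	"fmt"
)

// cleanupContext returns ctx itself while it is still alive, and a fresh
// background context once it has been canceled, so deferred cleanup (resuming
// schedulers, dropping placement rules, ...) still gets a usable context.
func cleanupContext(ctx context.Context) context.Context {
	if ctx.Err() != nil {
		fmt.Println("context canceled, doing clean work with background context")
		return context.Background()
	}
	return ctx
}

func restoreSchedulers(ctx context.Context) {
	fmt.Println("schedulers restored, ctx alive:", ctx.Err() == nil)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer func() {
		restoreSchedulers(cleanupContext(ctx)) // cleanup survives cancellation
	}()
	cancel() // simulate the user pressing ^C mid-backup
}
```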
diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh
index e37088884..ed3b26af1 100644
--- a/tests/br_other/run.sh
+++ b/tests/br_other/run.sh
@@ -17,6 +17,8 @@ set -eu
 DB="$TEST_NAME"

 run_sql "CREATE DATABASE $DB;"
+trap "run_sql \"DROP DATABASE $DB;\"" EXIT
+
 run_sql "CREATE TABLE $DB.usertable1 ( \
 	YCSB_KEY varchar(64) NOT NULL, \
@@ -64,7 +66,9 @@ fi

 echo "backup start to test lock file"
 PPROF_PORT=6080
 GO_FAILPOINTS="github.com/pingcap/br/pkg/utils/determined-pprof-port=return($PPROF_PORT)" \
-run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 >/dev/null &
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --remove-schedulers --ratelimit 1 --ratelimit-unit 1 --concurrency 4 2>&1 > $TEST_DIR/br-other-stdout.log &
+# NOTE: `trap` replaces the previous handler for the same signal, so fold the
+# DROP DATABASE from the earlier trap into this one.
+trap "cat $TEST_DIR/br-other-stdout.log; run_sql \"DROP DATABASE $DB;\"" EXIT
+
 # record last backup pid
 _pid=$!

@@ -79,7 +83,7 @@
 curl "http://localhost:$PPROF_PORT/debug/pprof/trace?seconds=1" 2>&1 > /dev/null
 echo "pprof started..."

 curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": false'
-
+curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."enable-location-replacement"' | grep "false"
 backup_fail=0
 echo "another backup start expect to fail due to last backup add a lockfile"
 run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --concurrency 4 || backup_fail=1
@@ -90,7 +94,7 @@ fi

 # check is there still exists scheduler not in pause.
 pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
-if [ "$pause_schedulers" -ne "3" ];then
+if [ "$pause_schedulers" -lt "3" ];then
 	echo "TEST: [$TEST_NAME] failed because paused scheduler are not enough"
 	exit 1
 fi
@@ -99,36 +103,59 @@
 if ps -p $_pid > /dev/null
 then
 	echo "$_pid is running"
 	# kill last backup progress (Don't send SIGKILL, or we might stuck PD in no scheduler state.)
-	kill $_pid
+	pkill -P $_pid
+	echo "$_pid is killed @ $(date)"
 else
 	echo "TEST: [$TEST_NAME] test backup lock file failed! the last backup finished"
 	exit 1
 fi
+
 # make sure we won't stuck in non-scheduler state, even we send a SIGTERM to it.
 # give enough time to BR so it can gracefully stop.
-sleep 5
+sleep 30
 if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": true'
 then
 	echo "TEST: [$TEST_NAME] failed because scheduler has been removed"
 	exit 1
 fi
-pd_settings=5
+
+default_pd_values='{
+  "max-merge-region-keys": 200000,
+  "max-merge-region-size": 20,
+  "leader-schedule-limit": 4,
+  "region-schedule-limit": 2048,
+  "max-snapshot-count": 3
+}'
+
+for key in $(echo $default_pd_values | jq 'keys[]'); do
+	if ! curl -s http://$PD_ADDR/pd/api/v1/config/schedule | jq ".[$key]" | grep -q $(echo $default_pd_values | jq ".[$key]"); then
+		curl -s http://$PD_ADDR/pd/api/v1/config/schedule
+		echo "[$TEST_NAME] failed due to PD config isn't reset after restore"
+		exit 1
+	fi
+done
+
 # check is there still exists scheduler in pause.
 pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
-if [ "$pause_schedulers" -ne "3" ];then
+# There shouldn't be any paused schedulers since BR shut down gracefully.
+if [ "$pause_schedulers" -ne "0" ];then
 	echo "TEST: [$TEST_NAME] failed because paused scheduler has changed"
 	exit 1
 fi
+
+pd_settings=6
+
 # balance-region scheduler enabled
 curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="balance-region")}' | grep '"disable": false' || ((pd_settings--))
 # balance-leader scheduler enabled
 curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="balance-leader")}' | grep '"disable": false' || ((pd_settings--))
 # hot region scheduler enabled
 curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="hot-region")}' | grep '"disable": false' || ((pd_settings--))
+# location replacement enabled
+curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."enable-location-replacement"' | grep "true" || ((pd_settings--))

 # we need reset pd config to default
 # until pd has the solution to temporary set these scheduler/configs.
@@ -140,12 +167,11 @@
 curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-size"' | grep "20" || ((pd_settings--))
 # max-merge-region-keys set to default 200000
 curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep "200000" || ((pd_settings--))

-if [ "$pd_settings" -ne "5" ];then
+if [ "$pd_settings" -ne "6" ];then
 	echo "TEST: [$TEST_NAME] test validate reset pd config failed!"
 	exit 1
 fi

-run_sql "DROP DATABASE $DB;"
 # Test version
 run_br --version
diff --git a/tests/br_tiflash/run.sh b/tests/br_tiflash/run.sh
index dab046c52..d19c55788 100644
--- a/tests/br_tiflash/run.sh
+++ b/tests/br_tiflash/run.sh
@@ -42,6 +42,7 @@ while ! [ $(run_sql "select * from information_schema.tiflash_replica" | grep "P
 	echo "Waiting for TiFlash synchronizing [$i]."
 	if [ $i -gt 20 ]; then
 		echo "Failed to sync data to tiflash."
+		exit 1
 	fi
 	sleep 5
 done
@@ -52,12 +53,14 @@ run_br backup full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
 run_sql "DROP DATABASE $DB"
 run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

+# waiting for TiFlash to catch up
+sleep 90
 AFTER_BR_COUNT=`run_sql "SELECT count(*) FROM $DB.kv;" | sed -n "s/[^0-9]//g;/^[0-9]*$/p" | tail -n1`
-if [ $AFTER_BR_COUNT -ne $RECORD_COUNT ]; then
+if [ "$AFTER_BR_COUNT" -ne "$RECORD_COUNT" ]; then
 	echo "failed to restore, before: $RECORD_COUNT; after: $AFTER_BR_COUNT"
 	exit 1
 fi

 run_sql "DROP DATABASE $DB"

-echo "TEST $TEST_NAME passed!"
\ No newline at end of file
+echo "TEST $TEST_NAME passed!"