From cdfe4e772914590634c59ccef431c1bd47ba390e Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 9 Aug 2021 06:11:13 +0800 Subject: [PATCH] BR: update br keep upstream (#26972) --- pkg/backup/client.go | 64 ++-- pkg/backup/push.go | 34 +- pkg/lightning/backend/kv/sql2kv.go | 14 +- pkg/lightning/backend/local/local.go | 106 ++++--- pkg/lightning/config/config.go | 83 +++-- pkg/lightning/config/config_test.go | 24 +- pkg/lightning/config/const.go | 2 +- pkg/lightning/lightning_test.go | 4 +- pkg/lightning/mydump/csv_parser.go | 2 + pkg/lightning/mydump/csv_parser_test.go | 64 ++-- pkg/lightning/mydump/loader.go | 36 ++- pkg/lightning/mydump/loader_test.go | 100 +++--- pkg/lightning/mydump/parquet_parser.go | 16 + pkg/lightning/mydump/parser.go | 7 +- pkg/lightning/mydump/parser_test.go | 7 +- pkg/lightning/mydump/region.go | 15 +- pkg/lightning/restore/check_info.go | 294 ++++++++++++++---- pkg/lightning/restore/meta_manager.go | 160 ++++++++-- pkg/lightning/restore/restore.go | 151 +++++---- pkg/lightning/restore/restore_test.go | 15 +- pkg/lightning/restore/table_restore.go | 85 +++-- pkg/lightning/restore/tidb.go | 2 +- pkg/lightning/restore/tidb_test.go | 28 ++ pkg/logutil/context.go | 51 +++ pkg/logutil/logging_test.go | 49 +++ pkg/pdutil/pd.go | 29 ++ pkg/pdutil/pd_test.go | 32 ++ .../data/gencol.expr_index-schema.sql | 11 + .../data/gencol.expr_index.0.sql | 1 + .../data/gencol.nested-schema.sql | 3 +- .../data/gencol.virtual_only-schema.sql | 5 + .../data/gencol.virtual_only.0.sql | 1 + tests/lightning_generated_columns/run.sh | 23 ++ 33 files changed, 1117 insertions(+), 401 deletions(-) create mode 100644 pkg/logutil/context.go create mode 100644 tests/lightning_generated_columns/data/gencol.expr_index-schema.sql create mode 100644 tests/lightning_generated_columns/data/gencol.expr_index.0.sql create mode 100644 tests/lightning_generated_columns/data/gencol.virtual_only-schema.sql create mode 100644 tests/lightning_generated_columns/data/gencol.virtual_only.0.sql diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 117d3dcb12df0..a31acfb85979e 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -421,10 +421,12 @@ func (bc *Client) BackupRanges( // we collect all files in a single goroutine to avoid thread safety issues. 
workerPool := utils.NewWorkerPool(concurrency, "Ranges") eg, ectx := errgroup.WithContext(ctx) - for _, r := range ranges { + for id, r := range ranges { + id := id sk, ek := r.StartKey, r.EndKey workerPool.ApplyOnErrorGroup(eg, func() error { - err := bc.BackupRange(ectx, sk, ek, req, metaWriter, progressCallBack) + elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id)) + err := bc.BackupRange(elctx, sk, ek, req, metaWriter, progressCallBack) if err != nil { return errors.Trace(err) } @@ -446,15 +448,14 @@ func (bc *Client) BackupRange( start := time.Now() defer func() { elapsed := time.Since(start) - log.Info("backup range finished", zap.Duration("take", elapsed)) + logutil.CL(ctx).Info("backup range finished", zap.Duration("take", elapsed)) key := "range start:" + hex.EncodeToString(startKey) + " end:" + hex.EncodeToString(endKey) if err != nil { summary.CollectFailureUnit(key, err) } }() - log.Info("backup started", - logutil.Key("startKey", startKey), - logutil.Key("endKey", endKey), + logutil.CL(ctx).Info("backup started", + logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), zap.Uint64("rateLimit", req.RateLimit), zap.Uint32("concurrency", req.Concurrency)) @@ -475,7 +476,7 @@ func (bc *Client) BackupRange( if err != nil { return errors.Trace(err) } - log.Info("finish backup push down", zap.Int("Ok", results.Len())) + logutil.CL(ctx).Info("finish backup push down", zap.Int("small-range-count", results.Len())) // Find and backup remaining ranges. // TODO: test fine grained backup. @@ -490,12 +491,12 @@ func (bc *Client) BackupRange( progressCallBack(RangeUnit) if req.IsRawKv { - log.Info("backup raw ranges", + logutil.CL(ctx).Info("raw ranges backed up", logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), zap.String("cf", req.Cf)) } else { - log.Info("backup time range", + logutil.CL(ctx).Info("time range backed up", zap.Reflect("StartVersion", req.StartVersion), zap.Reflect("EndVersion", req.EndVersion)) } @@ -590,7 +591,7 @@ func (bc *Client) fineGrainedBackup( if len(incomplete) == 0 { return nil } - log.Info("start fine grained backup", zap.Int("incomplete", len(incomplete))) + logutil.CL(ctx).Info("start fine grained backup", zap.Int("incomplete", len(incomplete))) // Step2, retry backup on incomplete range respCh := make(chan *backuppb.BackupResponse, 4) errCh := make(chan error, 4) @@ -647,12 +648,12 @@ func (bc *Client) fineGrainedBackup( break selectLoop } if resp.Error != nil { - log.Panic("unexpected backup error", + logutil.CL(ctx).Panic("unexpected backup error", zap.Reflect("error", resp.Error)) } - log.Info("put fine grained range", - logutil.Key("startKey", resp.StartKey), - logutil.Key("endKey", resp.EndKey), + logutil.CL(ctx).Info("put fine grained range", + logutil.Key("fine-grained-range-start", resp.StartKey), + logutil.Key("fine-grained-range-end", resp.EndKey), ) rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files) @@ -780,11 +781,11 @@ func (bc *Client) handleFineGrained( if berrors.Is(err, berrors.ErrFailedToConnect) { // When the leader store is died, // 20s for the default max duration before the raft election timer fires. 
- log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) + logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) return 20000, nil } - log.Error("fail to connect store", zap.Uint64("StoreID", storeID)) + logutil.CL(ctx).Error("fail to connect store", zap.Uint64("StoreID", storeID)) return 0, errors.Annotatef(err, "failed to connect to store %d", storeID) } hasProgress := false @@ -811,17 +812,17 @@ func (bc *Client) handleFineGrained( return nil }, func() (backuppb.BackupClient, error) { - log.Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID)) + logutil.CL(ctx).Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID)) return bc.mgr.ResetBackupClient(ctx, storeID) }) if err != nil { if berrors.Is(err, berrors.ErrFailedToConnect) { // When the leader store is died, // 20s for the default max duration before the raft election timer fires. - log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) + logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) return 20000, nil } - log.Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err)) + logutil.CL(ctx).Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err)) return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey)) } @@ -839,6 +840,7 @@ func (bc *Client) handleFineGrained( // Stop receiving response if respFn returns error. func SendBackup( ctx context.Context, + // the `storeID` seems only used for logging now, maybe we can remove it then? 
storeID uint64, client backuppb.BackupClient, req backuppb.BackupRequest, @@ -857,14 +859,11 @@ func SendBackup( var errReset error backupLoop: for retry := 0; retry < backupRetryTimes; retry++ { - log.Info("try backup", - logutil.Key("startKey", req.StartKey), - logutil.Key("endKey", req.EndKey), - zap.Uint64("storeID", storeID), + logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry), ) failpoint.Inject("hint-backup-start", func(v failpoint.Value) { - log.Info("failpoint hint-backup-start injected, " + + logutil.CL(ctx).Info("failpoint hint-backup-start injected, " + "process will notify the shell.") if sigFile, ok := v.(string); ok { file, err := os.Create(sigFile) @@ -880,13 +879,13 @@ backupLoop: bcli, err := client.Backup(ctx, &req) failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { if val.(bool) { - log.Debug("failpoint reset-retryable-error injected.") + logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.") err = status.Error(codes.Unavailable, "Unavailable error") } }) failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) { if val.(bool) { - log.Debug("failpoint reset-not-retryable-error injected.") + logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.") err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3") } }) @@ -900,7 +899,7 @@ backupLoop: } continue } - log.Error("fail to backup", zap.Uint64("StoreID", storeID), + logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry time", retry)) return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID) } @@ -910,9 +909,8 @@ backupLoop: resp, err := bcli.Recv() if err != nil { if errors.Cause(err) == io.EOF { // nolint:errorlint - log.Info("backup streaming finish", - zap.Uint64("StoreID", storeID), - zap.Int("retry time", retry)) + logutil.CL(ctx).Info("backup streaming finish", + zap.Int("retry-time", retry)) break backupLoop } if isRetryableError(err) { @@ -929,9 +927,9 @@ backupLoop: } // TODO: handle errors in the resp. - log.Info("range backuped", - logutil.Key("startKey", resp.GetStartKey()), - logutil.Key("endKey", resp.GetEndKey())) + logutil.CL(ctx).Info("range backed up", + logutil.Key("small-range-start-key", resp.GetStartKey()), + logutil.Key("small-range-end-key", resp.GetEndKey())) err = respFn(resp) if err != nil { return errors.Trace(err) diff --git a/pkg/backup/push.go b/pkg/backup/push.go index 5e52f2633450b..b3a6bf06c3fec 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -12,7 +12,6 @@ import ( "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/redact" @@ -66,7 +65,7 @@ func (push *pushDown) pushBackup( // Push down backup tasks to all tikv instances. 
res := rtree.NewRangeTree() failpoint.Inject("noop-backup", func(_ failpoint.Value) { - log.Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey)) + logutil.CL(ctx).Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey)) failpoint.Return(res, nil) }) @@ -74,22 +73,23 @@ func (push *pushDown) pushBackup( for _, s := range stores { store := s storeID := s.GetId() + lctx := logutil.ContextWithField(ctx, zap.Uint64("store-id", storeID)) if s.GetState() != metapb.StoreState_Up { - log.Warn("skip store", zap.Uint64("StoreID", storeID), zap.Stringer("State", s.GetState())) + logutil.CL(lctx).Warn("skip store", zap.Stringer("State", s.GetState())) continue } - client, err := push.mgr.GetBackupClient(ctx, storeID) + client, err := push.mgr.GetBackupClient(lctx, storeID) if err != nil { // BR should be able to backup even some of stores disconnected. // The regions managed by this store can be retried at fine-grained backup then. - log.Warn("fail to connect store, skipping", zap.Uint64("StoreID", storeID), zap.Error(err)) + logutil.CL(lctx).Warn("fail to connect store, skipping", zap.Error(err)) return res, nil } wg.Add(1) go func() { defer wg.Done() err := SendBackup( - ctx, storeID, client, req, + lctx, storeID, client, req, func(resp *backuppb.BackupResponse) error { // Forward all responses (including error). push.respCh <- responseAndStore{ @@ -99,8 +99,8 @@ func (push *pushDown) pushBackup( return nil }, func() (backuppb.BackupClient, error) { - log.Warn("reset the connection in push", zap.Uint64("storeID", storeID)) - return push.mgr.ResetBackupClient(ctx, storeID) + logutil.CL(lctx).Warn("reset the connection in push") + return push.mgr.ResetBackupClient(lctx, storeID) }) // Disconnected stores can be ignored. 
if err != nil { @@ -127,14 +127,14 @@ func (push *pushDown) pushBackup( } failpoint.Inject("backup-storage-error", func(val failpoint.Value) { msg := val.(string) - log.Debug("failpoint backup-storage-error injected.", zap.String("msg", msg)) + logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg)) resp.Error = &backuppb.Error{ Msg: msg, } }) failpoint.Inject("tikv-rw-error", func(val failpoint.Value) { msg := val.(string) - log.Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg)) + logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg)) resp.Error = &backuppb.Error{ Msg: msg, } @@ -150,28 +150,28 @@ func (push *pushDown) pushBackup( errPb := resp.GetError() switch v := errPb.Detail.(type) { case *backuppb.Error_KvError: - log.Warn("backup occur kv error", zap.Reflect("error", v)) + logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v)) case *backuppb.Error_RegionError: - log.Warn("backup occur region error", zap.Reflect("error", v)) + logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", v)) case *backuppb.Error_ClusterIdError: - log.Error("backup occur cluster ID error", zap.Reflect("error", v)) + logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v)) return res, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb) default: if utils.MessageIsRetryableStorageError(errPb.GetMsg()) { - log.Warn("backup occur storage error", zap.String("error", errPb.GetMsg())) + logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg())) continue } if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) { errMsg := fmt.Sprintf("File or directory not found error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress())) - log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg), + logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg), zap.String("work around", "please ensure br and tikv node share a same disk and the user of br and tikv has same uid.")) } if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) { errMsg := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress())) - log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg), + logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg), zap.String("work around", "please ensure tikv has permission to read from & write to the storage.")) } return res, berrors.ErrKVStorage @@ -181,7 +181,7 @@ func (push *pushDown) pushBackup( if !berrors.Is(err, berrors.ErrFailedToConnect) { return res, errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey)) } - log.Warn("skipping disconnected stores", logutil.ShortError(err)) + logutil.CL(ctx).Warn("skipping disconnected stores", logutil.ShortError(err)) return res, nil } } diff --git a/pkg/lightning/backend/kv/sql2kv.go b/pkg/lightning/backend/kv/sql2kv.go index 07e0d01b1c3f1..b00dbadcce495 100644 --- a/pkg/lightning/backend/kv/sql2kv.go +++ b/pkg/lightning/backend/kv/sql2kv.go @@ -120,17 +120,17 @@ func autoRandomIncrementBits(col *table.Column, randomBits int) int { } // collectGeneratedColumns collects all expressions required to evaluate the -// results of all stored generated columns. The returning slice is in evaluation -// order. 
+// results of all generated columns. The returning slice is in evaluation order. func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.Column) ([]genCol, error) { - maxGenColOffset := -1 + hasGenCol := false for _, col := range cols { - if col.GeneratedStored && col.Offset > maxGenColOffset { - maxGenColOffset = col.Offset + if col.GeneratedExpr != nil { + hasGenCol = true + break } } - if maxGenColOffset < 0 { + if !hasGenCol { return nil, nil } @@ -165,7 +165,7 @@ func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.C // for simplicity we just evaluate all generated columns (virtual or not) before the last stored one. var genCols []genCol for i, col := range cols { - if col.GeneratedExpr != nil && col.Offset <= maxGenColOffset { + if col.GeneratedExpr != nil { expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names) if err != nil { return nil, err diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 1b4c5446c27c1..d8e92c4a73010 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -59,6 +59,7 @@ import ( "github.com/pingcap/tidb/br/pkg/membuf" split "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/table" @@ -67,7 +68,6 @@ import ( "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/ranger" "github.com/tikv/client-go/v2/oracle" - pd "github.com/tikv/pd/client" "go.uber.org/atomic" "go.uber.org/multierr" "go.uber.org/zap" @@ -799,7 +799,7 @@ func (e *File) loadEngineMeta() error { type local struct { engines sync.Map // sync version of map[uuid.UUID]*File - pdCli pd.Client + pdCtl *pdutil.PdController conns common.GRPCConns splitCli split.SplitClient tls *common.TLS @@ -906,11 +906,11 @@ func NewLocalBackend( localFile := cfg.SortedKVDir rangeConcurrency := cfg.RangeConcurrency - pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tls.ToPDSecurityOption()) + pdCtl, err := pdutil.NewPdController(ctx, pdAddr, tls.TLSConfig(), tls.ToPDSecurityOption()) if err != nil { return backend.MakeBackend(nil), errors.Annotate(err, "construct pd client failed") } - splitCli := split.NewSplitClient(pdCli, tls.TLSConfig()) + splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig()) shouldCreate := true if enableCheckpoint { @@ -946,7 +946,7 @@ func NewLocalBackend( local := &local{ engines: sync.Map{}, - pdCli: pdCli, + pdCtl: pdCtl, splitCli: splitCli, tls: tls, pdAddr: pdAddr, @@ -969,15 +969,15 @@ func NewLocalBackend( duplicateDB: duplicateDB, } local.conns = common.NewGRPCConns() - if err = local.checkMultiIngestSupport(ctx, pdCli); err != nil { + if err = local.checkMultiIngestSupport(ctx, pdCtl); err != nil { return backend.MakeBackend(nil), err } return backend.MakeBackend(local), nil } -func (local *local) checkMultiIngestSupport(ctx context.Context, pdClient pd.Client) error { - stores, err := conn.GetAllTiKVStores(ctx, pdClient, conn.SkipTiFlash) +func (local *local) checkMultiIngestSupport(ctx context.Context, pdCtl *pdutil.PdController) error { + stores, err := conn.GetAllTiKVStores(ctx, pdCtl.GetPDClient(), conn.SkipTiFlash) if err != nil { return errors.Trace(err) } @@ -1278,7 +1278,7 @@ func (local *local) allocateTSIfNotExists(ctx context.Context, engine *File) err if engine.TS > 0 { return nil } - physical, logical, err := 
local.pdCli.GetTS(ctx) + physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx) if err != nil { return err } @@ -1365,6 +1365,28 @@ func (local *local) WriteToTiKV( region *split.RegionInfo, start, end []byte, ) ([]*sst.SSTMeta, Range, rangeStats, error) { + for _, peer := range region.Region.GetPeers() { + var e error + for i := 0; i < maxRetryTimes; i++ { + store, err := local.pdCtl.GetStoreInfo(ctx, peer.StoreId) + if err != nil { + e = err + continue + } + if store.Status.Capacity > 0 { + // The available disk percent of TiKV + ratio := store.Status.Available * 100 / store.Status.Capacity + if ratio < 10 { + return nil, Range{}, rangeStats{}, errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d", + store.Store.Address, store.Status.Available, store.Status.Capacity) + } + } + break + } + if e != nil { + log.L().Error("failed to get StoreInfo from pd http api", zap.Error(e)) + } + } begin := time.Now() regionRange := intersectRange(region.Region, Range{start: start, end: end}) opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end} @@ -1877,13 +1899,18 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File var allErrLock sync.Mutex var allErr error var wg sync.WaitGroup - - wg.Add(len(ranges)) + metErr := atomic.NewBool(false) for _, r := range ranges { startKey := r.start endKey := r.end w := local.rangeConcurrency.Apply() + // if meet error here, skip try more here to allow fail fast. + if metErr.Load() { + local.rangeConcurrency.Recycle(w) + break + } + wg.Add(1) go func(w *worker.Worker) { defer func() { local.rangeConcurrency.Recycle(w) @@ -1910,6 +1937,9 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File allErrLock.Lock() allErr = multierr.Append(allErr, err) allErrLock.Unlock() + if err != nil { + metErr.Store(true) + } }(w) } @@ -2013,7 +2043,7 @@ func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Tab return nil } log.L().Info("Begin collect duplicate local keys", zap.String("table", tbl.Meta().Name.String())) - physicalTS, logicalTS, err := local.pdCli.GetTS(ctx) + physicalTS, logicalTS, err := local.pdCtl.GetPDClient().GetTS(ctx) if err != nil { return err } @@ -2032,7 +2062,7 @@ func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Tab func (local *local) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table) error { log.L().Info("Begin collect remote duplicate keys", zap.String("table", tbl.Meta().Name.String())) - physicalTS, logicalTS, err := local.pdCli.GetTS(ctx) + physicalTS, logicalTS, err := local.pdCtl.GetPDClient().GetTS(ctx) if err != nil { return err } @@ -2151,43 +2181,31 @@ func sortAndMergeRanges(ranges []Range) []Range { } func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range { - if len(finishedRanges) == 0 { + if len(ranges) == 0 || len(finishedRanges) == 0 { return ranges } - result := make([]Range, 0, len(ranges)) - rIdx := 0 - fIdx := 0 - for rIdx < len(ranges) && fIdx < len(finishedRanges) { - if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].start) <= 0 { - result = append(result, ranges[rIdx]) - rIdx++ - } else if bytes.Compare(ranges[rIdx].start, finishedRanges[fIdx].end) >= 0 { - fIdx++ - } else if bytes.Compare(ranges[rIdx].start, finishedRanges[fIdx].start) < 0 { - result = append(result, Range{start: ranges[rIdx].start, end: finishedRanges[fIdx].start}) - switch bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) { - case -1: - rIdx++ - 
case 0: - rIdx++ - fIdx++ - case 1: - ranges[rIdx].start = finishedRanges[fIdx].end - fIdx++ + result := make([]Range, 0) + for _, r := range ranges { + start := r.start + end := r.end + for len(finishedRanges) > 0 && bytes.Compare(finishedRanges[0].start, end) < 0 { + fr := finishedRanges[0] + if bytes.Compare(fr.start, start) > 0 { + result = append(result, Range{start: start, end: fr.start}) } - } else if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) > 0 { - ranges[rIdx].start = finishedRanges[fIdx].end - fIdx++ - } else { - rIdx++ + if bytes.Compare(fr.end, start) > 0 { + start = fr.end + } + if bytes.Compare(fr.end, end) > 0 { + break + } + finishedRanges = finishedRanges[1:] + } + if bytes.Compare(start, end) < 0 { + result = append(result, Range{start: start, end: end}) } } - - if rIdx < len(ranges) { - result = append(result, ranges[rIdx:]...) - } - return result } diff --git a/pkg/lightning/config/config.go b/pkg/lightning/config/config.go index 743d252d8804b..394c73ce66908 100644 --- a/pkg/lightning/config/config.go +++ b/pkg/lightning/config/config.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net" "net/url" "os" @@ -36,6 +37,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" tidbcfg "github.com/pingcap/tidb/config" + "github.com/tikv/pd/server/api" "go.uber.org/zap" ) @@ -66,9 +68,12 @@ const ( ErrorOnDup = "error" defaultDistSQLScanConcurrency = 15 + distSQLScanConcurrencyPerStore = 4 defaultBuildStatsConcurrency = 20 defaultIndexSerialScanConcurrency = 20 defaultChecksumTableConcurrency = 2 + defaultTableConcurrency = 6 + defaultIndexConcurrency = 2 // defaultMetaSchemaName is the default database name used to store lightning metadata defaultMetaSchemaName = "lightning_metadata" @@ -83,6 +88,10 @@ const ( autoDiskQuotaLocalReservedSpeed uint64 = 1 * units.KiB defaultEngineMemCacheSize = 512 * units.MiB defaultLocalWriterMemCacheSize = 128 * units.MiB + + maxRetryTimes = 4 + defaultRetryBackoffTime = 100 * time.Millisecond + pdStores = "/pd/api/v1/stores" ) var ( @@ -427,6 +436,7 @@ func NewConfig() *Config { MaxKVPairs: 4096, SendKVPairs: 32768, RegionSplitSize: SplitRegionSize, + DiskQuota: ByteSize(math.MaxInt64), }, PostRestore: PostRestore{ Checksum: OpLevelRequired, @@ -585,7 +595,7 @@ func (cfg *Config) Adjust(ctx context.Context) error { if cfg.App.RegionConcurrency > cpuCount { cfg.App.RegionConcurrency = cpuCount } - cfg.DefaultVarsForImporterAndLocalBackend() + cfg.DefaultVarsForImporterAndLocalBackend(ctx) default: return errors.Errorf("invalid config: unsupported `tikv-importer.backend` (%s)", cfg.TikvImporter.Backend) } @@ -671,45 +681,58 @@ func (cfg *Config) CheckAndAdjustForLocalBackend() error { return errors.Annotate(err, "invalid tikv-importer.sorted-kv-dir") } - // we need to calculate quota if disk-quota == 0 - if cfg.TikvImporter.DiskQuota == 0 { - enginesCount := uint64(cfg.App.IndexConcurrency + cfg.App.TableConcurrency) - writeAmount := uint64(cfg.App.RegionConcurrency) * uint64(cfg.Cron.CheckDiskQuota.Milliseconds()) - reservedSize := enginesCount*uint64(cfg.TikvImporter.EngineMemCacheSize) + writeAmount*autoDiskQuotaLocalReservedSpeed - - storageSize, err := common.GetStorageSize(storageSizeDir) - if err != nil { - return errors.Trace(err) - } - if storageSize.Available <= reservedSize { - return errors.Errorf( - "insufficient disk free space on `%s` (only %s, expecting >%s), please use a storage with enough free space, or specify 
`tikv-importer.disk-quota`", - cfg.TikvImporter.SortedKVDir, - units.BytesSize(float64(storageSize.Available)), - units.BytesSize(float64(reservedSize))) - } - cfg.TikvImporter.DiskQuota = ByteSize(storageSize.Available - reservedSize) - } - return nil } func (cfg *Config) DefaultVarsForTiDBBackend() { + if cfg.App.TableConcurrency == 0 { + cfg.App.TableConcurrency = cfg.App.RegionConcurrency + } if cfg.App.IndexConcurrency == 0 { cfg.App.IndexConcurrency = cfg.App.RegionConcurrency } - if cfg.App.TableConcurrency == 0 { - cfg.App.TableConcurrency = cfg.App.RegionConcurrency +} + +func (cfg *Config) adjustDistSQLConcurrency(ctx context.Context) error { + tls, err := cfg.ToTLS() + if err != nil { + return err + } + result := &api.StoresInfo{} + err = tls.WithHost(cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result) + if err != nil { + return errors.Trace(err) } + cfg.TiDB.DistSQLScanConcurrency = len(result.Stores) * distSQLScanConcurrencyPerStore + if cfg.TiDB.DistSQLScanConcurrency < defaultDistSQLScanConcurrency { + cfg.TiDB.DistSQLScanConcurrency = defaultDistSQLScanConcurrency + } + log.L().Info("adjust scan concurrency success", zap.Int("DistSQLScanConcurrency", cfg.TiDB.DistSQLScanConcurrency)) + return nil } -func (cfg *Config) DefaultVarsForImporterAndLocalBackend() { +func (cfg *Config) DefaultVarsForImporterAndLocalBackend(ctx context.Context) { + if cfg.TiDB.DistSQLScanConcurrency == defaultDistSQLScanConcurrency { + var e error + for i := 0; i < maxRetryTimes; i++ { + e = cfg.adjustDistSQLConcurrency(ctx) + if e == nil { + break + } + time.Sleep(defaultRetryBackoffTime) + } + if e != nil { + log.L().Error("failed to adjust scan concurrency", zap.Error(e)) + } + } + if cfg.App.IndexConcurrency == 0 { - cfg.App.IndexConcurrency = 2 + cfg.App.IndexConcurrency = defaultIndexConcurrency } if cfg.App.TableConcurrency == 0 { - cfg.App.TableConcurrency = 6 + cfg.App.TableConcurrency = defaultTableConcurrency } + if len(cfg.App.MetaSchemaName) == 0 { cfg.App.MetaSchemaName = defaultMetaSchemaName } @@ -719,9 +742,6 @@ func (cfg *Config) DefaultVarsForImporterAndLocalBackend() { if cfg.TikvImporter.RegionSplitSize == 0 { cfg.TikvImporter.RegionSplitSize = SplitRegionSize } - if cfg.TiDB.DistSQLScanConcurrency == 0 { - cfg.TiDB.DistSQLScanConcurrency = defaultDistSQLScanConcurrency - } if cfg.TiDB.BuildStatsConcurrency == 0 { cfg.TiDB.BuildStatsConcurrency = defaultBuildStatsConcurrency } @@ -839,11 +859,6 @@ func (cfg *Config) AdjustCheckPoint() { } func (cfg *Config) AdjustMydumper() { - if cfg.Mydumper.BatchSize <= 0 { - // if rows in source files are not sorted by primary key(if primary is number or cluster index enabled), - // the key range in each data engine may have overlap, thus a bigger engine size can somewhat alleviate it. - cfg.Mydumper.BatchSize = defaultBatchSize - } if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 { cfg.Mydumper.BatchImportRatio = 0.75 } diff --git a/pkg/lightning/config/config_test.go b/pkg/lightning/config/config_test.go index f30cb0e6a7f3d..4d4f0e34cb272 100644 --- a/pkg/lightning/config/config_test.go +++ b/pkg/lightning/config/config_test.go @@ -81,6 +81,7 @@ func (s *configTestSuite) TestAdjustPdAddrAndPort(c *C) { cfg.Mydumper.SourceDir = "." cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." 
+ cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) @@ -100,6 +101,7 @@ func (s *configTestSuite) TestAdjustPdAddrAndPortViaAdvertiseAddr(c *C) { cfg.Mydumper.SourceDir = "." cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) @@ -116,6 +118,7 @@ func (s *configTestSuite) TestAdjustPageNotFound(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "cannot fetch settings from TiDB.*") @@ -129,6 +132,7 @@ func (s *configTestSuite) TestAdjustConnectRefused(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 ts.Close() // immediately close to ensure connection refused. @@ -138,6 +142,7 @@ func (s *configTestSuite) TestAdjustConnectRefused(c *C) { func (s *configTestSuite) TestAdjustBackendNotSet(c *C) { cfg := config.NewConfig() + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "tikv-importer.backend must not be empty!") } @@ -145,6 +150,7 @@ func (s *configTestSuite) TestAdjustBackendNotSet(c *C) { func (s *configTestSuite) TestAdjustInvalidBackend(c *C) { cfg := config.NewConfig() cfg.TikvImporter.Backend = "no_such_backend" + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "invalid config: unsupported `tikv-importer\\.backend` \\(no_such_backend\\)") } @@ -159,6 +165,7 @@ func (s *configTestSuite) TestAdjustFileRoutePath(c *C) { invalidPath := filepath.Join(tmpDir, "../test123/1.sql") rule := &config.FileRouteRule{Path: invalidPath, Type: "sql", Schema: "test", Table: "tbl"} cfg.Mydumper.FileRouters = []*config.FileRouteRule{rule} + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(ctx) c.Assert(err, ErrorMatches, fmt.Sprintf("\\Qfile route path '%s' is not in source dir '%s'\\E", invalidPath, tmpDir)) @@ -178,6 +185,7 @@ func (s *configTestSuite) TestDecodeError(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "cannot fetch settings from TiDB.*") @@ -192,6 +200,7 @@ func (s *configTestSuite) TestInvalidSetting(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "invalid `tidb.port` setting") @@ -206,6 +215,7 @@ func (s *configTestSuite) TestInvalidPDAddr(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." 
+ cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "invalid `tidb.pd-addr` setting") @@ -214,6 +224,7 @@ func (s *configTestSuite) TestInvalidPDAddr(c *C) { func (s *configTestSuite) TestAdjustWillNotContactServerIfEverythingIsDefined(c *C) { cfg := config.NewConfig() assignMinimalLegalValue(cfg) + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) @@ -225,6 +236,7 @@ func (s *configTestSuite) TestAdjustWillBatchImportRatioInvalid(c *C) { cfg := config.NewConfig() assignMinimalLegalValue(cfg) cfg.Mydumper.BatchImportRatio = -1 + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) c.Assert(cfg.Mydumper.BatchImportRatio, Equals, 0.75) @@ -301,6 +313,7 @@ func (s *configTestSuite) TestAdjustSecuritySection(c *C) { cfg := config.NewConfig() assignMinimalLegalValue(cfg) + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.LoadFromTOML([]byte(tc.input)) c.Assert(err, IsNil, comment) @@ -442,6 +455,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { cfg.TiDB.PdAddr = "test.invalid:2379" cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.LoadFromTOML([]byte(tc.input)) c.Assert(err, IsNil) @@ -545,6 +559,7 @@ func (s *configTestSuite) TestLoadConfig(c *C) { taskCfg.Checkpoint.DSN = "" taskCfg.Checkpoint.Driver = config.CheckpointDriverMySQL + taskCfg.TiDB.DistSQLScanConcurrency = 1 err = taskCfg.Adjust(context.Background()) c.Assert(err, IsNil) c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:12345@tcp(172.16.30.11:4001)/?charset=utf8mb4&sql_mode='"+mysql.DefaultSQLMode+"'&maxAllowedPacket=67108864&tls=false") @@ -557,6 +572,7 @@ func (s *configTestSuite) TestDefaultImporterBackendValue(c *C) { cfg := config.NewConfig() assignMinimalLegalValue(cfg) cfg.TikvImporter.Backend = "importer" + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) c.Assert(cfg.App.IndexConcurrency, Equals, 2) @@ -568,9 +584,9 @@ func (s *configTestSuite) TestDefaultTidbBackendValue(c *C) { assignMinimalLegalValue(cfg) cfg.TikvImporter.Backend = "tidb" cfg.App.RegionConcurrency = 123 + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) - c.Assert(cfg.App.IndexConcurrency, Equals, 123) c.Assert(cfg.App.TableConcurrency, Equals, 123) } @@ -580,6 +596,7 @@ func (s *configTestSuite) TestDefaultCouldBeOverwritten(c *C) { cfg.TikvImporter.Backend = "importer" cfg.App.IndexConcurrency = 20 cfg.App.TableConcurrency = 60 + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) c.Assert(cfg.App.IndexConcurrency, Equals, 20) @@ -667,6 +684,7 @@ func (s *configTestSuite) TestAdjustWithLegacyBlackWhiteList(c *C) { ctx := context.Background() cfg.Mydumper.Filter = []string{"test.*"} + cfg.TiDB.DistSQLScanConcurrency = 1 c.Assert(cfg.Adjust(ctx), IsNil) c.Assert(cfg.HasLegacyBlackWhiteList(), IsFalse) @@ -687,7 +705,7 @@ func (s *configTestSuite) TestAdjustDiskQuota(c *C) { cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.DiskQuota = 0 cfg.TikvImporter.SortedKVDir = base + cfg.TiDB.DistSQLScanConcurrency = 1 c.Assert(cfg.Adjust(ctx), IsNil) - // DiskQuota must greater than 0 after adjust - c.Assert(int64(cfg.TikvImporter.DiskQuota), Greater, int64(0)) + c.Assert(int64(cfg.TikvImporter.DiskQuota), Equals, int64(0)) } diff --git a/pkg/lightning/config/const.go 
b/pkg/lightning/config/const.go index 240d37e27c722..21289e70bdf95 100644 --- a/pkg/lightning/config/const.go +++ b/pkg/lightning/config/const.go @@ -28,5 +28,5 @@ const ( defaultMaxAllowedPacket = 64 * units.MiB - defaultBatchSize ByteSize = 100 * units.GiB + DefaultBatchSize ByteSize = 100 * units.GiB ) diff --git a/pkg/lightning/lightning_test.go b/pkg/lightning/lightning_test.go index aab2daef48a91..ba9ab7fe4a9bf 100644 --- a/pkg/lightning/lightning_test.go +++ b/pkg/lightning/lightning_test.go @@ -243,7 +243,7 @@ func (s *lightningServerSuite) TestGetDeleteTask(c *C) { go func() { _ = s.lightning.RunServer() }() - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) // Check `GET /tasks` without any active tasks @@ -375,7 +375,7 @@ func (s *lightningServerSuite) TestHTTPAPIOutsideServerMode(c *C) { go func() { errCh <- s.lightning.RunOnce(s.lightning.ctx, cfg, nil) }() - time.Sleep(100 * time.Millisecond) + time.Sleep(600 * time.Millisecond) var curTask struct { Current int64 diff --git a/pkg/lightning/mydump/csv_parser.go b/pkg/lightning/mydump/csv_parser.go index 26affe83877e7..7cd400e3d5b6e 100644 --- a/pkg/lightning/mydump/csv_parser.go +++ b/pkg/lightning/mydump/csv_parser.go @@ -444,6 +444,7 @@ func (parser *CSVParser) replaceEOF(err error, replaced error) error { // ReadRow reads a row from the datafile. func (parser *CSVParser) ReadRow() error { row := &parser.lastRow + row.Length = 0 row.RowID++ // skip the header first @@ -475,6 +476,7 @@ func (parser *CSVParser) ReadRow() error { row.Row = make([]types.Datum, len(records)) } for i, record := range records { + row.Length += len(record) unescaped, isNull := parser.unescapeString(record) if isNull { row.Row[i].SetNull() diff --git a/pkg/lightning/mydump/csv_parser_test.go b/pkg/lightning/mydump/csv_parser_test.go index da46986ba6930..05c0d6d79fadc 100644 --- a/pkg/lightning/mydump/csv_parser_test.go +++ b/pkg/lightning/mydump/csv_parser_test.go @@ -59,7 +59,9 @@ func (s *testMydumpCSVParserSuite) runTestCases(c *C, cfg *config.CSVConfig, blo comment := Commentf("input = %q, row = %d", tc.input, i+1) e := parser.ReadRow() c.Assert(e, IsNil, Commentf("input = %q, row = %d, error = %s", tc.input, i+1, errors.ErrorStack(e))) - c.Assert(parser.LastRow(), DeepEquals, mydump.Row{RowID: int64(i) + 1, Row: row}, comment) + c.Assert(parser.LastRow().RowID, DeepEquals, int64(i)+1, comment) + c.Assert(parser.LastRow().Row, DeepEquals, row, comment) + } c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF, Commentf("input = %q", tc.input)) } @@ -149,22 +151,25 @@ func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 1, - Row: datums[0], + RowID: 1, + Row: datums[0], + Length: 116, }) c.Assert(parser, posEq, 126, 1) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 2, - Row: datums[1], + RowID: 2, + Row: datums[1], + Length: 104, }) c.Assert(parser, posEq, 241, 2) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 3, - Row: datums[2], + RowID: 3, + Row: datums[2], + Length: 117, }) c.Assert(parser, posEq, 369, 3) @@ -224,10 +229,9 @@ func (s *testMydumpCSVParserSuite) TestTPCHMultiBytes(c *C) { for i, expectedParserPos := range allExpectedParserPos { c.Assert(parser.ReadRow(), IsNil) - c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: int64(i + 1), - Row: datums[i], - }) + c.Assert(parser.LastRow().RowID, DeepEquals, int64(i+1)) + 
c.Assert(parser.LastRow().Row, DeepEquals, datums[i]) + c.Assert(parser, posEq, expectedParserPos, i+1) } @@ -253,6 +257,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("bbb"), types.NewStringDatum("ccc"), }, + Length: 9, }) c.Assert(parser, posEq, 12, 1) @@ -264,6 +269,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("yyy"), types.NewStringDatum("xxx"), }, + Length: 9, }) c.Assert(parser, posEq, 24, 2) @@ -281,6 +287,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("bbb"), types.NewStringDatum("ccc"), }, + Length: 9, }) c.Assert(parser, posEq, 12, 1) @@ -292,6 +299,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("yyy"), types.NewStringDatum("xxx"), }, + Length: 9, }) c.Assert(parser, posEq, 23, 2) @@ -309,6 +317,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("bbb"), types.NewStringDatum("ccc"), }, + Length: 9, }) c.Assert(parser, posEq, 18, 1) @@ -320,6 +329,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("yyy"), types.NewStringDatum("xxx"), }, + Length: 9, }) c.Assert(parser, posEq, 29, 2) @@ -339,6 +349,7 @@ zzz,yyy,xxx`), int64(config.ReadBlockSize), s.ioWorkers, false) types.NewStringDatum("b\nbb"), types.NewStringDatum("ccc"), }, + Length: 10, }) c.Assert(parser, posEq, 19, 1) @@ -350,6 +361,7 @@ zzz,yyy,xxx`), int64(config.ReadBlockSize), s.ioWorkers, false) types.NewStringDatum("yyy"), types.NewStringDatum("xxx"), }, + Length: 9, }) c.Assert(parser, posEq, 30, 2) @@ -367,6 +379,7 @@ zzz,yyy,xxx`), int64(config.ReadBlockSize), s.ioWorkers, false) types.NewStringDatum("b\"bb"), types.NewStringDatum("ccc"), }, + Length: 10, }) c.Assert(parser, posEq, 19, 1) @@ -394,6 +407,7 @@ func (s *testMydumpCSVParserSuite) TestMySQL(c *C) { types.NewStringDatum(`\`), types.NewStringDatum("?"), }, + Length: 6, }) c.Assert(parser, posEq, 15, 1) @@ -405,6 +419,7 @@ func (s *testMydumpCSVParserSuite) TestMySQL(c *C) { nullDatum, types.NewStringDatum(`\N`), }, + Length: 7, }) c.Assert(parser, posEq, 26, 2) @@ -462,6 +477,7 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) { types.NewStringDatum("foo"), types.NewStringDatum("0000-00-00"), }, + Length: 14, }) c.Assert(parser, posEq, 32, 1) c.Assert(parser.Columns(), DeepEquals, []string{"a", "b", "c", "d", "e", "f"}) @@ -477,6 +493,7 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) { types.NewStringDatum("foo"), types.NewStringDatum("0000-00-00"), }, + Length: 14, }) c.Assert(parser, posEq, 52, 2) @@ -491,6 +508,7 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) { types.NewStringDatum("bar"), types.NewStringDatum("1999-12-31"), }, + Length: 23, }) c.Assert(parser, posEq, 80, 3) @@ -512,6 +530,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) { nullDatum, types.NewStringDatum("abc"), }, + Length: 4, }) c.Assert(parser, posEq, 12, 1) @@ -523,6 +542,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) { types.NewStringDatum("1999-12-31"), types.NewStringDatum("test"), }, + Length: 17, }) c.Assert(parser.Close(), IsNil) @@ -538,6 +558,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) { nullDatum, types.NewStringDatum("abc"), }, + Length: 4, }) c.Assert(parser, posEq, 17, 1) @@ -573,26 +594,30 @@ func (s *testMydumpCSVParserSuite) TestCRLF(c *C) { c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 1, - Row: []types.Datum{types.NewStringDatum("a")}, + 
RowID: 1, + Row: []types.Datum{types.NewStringDatum("a")}, + Length: 1, }) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 2, - Row: []types.Datum{types.NewStringDatum("b")}, + RowID: 2, + Row: []types.Datum{types.NewStringDatum("b")}, + Length: 1, }) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 3, - Row: []types.Datum{types.NewStringDatum("c")}, + RowID: 3, + Row: []types.Datum{types.NewStringDatum("c")}, + Length: 1, }) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 4, - Row: []types.Datum{types.NewStringDatum("d")}, + RowID: 4, + Row: []types.Datum{types.NewStringDatum("d")}, + Length: 1, }) c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) @@ -613,6 +638,7 @@ func (s *testMydumpCSVParserSuite) TestQuotedSeparator(c *C) { types.NewStringDatum("'"), types.NewStringDatum("'"), }, + Length: 3, }) c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index 9a3e42e4feb6b..24af41ac7ad87 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -36,12 +36,14 @@ type MDDatabaseMeta struct { } type MDTableMeta struct { - DB string - Name string - SchemaFile FileInfo - DataFiles []FileInfo - charSet string - TotalSize int64 + DB string + Name string + SchemaFile FileInfo + DataFiles []FileInfo + charSet string + TotalSize int64 + IndexRatio float64 + IsRowOrdered bool } type SourceFileMeta struct { @@ -426,11 +428,13 @@ func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool } s.tableIndexMap[fileInfo.TableName] = len(dbMeta.Tables) ptr := &MDTableMeta{ - DB: fileInfo.TableName.Schema, - Name: fileInfo.TableName.Name, - SchemaFile: fileInfo, - DataFiles: make([]FileInfo, 0, 16), - charSet: s.loader.charSet, + DB: fileInfo.TableName.Schema, + Name: fileInfo.TableName.Name, + SchemaFile: fileInfo, + DataFiles: make([]FileInfo, 0, 16), + charSet: s.loader.charSet, + IndexRatio: 0.0, + IsRowOrdered: true, } dbMeta.Tables = append(dbMeta.Tables, ptr) return ptr, dbExists, false @@ -441,10 +445,12 @@ func (s *mdLoaderSetup) insertView(fileInfo FileInfo) (bool, bool) { _, ok := s.tableIndexMap[fileInfo.TableName] if ok { meta := &MDTableMeta{ - DB: fileInfo.TableName.Schema, - Name: fileInfo.TableName.Name, - SchemaFile: fileInfo, - charSet: s.loader.charSet, + DB: fileInfo.TableName.Schema, + Name: fileInfo.TableName.Name, + SchemaFile: fileInfo, + charSet: s.loader.charSet, + IndexRatio: 0.0, + IsRowOrdered: true, } dbMeta.Views = append(dbMeta.Views, meta) } diff --git a/pkg/lightning/mydump/loader_test.go b/pkg/lightning/mydump/loader_test.go index e0975c5508316..ff39f438f23ce 100644 --- a/pkg/lightning/mydump/loader_test.go +++ b/pkg/lightning/mydump/loader_test.go @@ -274,10 +274,12 @@ func (s *testMydumpLoaderSuite) TestDataWithoutSchema(c *C) { Name: "db", SchemaFile: "", Tables: []*md.MDTableMeta{{ - DB: "db", - Name: "tbl", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "tbl"}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "tbl"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.sql", Type: md.SourceTypeSQL}}}, + DB: "db", + Name: "tbl", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "tbl"}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "tbl"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.sql", Type: md.SourceTypeSQL}}}, + 
IsRowOrdered: true, + IndexRatio: 0.0, }}, }}) } @@ -302,16 +304,20 @@ func (s *testMydumpLoaderSuite) TestTablesWithDots(c *C) { SchemaFile: "db-schema-create.sql", Tables: []*md.MDTableMeta{ { - DB: "db", - Name: "0002", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "0002"}, FileMeta: md.SourceFileMeta{Path: "db.0002-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "0002"}, FileMeta: md.SourceFileMeta{Path: "db.0002.sql", Type: md.SourceTypeSQL}}}, + DB: "db", + Name: "0002", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "0002"}, FileMeta: md.SourceFileMeta{Path: "db.0002-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "0002"}, FileMeta: md.SourceFileMeta{Path: "db.0002.sql", Type: md.SourceTypeSQL}}}, + IsRowOrdered: true, + IndexRatio: 0.0, }, { - DB: "db", - Name: "tbl.with.dots", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "tbl.with.dots"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.with.dots-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "tbl.with.dots"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.with.dots.0001.sql", Type: md.SourceTypeSQL, SortKey: "0001"}}}, + DB: "db", + Name: "tbl.with.dots", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "tbl.with.dots"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.with.dots-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "tbl.with.dots"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.with.dots.0001.sql", Type: md.SourceTypeSQL, SortKey: "0001"}}}, + IsRowOrdered: true, + IndexRatio: 0.0, }, }, }}) @@ -392,23 +398,29 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) { SchemaFile: "a1-schema-create.sql", Tables: []*md.MDTableMeta{ { - DB: "a1", - Name: "s1", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}}, + DB: "a1", + Name: "s1", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, { - DB: "a1", - Name: "v1", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{}, + DB: "a1", + Name: "v1", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, Views: []*md.MDTableMeta{ { - DB: "a1", - Name: "v1", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema-view.sql", Type: md.SourceTypeViewSchema}}, + DB: "a1", + Name: "v1", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: 
"v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema-view.sql", Type: md.SourceTypeViewSchema}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -429,6 +441,8 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) { {TableName: filter.Table{Schema: "b", Name: "u"}, FileMeta: md.SourceFileMeta{Path: "a0.t1.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}, {TableName: filter.Table{Schema: "b", Name: "u"}, FileMeta: md.SourceFileMeta{Path: "a1.t2.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}, }, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -437,10 +451,12 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) { SchemaFile: "c0-schema-create.sql", Tables: []*md.MDTableMeta{ { - DB: "c", - Name: "t3", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "c", Name: "t3"}, FileMeta: md.SourceFileMeta{Path: "c0.t3-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "c", Name: "t3"}, FileMeta: md.SourceFileMeta{Path: "c0.t3.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}}, + DB: "c", + Name: "t3", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "c", Name: "t3"}, FileMeta: md.SourceFileMeta{Path: "c0.t3-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "c", Name: "t3"}, FileMeta: md.SourceFileMeta{Path: "c0.t3.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -449,17 +465,21 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) { SchemaFile: "e0-schema-create.sql", Tables: []*md.MDTableMeta{ { - DB: "v", - Name: "vv", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{}, + DB: "v", + Name: "vv", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, Views: []*md.MDTableMeta{ { - DB: "v", - Name: "vv", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema-view.sql", Type: md.SourceTypeViewSchema}}, + DB: "v", + Name: "vv", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema-view.sql", Type: md.SourceTypeViewSchema}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -554,6 +574,8 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) { FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/test2.001.sql"), Type: md.SourceTypeSQL}, }, }, + IndexRatio: 0.0, + IsRowOrdered: true, }, { DB: "d1", @@ -562,7 +584,9 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) { TableName: filter.Table{Schema: "d1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/v1-table.sql"), Type: md.SourceTypeTableSchema}, }, - DataFiles: []md.FileInfo{}, + DataFiles: []md.FileInfo{}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, Views: []*md.MDTableMeta{ @@ -573,6 +597,8 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) { TableName: filter.Table{Schema: "d1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/v1-view.sql"), Type: md.SourceTypeViewSchema}, }, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -587,7 +613,9 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) { TableName: filter.Table{Schema: "d2", Name: 
"abc"}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d2/abc-table.sql"), Type: md.SourceTypeTableSchema}, }, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "d2", Name: "abc"}, FileMeta: md.SourceFileMeta{Path: "abc.1.sql", Type: md.SourceTypeSQL}}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "d2", Name: "abc"}, FileMeta: md.SourceFileMeta{Path: "abc.1.sql", Type: md.SourceTypeSQL}}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, diff --git a/pkg/lightning/mydump/parquet_parser.go b/pkg/lightning/mydump/parquet_parser.go index 40730417373c2..789163c18bb01 100644 --- a/pkg/lightning/mydump/parquet_parser.go +++ b/pkg/lightning/mydump/parquet_parser.go @@ -345,6 +345,7 @@ func (pp *ParquetParser) Close() error { func (pp *ParquetParser) ReadRow() error { pp.lastRow.RowID++ + pp.lastRow.Length = 0 if pp.curIndex >= len(pp.rows) { if pp.readRows >= pp.Reader.GetNumRows() { return io.EOF @@ -375,6 +376,7 @@ func (pp *ParquetParser) ReadRow() error { pp.lastRow.Row = pp.lastRow.Row[:length] } for i := 0; i < length; i++ { + pp.lastRow.Length += getDatumLen(v.Field(i)) if err := setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i]); err != nil { return err } @@ -382,6 +384,20 @@ func (pp *ParquetParser) ReadRow() error { return nil } +func getDatumLen(v reflect.Value) int { + if v.Kind() == reflect.Ptr { + if v.IsNil() { + return 0 + } else { + return getDatumLen(v.Elem()) + } + } + if v.Kind() == reflect.String { + return len(v.String()) + } + return 8 +} + // convert a parquet value to Datum // // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md diff --git a/pkg/lightning/mydump/parser.go b/pkg/lightning/mydump/parser.go index 8c1cb680d5872..81c050ae01635 100644 --- a/pkg/lightning/mydump/parser.go +++ b/pkg/lightning/mydump/parser.go @@ -91,8 +91,9 @@ type Chunk struct { // Row is the content of a row. 
type Row struct { - RowID int64 - Row []types.Datum + RowID int64 + Row []types.Datum + Length int } // MarshalLogArray implements the zapcore.ArrayMarshaler interface @@ -412,6 +413,7 @@ func (parser *ChunkParser) ReadRow() error { row := &parser.lastRow st := stateValues + row.Length = 0 for { tok, content, err := parser.lex() @@ -421,6 +423,7 @@ func (parser *ChunkParser) ReadRow() error { } return errors.Trace(err) } + row.Length += len(content) switch st { case stateTableName: switch tok { diff --git a/pkg/lightning/mydump/parser_test.go b/pkg/lightning/mydump/parser_test.go index 2a7c4f5612b87..0f388ea7cc8ab 100644 --- a/pkg/lightning/mydump/parser_test.go +++ b/pkg/lightning/mydump/parser_test.go @@ -44,7 +44,8 @@ func (s *testMydumpParserSuite) runTestCases(c *C, mode mysql.SQLMode, blockBufS e := parser.ReadRow() comment := Commentf("input = %q, row = %d, err = %s", tc.input, i+1, errors.ErrorStack(e)) c.Assert(e, IsNil, comment) - c.Assert(parser.LastRow(), DeepEquals, mydump.Row{RowID: int64(i) + 1, Row: row}, comment) + c.Assert(parser.LastRow().RowID, DeepEquals, int64(i)+1) + c.Assert(parser.LastRow().Row, DeepEquals, row) } c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF, Commentf("input = %q", tc.input)) } @@ -75,6 +76,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { types.NewIntDatum(-2), types.NewUintDatum(3), }, + Length: 62, }) c.Assert(parser.Columns(), DeepEquals, []string{"columns", "more", "columns"}) offset, rowID := parser.Pos() @@ -89,6 +91,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { types.NewStringDatum("5."), types.NewUintDatum(6), }, + Length: 6, }) c.Assert(parser.Columns(), DeepEquals, []string{"columns", "more", "columns"}) offset, rowID = parser.Pos() @@ -103,6 +106,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { types.NewUintDatum(8), types.NewUintDatum(9), }, + Length: 42, }) c.Assert(parser.Columns(), DeepEquals, []string{"x", "y", "z"}) offset, rowID = parser.Pos() @@ -121,6 +125,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { types.NewUintDatum(14), types.NewStringDatum(")"), }, + Length: 49, }) c.Assert(parser.Columns(), IsNil) offset, rowID = parser.Pos() diff --git a/pkg/lightning/mydump/region.go b/pkg/lightning/mydump/region.go index 1213024864ded..9ec56cf2d97cb 100644 --- a/pkg/lightning/mydump/region.go +++ b/pkg/lightning/mydump/region.go @@ -224,12 +224,21 @@ func MakeTableRegions( prevRowIDMax = fileRegionsRes.regions[len(fileRegionsRes.regions)-1].Chunk.RowIDMax } + batchSize := float64(cfg.Mydumper.BatchSize) + if cfg.Mydumper.BatchSize <= 0 { + if meta.IsRowOrdered { + batchSize = float64(config.DefaultBatchSize) + } else { + batchSize = math.Max(float64(config.DefaultBatchSize), float64(meta.TotalSize)) + } + } + log.L().Info("makeTableRegions", zap.Int("filesCount", len(meta.DataFiles)), - zap.Int64("maxRegionSize", int64(cfg.Mydumper.MaxRegionSize)), + zap.Int64("MaxRegionSize", int64(cfg.Mydumper.MaxRegionSize)), zap.Int("RegionsCount", len(filesRegions)), + zap.Float64("BatchSize", batchSize), zap.Duration("cost", time.Since(start))) - - AllocateEngineIDs(filesRegions, dataFileSizes, float64(cfg.Mydumper.BatchSize), cfg.Mydumper.BatchImportRatio, float64(cfg.App.TableConcurrency)) + AllocateEngineIDs(filesRegions, dataFileSizes, batchSize, cfg.Mydumper.BatchImportRatio, float64(cfg.App.TableConcurrency)) return filesRegions, nil } diff --git a/pkg/lightning/restore/check_info.go b/pkg/lightning/restore/check_info.go index 8e537eea6bd7a..05b55a2382681 100644 --- 
a/pkg/lightning/restore/check_info.go +++ b/pkg/lightning/restore/check_info.go @@ -14,6 +14,7 @@ package restore import ( + "bytes" "context" "fmt" "io" @@ -23,18 +24,24 @@ import ( "github.com/docker/go-units" "github.com/pingcap/errors" - "github.com/pingcap/log" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/common" - md "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/table/tables" "github.com/tikv/pd/pkg/typeutil" "github.com/tikv/pd/server/api" - "github.com/tikv/pd/server/config" + pdconfig "github.com/tikv/pd/server/config" "go.uber.org/zap" + ) const ( @@ -51,7 +58,9 @@ const ( pdStores = "/pd/api/v1/stores" pdReplicate = "/pd/api/v1/config/replicate" - defaultCSVSize = 10 * units.GiB + defaultCSVSize = 10 * units.GiB + maxSampleDataSize = 10 * 1024 * 1024 + maxSampleRowCount = 10 * 1024 ) func (rc *Controller) isSourceInLocal() bool { @@ -59,7 +68,7 @@ func (rc *Controller) isSourceInLocal() bool { } func (rc *Controller) getReplicaCount(ctx context.Context) (uint64, error) { - result := &config.ReplicationConfig{} + result := &pdconfig.ReplicationConfig{} err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdReplicate, &result) if err != nil { return 0, errors.Trace(err) @@ -68,7 +77,7 @@ func (rc *Controller) getReplicaCount(ctx context.Context) (uint64, error) { } // ClusterResource check cluster has enough resource to import data. this test can by skipped. -func (rc *Controller) ClusterResource(ctx context.Context) error { +func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) error { passed := true message := "Cluster resources are rich for this import task" defer func() { @@ -80,33 +89,30 @@ func (rc *Controller) ClusterResource(ctx context.Context) error { if err != nil { return errors.Trace(err) } - totalAvailable := typeutil.ByteSize(0) + totalCapacity := typeutil.ByteSize(0) for _, store := range result.Stores { - totalAvailable += store.Status.Available + totalCapacity += store.Status.Capacity } - var sourceSize int64 - err = rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { - sourceSize += size - return nil - }) - if err != nil { - return errors.Trace(err) + clusterSource := localSource + if rc.taskMgr != nil { + clusterSource, err = rc.taskMgr.CheckClusterSource(ctx) + if err != nil { + return errors.Trace(err) + } } + replicaCount, err := rc.getReplicaCount(ctx) if err != nil { return errors.Trace(err) } - // sourceSize is the total size of current csv/parquet/sql files. - // it's not a simple multiple relationship with the final cluster occupancy, because - // 1. sourceSize didn't compress with RocksDB. - // 2. the index size was not included in sourceSize. - // so we have to make estimateSize redundant with 1.5. 
- estimateSize := uint64(sourceSize) * replicaCount * 3 / 2 - - if typeutil.ByteSize(estimateSize) > totalAvailable { + estimateSize := uint64(clusterSource) * replicaCount + if typeutil.ByteSize(estimateSize) > totalCapacity { passed = false - message = fmt.Sprintf("Cluster doesn't have enough space, %s is avaiable, but we need %s", - units.BytesSize(float64(totalAvailable)), units.BytesSize(float64(estimateSize))) + message = fmt.Sprintf("Cluster doesn't have enough space, capacity is %s, but we need %s", + units.BytesSize(float64(totalCapacity)), units.BytesSize(float64(estimateSize))) + } else { + message = fmt.Sprintf("Cluster capacity is rich, capacity is %s, we need %s", + units.BytesSize(float64(totalCapacity)), units.BytesSize(float64(estimateSize))) } return nil } @@ -162,7 +168,7 @@ func (rc *Controller) StoragePermission(ctx context.Context) error { // HasLargeCSV checks whether input csvs is fit for Lightning import. // If strictFormat is false, and csv file is large. Lightning will have performance issue. // this test cannot be skipped. -func (rc *Controller) HasLargeCSV(dbMetas []*md.MDDatabaseMeta) error { +func (rc *Controller) HasLargeCSV(dbMetas []*mydump.MDDatabaseMeta) error { passed := true message := "Source csv files size is proper" defer func() { @@ -185,8 +191,47 @@ func (rc *Controller) HasLargeCSV(dbMetas []*md.MDDatabaseMeta) error { return nil } +func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) { + sourceSize := int64(0) + originSource := int64(0) + bigTableCount := 0 + tableCount := 0 + unSortedTableCount := 0 + for _, db := range rc.dbMetas { + info, ok := rc.dbInfos[db.Name] + if !ok { + continue + } + for _, tbl := range db.Tables { + tableInfo, ok := info.Tables[tbl.Name] + if ok { + if err := rc.SampleDataFromTable(ctx, db.Name, tbl, tableInfo.Core); err != nil { + return sourceSize, errors.Trace(err) + } + sourceSize += int64(float64(tbl.TotalSize) * tbl.IndexRatio) + originSource += tbl.TotalSize + if tbl.TotalSize > int64(config.DefaultBatchSize)*2 { + bigTableCount += 1 + if !tbl.IsRowOrdered { + unSortedTableCount += 1 + } + } + tableCount += 1 + } + } + } + + // Do not import with too large concurrency because these data may be all unsorted. 
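+	// When at least one big table is not sorted by primary key, importing many such tables at
+	// once produces heavily overlapping engines, so cap table concurrency at index concurrency.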
+	if bigTableCount > 0 && unSortedTableCount > 0 {
+		if rc.cfg.App.TableConcurrency > rc.cfg.App.IndexConcurrency {
+			rc.cfg.App.TableConcurrency = rc.cfg.App.IndexConcurrency
+		}
+	}
+	return sourceSize, nil
+}
+
 // LocalResource checks the local node has enough resources for this import when local backend enabled;
-func (rc *Controller) LocalResource(ctx context.Context) error {
+func (rc *Controller) LocalResource(ctx context.Context, sourceSize int64) error {
 	if rc.isSourceInLocal() {
 		sourceDir := strings.TrimPrefix(rc.cfg.Mydumper.SourceDir, storage.LocalURIPrefix)
 		same, err := common.SameDisk(sourceDir, rc.cfg.TikvImporter.SortedKVDir)
@@ -199,46 +244,54 @@ func (rc *Controller) LocalResource(ctx context.Context) error {
 			rc.cfg.TikvImporter.SortedKVDir, sourceDir))
 		}
 	}
-	var sourceSize uint64
-	err := rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
-		sourceSize += uint64(size)
-		return nil
-	})
-	if err != nil {
-		return errors.Trace(err)
-	}
 	storageSize, err := common.GetStorageSize(rc.cfg.TikvImporter.SortedKVDir)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	localAvailable := storageSize.Available
+	if err = rc.taskMgr.InitTask(ctx, sourceSize); err != nil {
+		return errors.Trace(err)
+	}

 	var message string
 	var passed bool
 	switch {
-	case localAvailable > sourceSize*3/2:
-		message = fmt.Sprintf("local disk resources are rich, source dir has %s, local available is %s",
+	case localAvailable > uint64(sourceSize):
+		message = fmt.Sprintf("local disk resources are rich, estimated sorted data size is %s, local available is %s",
 			units.BytesSize(float64(sourceSize)), units.BytesSize(float64(localAvailable)))
 		passed = true
 	default:
-		message = fmt.Sprintf("local disk space may not enough to finish import, source dir has %s, but local available is %s,"+ "we may use disk-quota(%s) to finish imports", units.BytesSize(float64(sourceSize)),
-			units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
-		passed = true
+		if int64(rc.cfg.TikvImporter.DiskQuota) > int64(localAvailable) {
+			message = fmt.Sprintf("local disk space may not be enough to finish the import, "+
+				"estimated sorted data size is %s, but local available is %s, "+
+				"you need a smaller value for tikv-importer.disk-quota (%s) to finish the import",
+				units.BytesSize(float64(sourceSize)),
+				units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
+			passed = false
+			log.L().Error(message)
+		} else {
+			message = fmt.Sprintf("local disk space may not be enough to finish the import, "+
+				"estimated sorted data size is %s, but local available is %s, "+
+				"we will use disk-quota (size: %s) to finish imports, which may slow down the import",
+				units.BytesSize(float64(sourceSize)),
+				units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
+			passed = true
+			log.L().Warn(message)
+		}
 	}
 	rc.checkTemplate.Collect(Critical, passed, message)
 	return nil
 }

 // CheckpointIsValid checks whether we can start this import with this checkpoint.
-func (rc *Controller) CheckpointIsValid(ctx context.Context, tableInfo *md.MDTableMeta) ([]string, bool, error) { +func (rc *Controller) CheckpointIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, bool, error) { msgs := make([]string, 0) uniqueName := common.UniqueTable(tableInfo.DB, tableInfo.Name) tableCheckPoint, err := rc.checkpointsDB.Get(ctx, uniqueName) if err != nil { // there is no checkpoint - log.Debug("no checkpoint detected", zap.String("table", uniqueName)) + log.L().Debug("no checkpoint detected", zap.String("table", uniqueName)) return nil, true, nil } // if checkpoint enable and not missing, we skip the check table empty progress. @@ -261,7 +314,7 @@ func (rc *Controller) CheckpointIsValid(ctx context.Context, tableInfo *md.MDTab } } if len(columns) == 0 { - log.Debug("no valid checkpoint detected", zap.String("table", uniqueName)) + log.L().Debug("no valid checkpoint detected", zap.String("table", uniqueName)) return nil, false, nil } info := rc.dbInfos[tableInfo.DB].Tables[tableInfo.Name] @@ -285,10 +338,10 @@ func hasDefault(col *model.ColumnInfo) bool { col.IsGenerated() || mysql.HasAutoIncrementFlag(col.Flag) } -func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta md.SourceFileMeta) (cols []string, colCnt int, err error) { +func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta mydump.SourceFileMeta) (cols []string, colCnt int, err error) { var reader storage.ReadSeekCloser - if dataFileMeta.Type == md.SourceTypeParquet { - reader, err = md.OpenParquetReader(ctx, rc.store, dataFileMeta.Path, dataFileMeta.FileSize) + if dataFileMeta.Type == mydump.SourceTypeParquet { + reader, err = mydump.OpenParquetReader(ctx, rc.store, dataFileMeta.Path, dataFileMeta.FileSize) } else { reader, err = rc.store.Open(ctx, dataFileMeta.Path) } @@ -296,16 +349,16 @@ func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta md.S return nil, 0, errors.Trace(err) } - var parser md.Parser + var parser mydump.Parser blockBufSize := int64(rc.cfg.Mydumper.ReadBlockSize) switch dataFileMeta.Type { - case md.SourceTypeCSV: + case mydump.SourceTypeCSV: hasHeader := rc.cfg.Mydumper.CSV.Header - parser = md.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader) - case md.SourceTypeSQL: - parser = md.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers) - case md.SourceTypeParquet: - parser, err = md.NewParquetParser(ctx, rc.store, reader, dataFileMeta.Path) + parser = mydump.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader) + case mydump.SourceTypeSQL: + parser = mydump.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers) + case mydump.SourceTypeParquet: + parser, err = mydump.NewParquetParser(ctx, rc.store, reader, dataFileMeta.Path) if err != nil { return nil, 0, errors.Trace(err) } @@ -322,7 +375,7 @@ func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta md.S } // SchemaIsValid checks the import file and cluster schema is match. 
-func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMeta) ([]string, error) { +func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, error) { msgs := make([]string, 0) info, ok := rc.dbInfos[tableInfo.DB].Tables[tableInfo.Name] if !ok { @@ -341,7 +394,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe } if len(tableInfo.DataFiles) == 0 { - log.Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name)) + log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name)) return nil, nil } @@ -361,7 +414,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe // get columns name from data file. dataFileMeta := dataFile.FileMeta - if tp := dataFileMeta.Type; tp != md.SourceTypeCSV && tp != md.SourceTypeSQL && tp != md.SourceTypeParquet { + if tp := dataFileMeta.Type; tp != mydump.SourceTypeCSV && tp != mydump.SourceTypeSQL && tp != mydump.SourceTypeParquet { msgs = append(msgs, fmt.Sprintf("file '%s' with unknown source type '%s'", dataFileMeta.Path, dataFileMeta.Type.String())) return msgs, nil } @@ -370,7 +423,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe return nil, errors.Trace(err) } if colsFromDataFile == nil && colCountFromDataFile == 0 { - log.Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path)) + log.L().Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path)) continue } @@ -411,7 +464,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe for _, col := range colsFromDataFile { if _, ok := colMap[col]; !ok { checkMsg := "please check table schema" - if dataFileMeta.Type == md.SourceTypeCSV && rc.cfg.Mydumper.CSV.Header { + if dataFileMeta.Type == mydump.SourceTypeCSV && rc.cfg.Mydumper.CSV.Header { checkMsg += " and csv file header" } msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't have column %s, "+ @@ -438,3 +491,126 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe } return msgs, nil } + +func (rc *Controller) SampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error { + if len(tableMeta.DataFiles) == 0 { + return nil + } + sampleFile := tableMeta.DataFiles[0].FileMeta + var reader storage.ReadSeekCloser + var err error + if sampleFile.Type == mydump.SourceTypeParquet { + reader, err = mydump.OpenParquetReader(ctx, rc.store, sampleFile.Path, sampleFile.FileSize) + } else { + reader, err = rc.store.Open(ctx, sampleFile.Path) + } + if err != nil { + return errors.Trace(err) + } + idAlloc := kv.NewPanickingAllocators(0) + tbl, err := tables.TableFromMeta(idAlloc, tableInfo) + + kvEncoder, err := rc.backend.NewEncoder(tbl, &kv.SessionOptions{ + SQLMode: rc.cfg.TiDB.SQLMode, + Timestamp: 0, + SysVars: rc.sysVars, + AutoRandomSeed: 0, + }) + blockBufSize := int64(rc.cfg.Mydumper.ReadBlockSize) + + var parser mydump.Parser + switch tableMeta.DataFiles[0].FileMeta.Type { + case mydump.SourceTypeCSV: + hasHeader := rc.cfg.Mydumper.CSV.Header + parser = mydump.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader) + case mydump.SourceTypeSQL: + parser = mydump.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers) + case mydump.SourceTypeParquet: + 
parser, err = mydump.NewParquetParser(ctx, rc.store, reader, sampleFile.Path) + if err != nil { + return errors.Trace(err) + } + default: + panic(fmt.Sprintf("file '%s' with unknown source type '%s'", sampleFile.Path, sampleFile.Type.String())) + } + defer parser.Close() + logTask := log.With(zap.String("table", tableMeta.Name)).Begin(zap.InfoLevel, "sample file") + igCols, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(dbName, tableMeta.Name, rc.cfg.Mydumper.CaseSensitive) + if err != nil { + return errors.Trace(err) + } + + initializedColumns, reachEOF := false, false + var columnPermutation []int + var kvSize uint64 = 0 + var rowSize uint64 = 0 + rowCount := 0 + dataKVs := rc.backend.MakeEmptyRows() + indexKVs := rc.backend.MakeEmptyRows() + lastKey := make([]byte, 0) + tableMeta.IsRowOrdered = true + tableMeta.IndexRatio = 1.0 +outloop: + for !reachEOF { + offset, _ := parser.Pos() + err = parser.ReadRow() + columnNames := parser.Columns() + + switch errors.Cause(err) { + case nil: + if !initializedColumns { + if len(columnPermutation) == 0 { + columnPermutation, err = createColumnPermutation(columnNames, igCols.Columns, tableInfo) + if err != nil { + return errors.Trace(err) + } + } + initializedColumns = true + } + case io.EOF: + reachEOF = true + break outloop + default: + err = errors.Annotatef(err, "in file offset %d", offset) + return errors.Trace(err) + } + lastRow := parser.LastRow() + rowSize += uint64(lastRow.Length) + rowCount += 1 + + var dataChecksum, indexChecksum verification.KVChecksum + kvs, encodeErr := kvEncoder.Encode(logTask.Logger, lastRow.Row, lastRow.RowID, columnPermutation, offset) + parser.RecycleRow(lastRow) + if encodeErr != nil { + err = errors.Annotatef(encodeErr, "in file at offset %d", offset) + return errors.Trace(err) + } + if tableMeta.IsRowOrdered { + kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum) + for _, kv := range kv.KvPairsFromRows(dataKVs) { + if len(lastKey) == 0 { + lastKey = kv.Key + } else if bytes.Compare(lastKey, kv.Key) > 0 { + tableMeta.IsRowOrdered = false + break + } + } + dataKVs = dataKVs.Clear() + indexKVs = indexKVs.Clear() + } + kvSize += kvs.Size() + + failpoint.Inject("mock-kv-size", func(val failpoint.Value) { + kvSize += uint64(val.(int)) + }) + if rowSize > maxSampleDataSize && rowCount > maxSampleRowCount { + break + } + } + + if rowSize > 0 && kvSize > rowSize { + tableMeta.IndexRatio = float64(kvSize) / float64(rowSize) + } + log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered)) + return nil +} diff --git a/pkg/lightning/restore/meta_manager.go b/pkg/lightning/restore/meta_manager.go index 7b20a520177b5..d282c01bf3c75 100644 --- a/pkg/lightning/restore/meta_manager.go +++ b/pkg/lightning/restore/meta_manager.go @@ -459,11 +459,18 @@ func (m *dbTableMetaMgr) FinishTable(ctx context.Context) error { } type taskMetaMgr interface { - InitTask(ctx context.Context) error + InitTask(ctx context.Context, source int64) error + CheckClusterSource(ctx context.Context) (int64, error) + CheckTaskExist(ctx context.Context) (bool, error) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) - CheckAndFinishRestore(ctx context.Context) (bool, error) + // CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata + // Return values: first boolean indicates whether switch back tidb cluster to normal state (restore 
schedulers, switch tikv to normal) + // the second boolean indicates whether to clean up the metadata in tidb + CheckAndFinishRestore(ctx context.Context, finished bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) Cleanup(ctx context.Context) error + CleanupTask(ctx context.Context) error CleanupAllMetas(ctx context.Context) error + Close() } type dbTaskMetaMgr struct { @@ -484,6 +491,11 @@ const ( taskMetaStatusSwitchBack ) +const ( + taskStateNormal int = iota + taskStateExited +) + func (m taskMetaStatus) String() string { switch m { case taskMetaStatusInitial: @@ -519,17 +531,65 @@ type storedCfgs struct { RestoreCfg pdutil.ClusterConfig `json:"restore"` } -func (m *dbTaskMetaMgr) InitTask(ctx context.Context) error { +func (m *dbTaskMetaMgr) InitTask(ctx context.Context, source int64) error { exec := &common.SQLWithRetry{ DB: m.session, Logger: log.L(), } // avoid override existing metadata if the meta is already inserted. - stmt := fmt.Sprintf(`INSERT IGNORE INTO %s (task_id, status) values (?, ?)`, m.tableName) - err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String()) + stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status, source_bytes) values (?, ?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName) + err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String(), source, taskStateNormal) return errors.Trace(err) } +func (m *dbTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + // avoid override existing metadata if the meta is already inserted. + exist := false + err := exec.Transact(ctx, "check whether this task has started before", func(ctx context.Context, tx *sql.Tx) error { + query := fmt.Sprintf("SELECT task_id from %s WHERE task_id = %d", m.tableName, m.taskID) + rows, err := tx.QueryContext(ctx, query) + if err != nil { + return errors.Annotate(err, "fetch task meta failed") + } + var taskID int64 + for rows.Next() { + if err = rows.Scan(&taskID); err != nil { + rows.Close() + return errors.Trace(err) + } + if taskID == m.taskID { + exist = true + } + } + err = rows.Close() + return errors.Trace(err) + }) + return exist, errors.Trace(err) +} + +func (m *dbTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) { + conn, err := m.session.Conn(ctx) + if err != nil { + return 0, errors.Trace(err) + } + defer conn.Close() + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + + source := int64(0) + query := fmt.Sprintf("SELECT SUM(source_bytes) from %s", m.tableName) + if err := exec.QueryRow(ctx, "query total source size", query, &source); err != nil { + return 0, errors.Annotate(err, "fetch task meta failed") + } + return source, nil +} + func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) { pauseCtx, cancel := context.WithCancel(ctx) conn, err := m.session.Conn(ctx) @@ -552,7 +612,7 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U paused := false var pausedCfg storedCfgs err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, pd_cfgs, status from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName) rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -567,10 +627,11 @@ func (m *dbTaskMetaMgr) 
CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U taskID int64 cfg string statusValue string + state int ) var cfgStr string for rows.Next() { - if err = rows.Scan(&taskID, &cfg, &statusValue); err != nil { + if err = rows.Scan(&taskID, &cfg, &statusValue, &state); err != nil { return errors.Trace(err) } status, err := parseTaskMetaStatus(statusValue) @@ -647,10 +708,13 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U }, nil } -func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) { +// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata +// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal) +// the second boolean indicates whether to clean up the metadata in tidb +func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool) (bool, bool, error) { conn, err := m.session.Conn(ctx) if err != nil { - return false, errors.Trace(err) + return false, false, errors.Trace(err) } defer conn.Close() exec := &common.SQLWithRetry{ @@ -659,12 +723,13 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) } err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") if err != nil { - return false, errors.Annotate(err, "enable pessimistic transaction failed") + return false, false, errors.Annotate(err, "enable pessimistic transaction failed") } switchBack := true + allFinished := finished err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, status from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName) rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -678,10 +743,12 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) var ( taskID int64 statusValue string + state int ) - newStatus := taskMetaStatusSwitchBack + + taskStatus := taskMetaStatusInitial for rows.Next() { - if err = rows.Scan(&taskID, &statusValue); err != nil { + if err = rows.Scan(&taskID, &statusValue, &state); err != nil { return errors.Trace(err) } status, err := parseTaskMetaStatus(statusValue) @@ -690,13 +757,18 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) } if taskID == m.taskID { + taskStatus = status continue } if status < taskMetaStatusSwitchSkipped { - newStatus = taskMetaStatusSwitchSkipped - switchBack = false - break + allFinished = false + // check if other task still running + if state == taskStateNormal { + log.L().Info("unfinished task found", zap.Int64("task_id", taskID), + zap.Stringer("status", status)) + switchBack = false + } } } if err = rows.Close(); err != nil { @@ -704,13 +776,28 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) } closed = true - query = fmt.Sprintf("update %s set status = ? 
where task_id = ?", m.tableName) - _, err = tx.ExecContext(ctx, query, newStatus.String(), m.taskID) + if taskStatus < taskMetaStatusSwitchSkipped { + newStatus := taskMetaStatusSwitchBack + newState := taskStateNormal + if !finished { + newStatus = taskStatus + newState = taskStateExited + } else if !allFinished { + newStatus = taskMetaStatusSwitchSkipped + } + + query = fmt.Sprintf("update %s set status = ?, state = ? where task_id = ?", m.tableName) + if _, err = tx.ExecContext(ctx, query, newStatus.String(), newState, m.taskID); err != nil { + return errors.Trace(err) + } + } - return errors.Trace(err) + return nil }) + log.L().Info("check all task finish status", zap.Bool("task_finished", finished), + zap.Bool("all_finished", allFinished), zap.Bool("switch_back", switchBack)) - return switchBack, err + return switchBack, allFinished, err } func (m *dbTaskMetaMgr) Cleanup(ctx context.Context) error { @@ -726,6 +813,20 @@ func (m *dbTaskMetaMgr) Cleanup(ctx context.Context) error { return nil } +func (m *dbTaskMetaMgr) CleanupTask(ctx context.Context) error { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + stmt := fmt.Sprintf("DELETE FROM %s WHERE task_id = %d;", m.tableName, m.taskID) + err := exec.Exec(ctx, "clean up task", stmt) + return errors.Trace(err) +} + +func (m *dbTaskMetaMgr) Close() { + m.pd.Close() +} + func (m *dbTaskMetaMgr) CleanupAllMetas(ctx context.Context) error { exec := &common.SQLWithRetry{ DB: m.session, @@ -767,7 +868,7 @@ func (b noopMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { type noopTaskMetaMgr struct{} -func (m noopTaskMetaMgr) InitTask(ctx context.Context) error { +func (m noopTaskMetaMgr) InitTask(ctx context.Context, source int64) error { return nil } @@ -777,18 +878,33 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil. 
}, nil } -func (m noopTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) { +func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { return false, nil } +func (m noopTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) { + return 0, nil +} + +func (m noopTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (bool, bool, error) { + return false, true, nil +} + func (m noopTaskMetaMgr) Cleanup(ctx context.Context) error { return nil } +func (m noopTaskMetaMgr) CleanupTask(ctx context.Context) error { + return nil +} + func (m noopTaskMetaMgr) CleanupAllMetas(ctx context.Context) error { return nil } +func (m noopTaskMetaMgr) Close() { +} + type noopTableMetaMgr struct{} func (m noopTableMetaMgr) InitTableMeta(ctx context.Context) error { diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 9315337b128d6..b0d892ac83636 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -100,6 +100,8 @@ const ( task_id BIGINT(20) UNSIGNED NOT NULL, pd_cfgs VARCHAR(2048) NOT NULL DEFAULT '', status VARCHAR(32) NOT NULL, + state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish', + source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, PRIMARY KEY (task_id) );` @@ -247,6 +249,7 @@ type Controller struct { closedEngineLimit *worker.Pool store storage.ExternalStorage metaMgrBuilder metaMgrBuilder + taskMgr taskMetaMgr diskQuotaLock *diskQuotaLock diskQuotaState atomic.Int32 @@ -353,8 +356,8 @@ func NewRestoreControllerWithPauser( rc := &Controller{ cfg: cfg, dbMetas: dbMetas, - tableWorkers: worker.NewPool(ctx, cfg.App.TableConcurrency, "table"), - indexWorkers: worker.NewPool(ctx, cfg.App.IndexConcurrency, "index"), + tableWorkers: nil, + indexWorkers: nil, regionWorkers: worker.NewPool(ctx, cfg.App.RegionConcurrency, "region"), ioWorkers: worker.NewPool(ctx, cfg.App.IOConcurrency, "io"), checksumWorks: worker.NewPool(ctx, cfg.TiDB.ChecksumTableConcurrency, "checksum"), @@ -373,6 +376,7 @@ func NewRestoreControllerWithPauser( store: s, metaMgrBuilder: metaBuilder, diskQuotaLock: newDiskQuotaLock(), + taskMgr: nil, } return rc, nil @@ -385,9 +389,9 @@ func (rc *Controller) Close() { func (rc *Controller) Run(ctx context.Context) error { opts := []func(context.Context) error{ - rc.preCheckRequirements, rc.setGlobalVariables, rc.restoreSchema, + rc.preCheckRequirements, rc.restoreTables, rc.fullCompact, rc.switchToNormalMode, @@ -844,7 +848,12 @@ func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error { estimatedChunkCount := 0.0 estimatedEngineCnt := int64(0) - batchSize := int64(rc.cfg.Mydumper.BatchSize) + batchSize := rc.cfg.Mydumper.BatchSize + if batchSize <= 0 { + // if rows in source files are not sorted by primary key(if primary is number or cluster index enabled), + // the key range in each data engine may have overlap, thus a bigger engine size can somewhat alleviate it. 
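+		// a non-positive batch-size means the user did not configure it explicitly; fall back to
+		// config.DefaultBatchSize, the same fallback mydump.MakeTableRegions applies when it
+		// splits files into engines (it may further enlarge the batch for unsorted tables).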
+ batchSize = config.DefaultBatchSize + } for _, dbMeta := range rc.dbMetas { for _, tableMeta := range dbMeta.Tables { tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name) @@ -871,7 +880,7 @@ func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error { } // estimate engines count if engine cp is empty if len(dbCp.Engines) == 0 { - estimatedEngineCnt += ((tableMeta.TotalSize + batchSize - 1) / batchSize) + 1 + estimatedEngineCnt += ((tableMeta.TotalSize + int64(batchSize) - 1) / int64(batchSize)) + 1 } for _, fileMeta := range tableMeta.DataFiles { if cnt, ok := fileChunks[fileMeta.FileMeta.Path]; ok { @@ -1178,9 +1187,11 @@ var checksumManagerKey struct{} func (rc *Controller) restoreTables(ctx context.Context) error { logTask := log.L().Begin(zap.InfoLevel, "restore all tables data") - - if err := rc.metaMgrBuilder.Init(ctx); err != nil { - return err + if rc.tableWorkers == nil { + rc.tableWorkers = worker.NewPool(ctx, rc.cfg.App.TableConcurrency, "table") + } + if rc.indexWorkers == nil { + rc.indexWorkers = worker.NewPool(ctx, rc.cfg.App.IndexConcurrency, "index") } // for local backend, we should disable some pd scheduler and change some settings, to @@ -1192,27 +1203,17 @@ func (rc *Controller) restoreTables(ctx context.Context) error { // we do not do switch back automatically cleanupFunc := func() {} switchBack := false + taskFinished := false if rc.cfg.TikvImporter.Backend == config.BackendLocal { - // disable some pd schedulers - pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr, - rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption()) - if err != nil { - return errors.Trace(err) - } - - mgr := rc.metaMgrBuilder.TaskMetaMgr(pdController) - if err = mgr.InitTask(ctx); err != nil { - return err - } logTask.Info("removing PD leader®ion schedulers") - restoreFn, err := mgr.CheckAndPausePdSchedulers(ctx) + restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx) finishSchedulers = func() { if restoreFn != nil { // use context.Background to make sure this restore function can still be executed even if ctx is canceled restoreCtx := context.Background() - needSwitchBack, err := mgr.CheckAndFinishRestore(restoreCtx) + needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished) if err != nil { logTask.Warn("check restore pd schedulers failed", zap.Error(err)) return @@ -1222,22 +1223,25 @@ func (rc *Controller) restoreTables(ctx context.Context) error { if restoreE := restoreFn(restoreCtx); restoreE != nil { logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } + + logTask.Info("add back PD leader®ion schedulers") // clean up task metas - if cleanupErr := mgr.Cleanup(restoreCtx); cleanupErr != nil { - logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr)) - } - // cleanup table meta and schema db if needed. - cleanupFunc = func() { - if e := mgr.CleanupAllMetas(restoreCtx); err != nil { - logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e)) + if needCleanup { + logTask.Info("cleanup task metas") + if cleanupErr := rc.taskMgr.Cleanup(restoreCtx); cleanupErr != nil { + logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr)) + } + // cleanup table meta and schema db if needed. 
+ cleanupFunc = func() { + if e := rc.taskMgr.CleanupAllMetas(restoreCtx); err != nil { + logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e)) + } } } } - - logTask.Info("add back PD leader®ion schedulers") } - pdController.Close() + rc.taskMgr.Close() } if err != nil { @@ -1432,6 +1436,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error { // finishSchedulers() // cancelFunc(switchBack) // finishFuncCalled = true + taskFinished = true close(postProcessTaskChan) // otherwise, we should run all tasks in the post-process task chan @@ -1564,43 +1569,6 @@ func (tr *TableRestore) restoreTable( return tr.postProcess(ctx, rc, cp, false /* force-analyze */, metaMgr) } -// estimate SST files compression threshold by total row file size -// with a higher compression threshold, the compression time increases, but the iteration time decreases. -// Try to limit the total SST files number under 500. But size compress 32GB SST files cost about 20min, -// we set the upper bound to 32GB to avoid too long compression time. -// factor is the non-clustered(1 for data engine and number of non-clustered index count for index engine). -func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64) int64 { - totalRawFileSize := int64(0) - var lastFile string - for _, engineCp := range cp.Engines { - for _, chunk := range engineCp.Chunks { - if chunk.FileMeta.Path == lastFile { - continue - } - size := chunk.FileMeta.FileSize - if chunk.FileMeta.Type == mydump.SourceTypeParquet { - // parquet file is compressed, thus estimates with a factor of 2 - size *= 2 - } - totalRawFileSize += size - lastFile = chunk.FileMeta.Path - } - } - totalRawFileSize *= factor - - // try restrict the total file number within 512 - threshold := totalRawFileSize / 512 - threshold = utils.NextPowerOfTwo(threshold) - if threshold < compactionLowerThreshold { - // disable compaction if threshold is smaller than lower bound - threshold = 0 - } else if threshold > compactionUpperThreshold { - threshold = compactionUpperThreshold - } - - return threshold -} - // do full compaction for the whole data. func (rc *Controller) fullCompact(ctx context.Context) error { if !rc.cfg.PostRestore.Compact { @@ -1797,10 +1765,6 @@ func (rc *Controller) isLocalBackend() bool { // 3. Lightning configuration // before restore tables start. 
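+// When the local backend is used, the source data is sampled and its size estimated first,
+// the task is registered in the shared task meta table, and the disk and cluster resource
+// checks are then run against that estimate rather than the raw file size.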
func (rc *Controller) preCheckRequirements(ctx context.Context) error { - if !rc.cfg.App.CheckRequirements { - log.L().Info("skip pre check due to user requirement") - return nil - } if err := rc.ClusterIsAvailable(ctx); err != nil { return errors.Trace(err) } @@ -1808,15 +1772,50 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { if err := rc.StoragePermission(ctx); err != nil { return errors.Trace(err) } - - if err := rc.ClusterResource(ctx); err != nil { - return errors.Trace(err) + if err := rc.metaMgrBuilder.Init(ctx); err != nil { + return err } + taskExist := false if rc.isLocalBackend() { - if err := rc.LocalResource(ctx); err != nil { + source, err := rc.EstimateSourceData(ctx) + if err != nil { + return errors.Trace(err) + } + + pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr, + rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption()) + if err != nil { + return errors.Trace(err) + } + + rc.taskMgr = rc.metaMgrBuilder.TaskMetaMgr(pdController) + taskExist, err = rc.taskMgr.CheckTaskExist(ctx) + if err != nil { return errors.Trace(err) } + if !taskExist { + err = rc.LocalResource(ctx, source) + if err != nil { + rc.taskMgr.CleanupTask(ctx) + return errors.Trace(err) + } + if err := rc.ClusterResource(ctx, source); err != nil { + rc.taskMgr.CleanupTask(ctx) + return errors.Trace(err) + } + } + } + if rc.cfg.App.CheckRequirements && rc.tidbGlue.OwnsSQLExecutor() { + // print check template only if check requirements is true. + fmt.Print(rc.checkTemplate.Output()) + if !rc.checkTemplate.Success() { + if !taskExist && rc.taskMgr != nil { + rc.taskMgr.CleanupTask(ctx) + } + return errors.Errorf("tidb-lightning pre-check failed." + + " Please fix the failed check(s) or set --check-requirements=false to skip checks") + } } return nil } diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index f5223703fc14b..c1eb7d42206ec 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -947,8 +947,8 @@ func (s *tableRestoreSuite) TestTableRestoreMetrics(c *C) { s.tableInfo.DB: s.dbInfo, }, tableWorkers: worker.NewPool(ctx, 6, "table"), - indexWorkers: worker.NewPool(ctx, 2, "index"), ioWorkers: worker.NewPool(ctx, 5, "io"), + indexWorkers: worker.NewPool(ctx, 2, "index"), regionWorkers: worker.NewPool(ctx, 10, "region"), checksumWorks: worker.NewPool(ctx, 2, "region"), saveCpCh: chptCh, @@ -1597,7 +1597,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { "id": 2 }, "status": { - "available": "24" + "capacity": "24" } } ] @@ -1605,7 +1605,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { []byte(`{ "max-replicas": 1 }`), - "(.*)Cluster resources are rich for this import task(.*)", + "(.*)Cluster capacity is rich(.*)", true, 0, }, @@ -1618,7 +1618,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { "id": 2 }, "status": { - "available": "23" + "capacity": "15" } } ] @@ -1663,7 +1663,12 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { url := strings.TrimPrefix(server.URL, "https://") cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}} rc := &Controller{cfg: cfg, tls: tls, store: mockStore, checkTemplate: template} - err := rc.ClusterResource(ctx) + var sourceSize int64 + err = rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { + sourceSize += size + return nil + }) + err = rc.ClusterResource(ctx, sourceSize) c.Assert(err, IsNil) c.Assert(template.FailedCount(Critical), Equals, 
ca.expectErrorCount) diff --git a/pkg/lightning/restore/table_restore.go b/pkg/lightning/restore/table_restore.go index b3fe3f34c36a4..48791ac1d1348 100644 --- a/pkg/lightning/restore/table_restore.go +++ b/pkg/lightning/restore/table_restore.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tidb/table/tables" "go.uber.org/multierr" "go.uber.org/zap" + "github.com/pingcap/tidb/br/pkg/utils" ) type TableRestore struct { @@ -156,13 +157,22 @@ func (t *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDB // // The argument `columns` _must_ be in lower case. func (tr *TableRestore) initializeColumns(columns []string, ccp *checkpoints.ChunkCheckpoint) error { + colPerm, err := createColumnPermutation(columns, tr.ignoreColumns, tr.tableInfo.Core) + if err != nil { + return err + } + ccp.ColumnPermutation = colPerm + return nil +} + +func createColumnPermutation(columns []string, ignoreColumns []string, tableInfo *model.TableInfo) ([]int, error) { var colPerm []int if len(columns) == 0 { - colPerm = make([]int, 0, len(tr.tableInfo.Core.Columns)+1) - shouldIncludeRowID := common.TableHasAutoRowID(tr.tableInfo.Core) + colPerm = make([]int, 0, len(tableInfo.Columns)+1) + shouldIncludeRowID := common.TableHasAutoRowID(tableInfo) // no provided columns, so use identity permutation. - for i := range tr.tableInfo.Core.Columns { + for i := range tableInfo.Columns { colPerm = append(colPerm, i) } if shouldIncludeRowID { @@ -170,14 +180,12 @@ func (tr *TableRestore) initializeColumns(columns []string, ccp *checkpoints.Chu } } else { var err error - colPerm, err = parseColumnPermutations(tr.tableInfo.Core, columns, tr.ignoreColumns) + colPerm, err = parseColumnPermutations(tableInfo, columns, ignoreColumns) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } } - - ccp.ColumnPermutation = colPerm - return nil + return colPerm, nil } func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp *checkpoints.TableCheckpoint) error { @@ -281,26 +289,21 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp if engine.Status < checkpoints.CheckpointStatusImported { wg.Add(1) - // Note: We still need tableWorkers to control the concurrency of tables. - // In the future, we will investigate more about - // the difference between restoring tables concurrently and restoring tables one by one. + // If the number of chunks is small, it means that this engine may be finished in a few times. 
+ // We do not limit it in TableConcurrency restoreWorker := rc.tableWorkers.Apply() - go func(w *worker.Worker, eid int32, ecp *checkpoints.EngineCheckpoint) { defer wg.Done() - engineLogTask := tr.logger.With(zap.Int32("engineNumber", eid)).Begin(zap.InfoLevel, "restore engine") dataClosedEngine, err := tr.restoreEngine(ctx, rc, indexEngine, eid, ecp) engineLogTask.End(zap.ErrorLevel, err) rc.tableWorkers.Recycle(w) - if err != nil { - setError(err) - return + if err == nil { + dataWorker := rc.closedEngineLimit.Apply() + defer rc.closedEngineLimit.Recycle(dataWorker) + err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp) } - - dataWorker := rc.closedEngineLimit.Apply() - defer rc.closedEngineLimit.Recycle(dataWorker) - if err := tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp); err != nil { + if err != nil { setError(err) } }(restoreWorker, engineID, engine) @@ -390,6 +393,11 @@ func (tr *TableRestore) restoreEngine( TableInfo: tr.tableInfo, Local: &backend.LocalEngineConfig{}, } + if !tr.tableMeta.IsRowOrdered { + dataEngineCfg.Local.Compact = true + dataEngineCfg.Local.CompactConcurrency = 4 + dataEngineCfg.Local.CompactThreshold = compactionUpperThreshold + } dataEngine, err := rc.backend.OpenEngine(ctx, dataEngineCfg, tr.tableName, engineID) if err != nil { return nil, errors.Trace(err) @@ -884,3 +892,40 @@ func (tr *TableRestore) analyzeTable(ctx context.Context, g glue.SQLExecutor) er task.End(zap.ErrorLevel, err) return err } + +// estimate SST files compression threshold by total row file size +// with a higher compression threshold, the compression time increases, but the iteration time decreases. +// Try to limit the total SST files number under 500. But size compress 32GB SST files cost about 20min, +// we set the upper bound to 32GB to avoid too long compression time. +// factor is the non-clustered(1 for data engine and number of non-clustered index count for index engine). 
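+// For example (illustrative numbers only): 100 GiB of raw source files with factor 1 gives
+// 100 GiB / 512 = 200 MiB, which is then rounded up to the next power of two, 256 MiB,
+// assuming that clears the lower bound.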
+func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64) int64 { + totalRawFileSize := int64(0) + var lastFile string + for _, engineCp := range cp.Engines { + for _, chunk := range engineCp.Chunks { + if chunk.FileMeta.Path == lastFile { + continue + } + size := chunk.FileMeta.FileSize + if chunk.FileMeta.Type == mydump.SourceTypeParquet { + // parquet file is compressed, thus estimates with a factor of 2 + size *= 2 + } + totalRawFileSize += size + lastFile = chunk.FileMeta.Path + } + } + totalRawFileSize *= factor + + // try restrict the total file number within 512 + threshold := totalRawFileSize / 512 + threshold = utils.NextPowerOfTwo(threshold) + if threshold < compactionLowerThreshold { + // disable compaction if threshold is smaller than lower bound + threshold = 0 + } else if threshold > compactionUpperThreshold { + threshold = compactionUpperThreshold + } + + return threshold +} diff --git a/pkg/lightning/restore/tidb.go b/pkg/lightning/restore/tidb.go index 86c0d0d6d5855..000aaba9d3ffe 100644 --- a/pkg/lightning/restore/tidb.go +++ b/pkg/lightning/restore/tidb.go @@ -195,7 +195,7 @@ func createTableIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName s } var res strings.Builder - ctx := format.NewRestoreCtx(format.DefaultRestoreFlags, &res) + ctx := format.NewRestoreCtx(format.DefaultRestoreFlags|format.RestoreTiDBSpecialComment, &res) retStmts := make([]string, 0, len(stmts)) for _, stmt := range stmts { diff --git a/pkg/lightning/restore/tidb_test.go b/pkg/lightning/restore/tidb_test.go index 297839cab9370..2a2f746b8fc5e 100644 --- a/pkg/lightning/restore/tidb_test.go +++ b/pkg/lightning/restore/tidb_test.go @@ -101,6 +101,34 @@ func (s *tidbSuite) TestCreateTableIfNotExistsStmt(c *C) { []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) COMMENT 'CREATE TABLE');"}, ) + // test clustered index consistency + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) PRIMARY KEY CLUSTERED COMMENT 'CREATE TABLE');", "foo"), + DeepEquals, + []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) PRIMARY KEY /*T![clustered_index] CLUSTERED */ COMMENT 'CREATE TABLE');"}, + ) + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) COMMENT 'CREATE TABLE', PRIMARY KEY (`bar`) NONCLUSTERED);", "foo"), + DeepEquals, + []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) COMMENT 'CREATE TABLE',PRIMARY KEY(`bar`) /*T![clustered_index] NONCLUSTERED */);"}, + ) + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) PRIMARY KEY /*T![clustered_index] NONCLUSTERED */ COMMENT 'CREATE TABLE');", "foo"), + DeepEquals, + []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) PRIMARY KEY /*T![clustered_index] NONCLUSTERED */ COMMENT 'CREATE TABLE');"}, + ) + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) COMMENT 'CREATE TABLE', PRIMARY KEY (`bar`) /*T![clustered_index] CLUSTERED */);", "foo"), + DeepEquals, + []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) COMMENT 'CREATE TABLE',PRIMARY KEY(`bar`) /*T![clustered_index] CLUSTERED */);"}, + ) + + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) PRIMARY KEY AUTO_RANDOM(2) COMMENT 'CREATE TABLE');", "foo"), + DeepEquals, + []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */ COMMENT 'CREATE TABLE');"}, + ) + // upper case becomes shorter c.Assert( createTableIfNotExistsStmt("CREATE TABLE `ſ`(`ı` 
TINYINT(1));", "ſ"), diff --git a/pkg/logutil/context.go b/pkg/logutil/context.go new file mode 100644 index 0000000000000..05fe7f0696b53 --- /dev/null +++ b/pkg/logutil/context.go @@ -0,0 +1,51 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package logutil + +import ( + "context" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// We cannot directly set global logger as log.L(), +// or when the global logger updated, we cannot get the latest logger. +var globalLogger *zap.Logger = nil + +// ResetGlobalLogger resets the global logger. +// Contexts have already made by `ContextWithField` would keep untouched, +// subsequent wrapping over those contexts would keep using the old global logger, +// only brand new contexts (i.e. context without logger) would be wrapped with the new global logger. +// This method is mainly for testing. +func ResetGlobalLogger(l *zap.Logger) { + globalLogger = l +} + +type loggingContextKey struct{} + +var keyLogger loggingContextKey = loggingContextKey{} + +// ContextWithField wrap a context with a logger with some fields. +func ContextWithField(c context.Context, fields ...zap.Field) context.Context { + logger := LoggerFromContext(c).With(fields...) + return context.WithValue(c, keyLogger, logger) +} + +// LoggerFromContext returns the contextual logger via the context. +// If there isn't a logger in the context, returns the global logger. +func LoggerFromContext(c context.Context) *zap.Logger { + logger, ok := c.Value(keyLogger).(*zap.Logger) + if !ok { + if globalLogger != nil { + return globalLogger + } + return log.L() + } + return logger +} + +// CL is the shorthand for LoggerFromContext. +func CL(c context.Context) *zap.Logger { + return LoggerFromContext(c) +} diff --git a/pkg/logutil/logging_test.go b/pkg/logutil/logging_test.go index 9c8f46275a6ac..ed9d45682997d 100644 --- a/pkg/logutil/logging_test.go +++ b/pkg/logutil/logging_test.go @@ -3,6 +3,7 @@ package logutil_test import ( + "context" "fmt" "math" "strings" @@ -19,6 +20,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" ) func Test(t *testing.T) { @@ -207,3 +209,50 @@ func (s *testLoggingSuite) TestShortError(c *C) { assertTrimEqual(c, logutil.ShortError(err), `{"error": "test: [BR:Common:ErrInvalidArgument]invalid argument"}`) } + +type FieldEquals struct{} + +func (f FieldEquals) Info() *CheckerInfo { + return &CheckerInfo{ + Name: "FieldEquals", + Params: []string{ + "expected", + "actual", + }, + } +} + +func (f FieldEquals) Check(params []interface{}, names []string) (result bool, err string) { + expected := params[0].(zap.Field) + actual := params[1].(zap.Field) + + if !expected.Equals(actual) { + return false, "Field not match." 
+ } + return true, "" +} + +func (s *testLoggingSuite) TestContextual(c *C) { + testCore, logs := observer.New(zap.InfoLevel) + logutil.ResetGlobalLogger(zap.New(testCore)) + + ctx := context.Background() + l0 := logutil.LoggerFromContext(ctx) + l0.Info("going to take an adventure?", zap.Int("HP", 50), zap.Int("HP-MAX", 50), zap.String("character", "solte")) + lctx := logutil.ContextWithField(ctx, zap.Strings("firends", []string{"firo", "seren", "black"})) + l := logutil.LoggerFromContext(lctx) + l.Info("let's go!", zap.String("character", "solte")) + + observedLogs := logs.TakeAll() + checkLog(c, observedLogs[0], + "going to take an adventure?", zap.Int("HP", 50), zap.Int("HP-MAX", 50), zap.String("character", "solte")) + checkLog(c, observedLogs[1], + "let's go!", zap.Strings("firends", []string{"firo", "seren", "black"}), zap.String("character", "solte")) +} + +func checkLog(c *C, actual observer.LoggedEntry, message string, fields ...zap.Field) { + c.Assert(message, Equals, actual.Message) + for i, f := range fields { + c.Assert(f, FieldEquals{}, actual.Context[i]) + } +} diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 28d0de26a7c61..83f5eef98af5c 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/util/codec" pd "github.com/tikv/pd/client" + pdapi "github.com/tikv/pd/server/api" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -33,6 +34,7 @@ import ( const ( clusterVersionPrefix = "pd/api/v1/config/cluster-version" regionCountPrefix = "pd/api/v1/stats/region" + storePrefix = "pd/api/v1/store" schedulerPrefix = "pd/api/v1/schedulers" maxMsgSize = int(128 * units.MiB) // pd.ScanRegion may return a large response scheduleConfigPrefix = "pd/api/v1/config/schedule" @@ -335,6 +337,33 @@ func (p *PdController) getRegionCountWith( return 0, errors.Trace(err) } +// GetStoreInfo returns the info of store with the specified id. +func (p *PdController) GetStoreInfo(ctx context.Context, storeID uint64) (*pdapi.StoreInfo, error) { + return p.getStoreInfoWith(ctx, pdRequest, storeID) +} + +func (p *PdController) getStoreInfoWith( + ctx context.Context, get pdHTTPRequest, storeID uint64) (*pdapi.StoreInfo, error) { + var err error + for _, addr := range p.addrs { + query := fmt.Sprintf( + "%s/%d", + storePrefix, storeID) + v, e := get(ctx, addr, query, p.cli, http.MethodGet, nil) + if e != nil { + err = e + continue + } + store := pdapi.StoreInfo{} + err = json.Unmarshal(v, &store) + if err != nil { + return nil, errors.Trace(err) + } + return &store, nil + } + return nil, errors.Trace(err) +} + func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) { // pause this scheduler with 300 seconds body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout)}) diff --git a/pkg/pdutil/pd_test.go b/pkg/pdutil/pd_test.go index 76c2424e0dc3c..e4e82d412171c 100644 --- a/pkg/pdutil/pd_test.go +++ b/pkg/pdutil/pd_test.go @@ -18,6 +18,8 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/util/codec" + "github.com/tikv/pd/pkg/typeutil" + "github.com/tikv/pd/server/api" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/statistics" ) @@ -200,3 +202,33 @@ func (s *testPDControllerSuite) TestPDRequestRetry(c *C) { _, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil) c.Assert(reqErr, NotNil) } + +func (s *testPDControllerSuite) TestStoreInfo(c *C) { + storeInfo := api.StoreInfo{ + Status: &api.StoreStatus{ + Capacity: typeutil.ByteSize(1024), + Available: typeutil.ByteSize(1024), + }, + Store: &api.MetaStore{ + StateName: "Tombstone", + }, + } + mock := func( + _ context.Context, addr string, prefix string, _ *http.Client, _ string, _ io.Reader, + ) ([]byte, error) { + query := fmt.Sprintf("%s/%s", addr, prefix) + c.Assert(query, Equals, "http://mock/pd/api/v1/store/1") + ret, err := json.Marshal(storeInfo) + c.Assert(err, IsNil) + return ret, nil + } + + pdController := &PdController{addrs: []string{"http://mock"}} + ctx := context.Background() + resp, err := pdController.getStoreInfoWith(ctx, mock, 1) + c.Assert(err, IsNil) + c.Assert(resp, NotNil) + c.Assert(resp.Status, NotNil) + c.Assert(resp.Store.StateName, Equals, "Tombstone") + c.Assert(uint64(resp.Status.Available), Equals, uint64(1024)) +} diff --git a/tests/lightning_generated_columns/data/gencol.expr_index-schema.sql b/tests/lightning_generated_columns/data/gencol.expr_index-schema.sql new file mode 100644 index 0000000000000..e248b9da61a9a --- /dev/null +++ b/tests/lightning_generated_columns/data/gencol.expr_index-schema.sql @@ -0,0 +1,11 @@ +-- https://github.com/pingcap/br/issues/1404 +-- expression indices just use a hidden virtual generated column behind the scene. 
+ +CREATE TABLE `expr_index` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `a` varchar(20) DEFAULT NULL, + `b` varchar(20) DEFAULT NULL, + PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */, + KEY `idx_a` (`a`), + KEY `idx_lower_b` ((lower(`b`))) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=90003; diff --git a/tests/lightning_generated_columns/data/gencol.expr_index.0.sql b/tests/lightning_generated_columns/data/gencol.expr_index.0.sql new file mode 100644 index 0000000000000..996a0b5a4a74d --- /dev/null +++ b/tests/lightning_generated_columns/data/gencol.expr_index.0.sql @@ -0,0 +1 @@ +insert into expr_index (id, a, b) values (1, 'aaa', 'bbb'), (2, 'ABC', 'CDSFDS'); diff --git a/tests/lightning_generated_columns/data/gencol.nested-schema.sql b/tests/lightning_generated_columns/data/gencol.nested-schema.sql index 7d5fd1f615f95..180532ddc63c4 100644 --- a/tests/lightning_generated_columns/data/gencol.nested-schema.sql +++ b/tests/lightning_generated_columns/data/gencol.nested-schema.sql @@ -3,5 +3,6 @@ create table nested ( b int as (a + 1) virtual unique, c int as (b + 1) stored unique, d int as (c + 1) virtual unique, - e int as (d + 1) stored unique + e int as (d + 1) stored unique, + f int as (e + 1) virtual unique ); diff --git a/tests/lightning_generated_columns/data/gencol.virtual_only-schema.sql b/tests/lightning_generated_columns/data/gencol.virtual_only-schema.sql new file mode 100644 index 0000000000000..5735759c0745f --- /dev/null +++ b/tests/lightning_generated_columns/data/gencol.virtual_only-schema.sql @@ -0,0 +1,5 @@ +create table virtual_only ( + id int primary key, + id_plus_1 int as (id + 1) virtual, + id_plus_2 int as (id + 2) virtual +); diff --git a/tests/lightning_generated_columns/data/gencol.virtual_only.0.sql b/tests/lightning_generated_columns/data/gencol.virtual_only.0.sql new file mode 100644 index 0000000000000..8e951459b687c --- /dev/null +++ b/tests/lightning_generated_columns/data/gencol.virtual_only.0.sql @@ -0,0 +1 @@ +insert into virtual_only (id) values (30), (40); diff --git a/tests/lightning_generated_columns/run.sh b/tests/lightning_generated_columns/run.sh index 2e0ac3864488c..163e74fb6155e 100644 --- a/tests/lightning_generated_columns/run.sh +++ b/tests/lightning_generated_columns/run.sh @@ -43,12 +43,21 @@ for BACKEND in 'local' 'tidb' 'importer'; do run_lightning --backend $BACKEND + run_sql 'ADMIN CHECK TABLE gencol.nested' run_sql 'SELECT * FROM gencol.nested WHERE a = 100' check_contains 'a: 100' check_contains 'b: 101' check_contains 'c: 102' check_contains 'd: 103' check_contains 'e: 104' + check_contains 'f: 105' + run_sql 'SELECT * FROM gencol.nested WHERE f = 1005' + check_contains 'a: 1000' + check_contains 'b: 1001' + check_contains 'c: 1002' + check_contains 'd: 1003' + check_contains 'e: 1004' + check_contains 'f: 1005' run_sql 'SELECT * FROM gencol.various_types' --binary-as-hex check_contains 'int64: 3' @@ -68,4 +77,18 @@ for BACKEND in 'local' 'tidb' 'importer'; do # FIXME: test below disabled due to pingcap/tidb#21510 # check_contains 'week: 6' check_contains 'tz: 1969-12-31 16:00:01' + + run_sql 'ADMIN CHECK TABLE gencol.virtual_only' + run_sql 'SELECT * FROM gencol.virtual_only WHERE id = 30' + check_contains 'id_plus_1: 31' + check_contains 'id_plus_2: 32' + run_sql 'SELECT * FROM gencol.virtual_only WHERE id_plus_2 = 42' + check_contains 'id: 40' + check_contains 'id_plus_1: 41' + + run_sql 'ADMIN CHECK TABLE gencol.expr_index' + run_sql 'SELECT /*+ use_index(gencol.expr_index, idx_lower_b) */ * 
FROM gencol.expr_index WHERE lower(b) = "cdsfds"' + check_contains 'id: 2' + check_contains 'a: ABC' + check_contains 'b: CDSFDS' done