diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index ef7b74be9c707..865b2d2bef174 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -2468,8 +2468,1259 @@ func (local *local) MakeEmptyRows() kv.Rows { return kv.MakeRowsFromKvPairs(nil) } +<<<<<<< HEAD func (local *local) NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) { return kv.NewTableKVEncoder(tbl, options) +======= +func (c *BackendConfig) adjust() { + c.MaxOpenFiles = mathutil.Max(c.MaxOpenFiles, openFilesLowerThreshold) +} + +// Backend is a local backend. +type Backend struct { + engines sync.Map // sync version of map[uuid.UUID]*Engine + + pdCtl *pdutil.PdController + splitCli split.SplitClient + tikvCli *tikvclient.KVStore + tls *common.TLS + regionSizeGetter TableRegionSizeGetter + tikvCodec tikvclient.Codec + + BackendConfig + + supportMultiIngest bool + duplicateDB *pebble.DB + keyAdapter KeyAdapter + importClientFactory ImportClientFactory + + bufferPool *membuf.Pool + metrics *metric.Metrics + writeLimiter StoreWriteLimiter + logger log.Logger +} + +var _ DiskUsage = (*Backend)(nil) +var _ backend.Backend = (*Backend)(nil) + +func openDuplicateDB(storeDir string) (*pebble.DB, error) { + dbPath := filepath.Join(storeDir, duplicateDBName) + // TODO: Optimize the opts for better write. + opts := &pebble.Options{ + TablePropertyCollectors: []func() pebble.TablePropertyCollector{ + newRangePropertiesCollector, + }, + } + return pebble.Open(dbPath, opts) +} + +var ( + // RunInTest indicates whether the current process is running in test. + RunInTest bool + // LastAlloc is the last ID allocator. + LastAlloc manual.Allocator +) + +// NewBackend creates new connections to tikv. +func NewBackend( + ctx context.Context, + tls *common.TLS, + config BackendConfig, + regionSizeGetter TableRegionSizeGetter, +) (*Backend, error) { + config.adjust() + pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption()) + if err != nil { + return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err) + } + splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false) + + shouldCreate := true + if config.CheckpointEnabled { + if info, err := os.Stat(config.LocalStoreDir); err != nil { + if !os.IsNotExist(err) { + return nil, err + } + } else if info.IsDir() { + shouldCreate = false + } + } + + if shouldCreate { + err = os.Mkdir(config.LocalStoreDir, 0o700) + if err != nil { + return nil, common.ErrInvalidSortedKVDir.Wrap(err).GenWithStackByArgs(config.LocalStoreDir) + } + } + + var duplicateDB *pebble.DB + if config.DupeDetectEnabled { + duplicateDB, err = openDuplicateDB(config.LocalStoreDir) + if err != nil { + return nil, common.ErrOpenDuplicateDB.Wrap(err).GenWithStackByArgs() + } + } + + // The following copies tikv.NewTxnClient without creating yet another pdClient. 
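+	// A safe-point KV, a codec-aware PD client and an RPC client are wired up by hand below so the backend can reuse the PD client already held by pdCtl.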
+ spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig()) + if err != nil { + return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() + } + + var pdCliForTiKV *tikvclient.CodecPDClient + if config.KeyspaceName == "" { + pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient()) + } else { + pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), config.KeyspaceName) + if err != nil { + return nil, common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs() + } + } + + tikvCodec := pdCliForTiKV.GetCodec() + rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec)) + tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli) + if err != nil { + return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() + } + importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType) + keyAdapter := KeyAdapter(noopKeyAdapter{}) + if config.DupeDetectEnabled { + keyAdapter = dupDetectKeyAdapter{} + } + var writeLimiter StoreWriteLimiter + if config.StoreWriteBWLimit > 0 { + writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit) + } else { + writeLimiter = noopStoreWriteLimiter{} + } + alloc := manual.Allocator{} + if RunInTest { + alloc.RefCnt = new(atomic.Int64) + LastAlloc = alloc + } + local := &Backend{ + engines: sync.Map{}, + pdCtl: pdCtl, + splitCli: splitCli, + tikvCli: tikvCli, + tls: tls, + regionSizeGetter: regionSizeGetter, + tikvCodec: tikvCodec, + + BackendConfig: config, + + duplicateDB: duplicateDB, + keyAdapter: keyAdapter, + importClientFactory: importClientFactory, + bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)), + writeLimiter: writeLimiter, + logger: log.FromContext(ctx), + } + if m, ok := metric.FromContext(ctx); ok { + local.metrics = m + } + if err = local.checkMultiIngestSupport(ctx); err != nil { + return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs() + } + + return local, nil +} + +// TotalMemoryConsume returns the total memory usage of the local backend. 
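+// It is the sum of each open engine's in-memory size plus the shared buffer pool's total size.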
+func (local *Backend) TotalMemoryConsume() int64 { + var memConsume int64 + local.engines.Range(func(k, v interface{}) bool { + e := v.(*Engine) + if e != nil { + memConsume += e.TotalMemorySize() + } + return true + }) + return memConsume + local.bufferPool.TotalSize() +} + +func (local *Backend) checkMultiIngestSupport(ctx context.Context) error { + stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone()) + if err != nil { + return errors.Trace(err) + } + + hasTiFlash := false + for _, s := range stores { + if s.State == metapb.StoreState_Up && engine.IsTiFlash(s) { + hasTiFlash = true + break + } + } + + for _, s := range stores { + // skip stores that are not online + if s.State != metapb.StoreState_Up || engine.IsTiFlash(s) { + continue + } + var err error + for i := 0; i < maxRetryTimes; i++ { + if i > 0 { + select { + case <-time.After(100 * time.Millisecond): + case <-ctx.Done(): + return ctx.Err() + } + } + client, err1 := local.getImportClient(ctx, s.Id) + if err1 != nil { + err = err1 + log.FromContext(ctx).Warn("get import client failed", zap.Error(err), zap.String("store", s.Address)) + continue + } + _, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{}) + if err == nil { + break + } + if st, ok := status.FromError(err); ok { + if st.Code() == codes.Unimplemented { + log.FromContext(ctx).Info("multi ingest not support", zap.Any("unsupported store", s)) + local.supportMultiIngest = false + return nil + } + } + log.FromContext(ctx).Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address), + zap.Int("retry", i)) + } + if err != nil { + // if the cluster contains no TiFlash store, we don't need the multi-ingest feature, + // so in this condition, downgrade the logic instead of return an error. + if hasTiFlash { + return errors.Trace(err) + } + log.FromContext(ctx).Warn("check multi failed all retry, fallback to false", log.ShortError(err)) + local.supportMultiIngest = false + return nil + } + } + + local.supportMultiIngest = true + log.FromContext(ctx).Info("multi ingest support") + return nil +} + +// rlock read locks a local file and returns the Engine instance if it exists. +func (local *Backend) rLockEngine(engineID uuid.UUID) *Engine { + if e, ok := local.engines.Load(engineID); ok { + engine := e.(*Engine) + engine.rLock() + return engine + } + return nil +} + +// lock locks a local file and returns the Engine instance if it exists. +func (local *Backend) lockEngine(engineID uuid.UUID, state importMutexState) *Engine { + if e, ok := local.engines.Load(engineID); ok { + engine := e.(*Engine) + engine.lock(state) + return engine + } + return nil +} + +// tryRLockAllEngines tries to read lock all engines, return all `Engine`s that are successfully locked. +func (local *Backend) tryRLockAllEngines() []*Engine { + var allEngines []*Engine + local.engines.Range(func(k, v interface{}) bool { + engine := v.(*Engine) + // skip closed engine + if engine.tryRLock() { + if !engine.closed.Load() { + allEngines = append(allEngines, engine) + } else { + engine.rUnlock() + } + } + return true + }) + return allEngines +} + +// lockAllEnginesUnless tries to lock all engines, unless those which are already locked in the +// state given by ignoreStateMask. Returns the list of locked engines. 
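+// Engines whose current lock state matches ignoreStateMask are skipped and are not returned.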
+func (local *Backend) lockAllEnginesUnless(newState, ignoreStateMask importMutexState) []*Engine { + var allEngines []*Engine + local.engines.Range(func(k, v interface{}) bool { + engine := v.(*Engine) + if engine.lockUnless(newState, ignoreStateMask) { + allEngines = append(allEngines, engine) + } + return true + }) + return allEngines +} + +// Close the local backend. +func (local *Backend) Close() { + allEngines := local.lockAllEnginesUnless(importMutexStateClose, 0) + local.engines = sync.Map{} + + for _, engine := range allEngines { + _ = engine.Close() + engine.unlock() + } + + local.importClientFactory.Close() + local.bufferPool.Destroy() + + if local.duplicateDB != nil { + // Check if there are duplicates that are not collected. + iter := local.duplicateDB.NewIter(&pebble.IterOptions{}) + hasDuplicates := iter.First() + allIsWell := true + if err := iter.Error(); err != nil { + local.logger.Warn("iterate duplicate db failed", zap.Error(err)) + allIsWell = false + } + if err := iter.Close(); err != nil { + local.logger.Warn("close duplicate db iter failed", zap.Error(err)) + allIsWell = false + } + if err := local.duplicateDB.Close(); err != nil { + local.logger.Warn("close duplicate db failed", zap.Error(err)) + allIsWell = false + } + // If checkpoint is disabled, or we don't detect any duplicate, then this duplicate + // db dir will be useless, so we clean up this dir. + if allIsWell && (!local.CheckpointEnabled || !hasDuplicates) { + if err := os.RemoveAll(filepath.Join(local.LocalStoreDir, duplicateDBName)); err != nil { + local.logger.Warn("remove duplicate db file failed", zap.Error(err)) + } + } + local.duplicateDB = nil + } + + // if checkpoint is disable or we finish load all data successfully, then files in this + // dir will be useless, so we clean up this dir and all files in it. + if !local.CheckpointEnabled || common.IsEmptyDir(local.LocalStoreDir) { + err := os.RemoveAll(local.LocalStoreDir) + if err != nil { + local.logger.Warn("remove local db file failed", zap.Error(err)) + } + } + _ = local.tikvCli.Close() + local.pdCtl.Close() +} + +// FlushEngine ensure the written data is saved successfully, to make sure no data lose after restart +func (local *Backend) FlushEngine(ctx context.Context, engineID uuid.UUID) error { + engine := local.rLockEngine(engineID) + + // the engine cannot be deleted after while we've acquired the lock identified by UUID. + if engine == nil { + return errors.Errorf("engine '%s' not found", engineID) + } + defer engine.rUnlock() + if engine.closed.Load() { + return nil + } + return engine.flushEngineWithoutLock(ctx) +} + +// FlushAllEngines flush all engines. +func (local *Backend) FlushAllEngines(parentCtx context.Context) (err error) { + allEngines := local.tryRLockAllEngines() + defer func() { + for _, engine := range allEngines { + engine.rUnlock() + } + }() + + eg, ctx := errgroup.WithContext(parentCtx) + for _, engine := range allEngines { + e := engine + eg.Go(func() error { + return e.flushEngineWithoutLock(ctx) + }) + } + return eg.Wait() +} + +// RetryImportDelay returns the delay time before retrying to import a file. +func (*Backend) RetryImportDelay() time.Duration { + return defaultRetryBackoffTime +} + +// ShouldPostProcess returns true if the backend should post process the data. 
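+// For the local backend this is always true.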
+func (*Backend) ShouldPostProcess() bool { + return true +} + +func (local *Backend) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.DB, error) { + opt := &pebble.Options{ + MemTableSize: local.MemTableSize, + // the default threshold value may cause write stall. + MemTableStopWritesThreshold: 8, + MaxConcurrentCompactions: 16, + // set threshold to half of the max open files to avoid trigger compaction + L0CompactionThreshold: math.MaxInt32, + L0StopWritesThreshold: math.MaxInt32, + LBaseMaxBytes: 16 * units.TiB, + MaxOpenFiles: local.MaxOpenFiles, + DisableWAL: true, + ReadOnly: readOnly, + TablePropertyCollectors: []func() pebble.TablePropertyCollector{ + newRangePropertiesCollector, + }, + } + // set level target file size to avoid pebble auto triggering compaction that split ingest SST files into small SST. + opt.Levels = []pebble.LevelOptions{ + { + TargetFileSize: 16 * units.GiB, + }, + } + + dbPath := filepath.Join(local.LocalStoreDir, engineUUID.String()) + db, err := pebble.Open(dbPath, opt) + return db, errors.Trace(err) +} + +// OpenEngine must be called with holding mutex of Engine. +func (local *Backend) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { + db, err := local.openEngineDB(engineUUID, false) + if err != nil { + return err + } + + sstDir := engineSSTDir(local.LocalStoreDir, engineUUID) + if !cfg.KeepSortDir { + if err := os.RemoveAll(sstDir); err != nil { + return errors.Trace(err) + } + } + if !common.IsDirExists(sstDir) { + if err := os.Mkdir(sstDir, 0o750); err != nil { + return errors.Trace(err) + } + } + engineCtx, cancel := context.WithCancel(ctx) + + e, _ := local.engines.LoadOrStore(engineUUID, &Engine{ + UUID: engineUUID, + sstDir: sstDir, + sstMetasChan: make(chan metaOrFlush, 64), + ctx: engineCtx, + cancel: cancel, + config: cfg.Local, + tableInfo: cfg.TableInfo, + duplicateDetection: local.DupeDetectEnabled, + dupDetectOpt: local.DuplicateDetectOpt, + duplicateDB: local.duplicateDB, + keyAdapter: local.keyAdapter, + logger: log.FromContext(ctx), + }) + engine := e.(*Engine) + engine.lock(importMutexStateOpen) + defer engine.unlock() + engine.db.Store(db) + engine.sstIngester = dbSSTIngester{e: engine} + if err = engine.loadEngineMeta(); err != nil { + return errors.Trace(err) + } + if err = local.allocateTSIfNotExists(ctx, engine); err != nil { + return errors.Trace(err) + } + engine.wg.Add(1) + go engine.ingestSSTLoop() + return nil +} + +func (local *Backend) allocateTSIfNotExists(ctx context.Context, engine *Engine) error { + if engine.TS > 0 { + return nil + } + physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx) + if err != nil { + return err + } + ts := oracle.ComposeTS(physical, logical) + engine.TS = ts + return engine.saveEngineMeta() +} + +// CloseEngine closes backend engine by uuid. +func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error { + // flush mem table to storage, to free memory, + // ask others' advise, looks like unnecessary, but with this we can control memory precisely. 
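+	// Flushing here persists any buffered KV pairs before the engine is sealed, so their memory can be released early.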
+ engineI, ok := local.engines.Load(engineUUID) + if !ok { + // recovery mode, we should reopen this engine file + db, err := local.openEngineDB(engineUUID, true) + if err != nil { + return err + } + engine := &Engine{ + UUID: engineUUID, + sstMetasChan: make(chan metaOrFlush), + tableInfo: cfg.TableInfo, + keyAdapter: local.keyAdapter, + duplicateDetection: local.DupeDetectEnabled, + dupDetectOpt: local.DuplicateDetectOpt, + duplicateDB: local.duplicateDB, + logger: log.FromContext(ctx), + } + engine.db.Store(db) + engine.sstIngester = dbSSTIngester{e: engine} + if err = engine.loadEngineMeta(); err != nil { + return err + } + local.engines.Store(engineUUID, engine) + return nil + } + + engine := engineI.(*Engine) + engine.rLock() + if engine.closed.Load() { + engine.rUnlock() + return nil + } + + err := engine.flushEngineWithoutLock(ctx) + engine.rUnlock() + + // use mutex to make sure we won't close sstMetasChan while other routines + // trying to do flush. + engine.lock(importMutexStateClose) + engine.closed.Store(true) + close(engine.sstMetasChan) + engine.unlock() + if err != nil { + return errors.Trace(err) + } + engine.wg.Wait() + return engine.ingestErr.Get() +} + +func (local *Backend) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) { + return local.importClientFactory.Create(ctx, storeID) +} + +func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range { + ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit)) + curSize := uint64(0) + curKeys := uint64(0) + curKey := fullRange.start + + sizeProps.iter(func(p *rangeProperty) bool { + if bytes.Compare(p.Key, curKey) <= 0 { + return true + } + if bytes.Compare(p.Key, fullRange.end) > 0 { + return false + } + curSize += p.Size + curKeys += p.Keys + if int64(curSize) >= sizeLimit || int64(curKeys) >= keysLimit { + ranges = append(ranges, Range{start: curKey, end: p.Key}) + curKey = p.Key + curSize = 0 + curKeys = 0 + } + return true + }) + + if bytes.Compare(curKey, fullRange.end) < 0 { + // If the remaining range is too small, append it to last range. 
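+		// "Too small" here means no keys were accumulated since the last cut (curKeys == 0),
+		// so the leftover span is merged into the previous range instead of becoming a tiny trailing range.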
+ if len(ranges) > 0 && curKeys == 0 { + ranges[len(ranges)-1].end = fullRange.end + } else { + ranges = append(ranges, Range{start: curKey, end: fullRange.end}) + } + } + return ranges +} + +func (local *Backend) readAndSplitIntoRange( + ctx context.Context, + engine *Engine, + sizeLimit int64, + keysLimit int64, +) ([]Range, error) { + firstKey, lastKey, err := engine.GetFirstAndLastKey(nil, nil) + if err != nil { + return nil, err + } + if firstKey == nil { + return nil, errors.New("could not find first pair") + } + + endKey := nextKey(lastKey) + + engineFileTotalSize := engine.TotalSize.Load() + engineFileLength := engine.Length.Load() + + if engineFileTotalSize <= sizeLimit && engineFileLength <= keysLimit { + ranges := []Range{{start: firstKey, end: endKey}} + return ranges, nil + } + + logger := log.FromContext(ctx).With(zap.Stringer("engine", engine.UUID)) + sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter) + if err != nil { + return nil, errors.Trace(err) + } + + ranges := splitRangeBySizeProps(Range{start: firstKey, end: endKey}, sizeProps, + sizeLimit, keysLimit) + + logger.Info("split engine key ranges", + zap.Int64("totalSize", engineFileTotalSize), zap.Int64("totalCount", engineFileLength), + logutil.Key("firstKey", firstKey), logutil.Key("lastKey", lastKey), + zap.Int("ranges", len(ranges))) + + return ranges, nil +} + +// prepareAndSendJob will read the engine to get estimated key range, +// then split and scatter regions for these range and send region jobs to jobToWorkerCh. +// NOTE when ctx is Done, this function will NOT return error even if it hasn't sent +// all the jobs to jobToWorkerCh. This is because the "first error" can only be +// found by checking the work group LATER, we don't want to return an error to +// seize the "first" error. +func (local *Backend) prepareAndSendJob( + ctx context.Context, + engine *Engine, + initialSplitRanges []Range, + regionSplitSize, regionSplitKeys int64, + jobToWorkerCh chan<- *regionJob, + jobWg *sync.WaitGroup, +) error { + lfTotalSize := engine.TotalSize.Load() + lfLength := engine.Length.Load() + log.FromContext(ctx).Info("import engine ranges", zap.Int("count", len(initialSplitRanges))) + if len(initialSplitRanges) == 0 { + return nil + } + + // if all the kv can fit in one region, skip split regions. TiDB will split one region for + // the table when table is created. + needSplit := len(initialSplitRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys + var err error + // split region by given ranges + failpoint.Inject("failToSplit", func(_ failpoint.Value) { + needSplit = true + }) + logger := log.FromContext(ctx).With(zap.Stringer("uuid", engine.UUID)).Begin(zap.InfoLevel, "split and scatter ranges") + for i := 0; i < maxRetryTimes; i++ { + failpoint.Inject("skipSplitAndScatter", func() { + failpoint.Break() + }) + + err = local.SplitAndScatterRegionInBatches(ctx, initialSplitRanges, needSplit, maxBatchSplitRanges) + if err == nil || common.IsContextCanceledError(err) { + break + } + + log.FromContext(ctx).Warn("split and scatter failed in retry", zap.Stringer("uuid", engine.UUID), + log.ShortError(err), zap.Int("retry", i)) + } + logger.End(zap.ErrorLevel, err) + if err != nil { + return err + } + + return local.generateAndSendJob( + ctx, + engine, + initialSplitRanges, + regionSplitSize, + regionSplitKeys, + jobToWorkerCh, + jobWg, + ) +} + +// generateAndSendJob scans the region in ranges and send region jobs to jobToWorkerCh. 
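+// Ranges are processed concurrently, bounded by WorkerConcurrency, and jobWg is incremented once for every job that is actually delivered to jobToWorkerCh.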
+func (local *Backend) generateAndSendJob( + ctx context.Context, + engine *Engine, + jobRanges []Range, + regionSplitSize, regionSplitKeys int64, + jobToWorkerCh chan<- *regionJob, + jobWg *sync.WaitGroup, +) error { + logger := log.FromContext(ctx) + + // when use dynamic region feature, the region may be very big, we need + // to split to smaller ranges to increase the concurrency. + if regionSplitSize > 2*int64(config.SplitRegionSize) { + sizeProps, err := getSizePropertiesFn(logger, engine.getDB(), local.keyAdapter) + if err != nil { + return errors.Trace(err) + } + + jobRanges = splitRangeBySizeProps( + Range{start: jobRanges[0].start, end: jobRanges[len(jobRanges)-1].end}, + sizeProps, + int64(config.SplitRegionSize), + int64(config.SplitRegionKeys)) + } + logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges))) + + eg, egCtx := errgroup.WithContext(ctx) + eg.SetLimit(local.WorkerConcurrency) + for _, jobRange := range jobRanges { + r := jobRange + eg.Go(func() error { + if egCtx.Err() != nil { + return nil + } + + failpoint.Inject("beforeGenerateJob", nil) + jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys) + if err != nil { + if common.IsContextCanceledError(err) { + return nil + } + return err + } + for _, job := range jobs { + jobWg.Add(1) + select { + case <-egCtx.Done(): + // this job is not put into jobToWorkerCh + jobWg.Done() + // if the context is canceled, it means worker has error, the first error can be + // found by worker's error group LATER. if this function returns an error it will + // seize the "first error". + return nil + case jobToWorkerCh <- job: + } + } + return nil + }) + } + return eg.Wait() +} + +// fakeRegionJobs is used in test, the injected job can be found by (startKey, endKey). +var fakeRegionJobs map[[2]string]struct { + jobs []*regionJob + err error +} + +// generateJobForRange will scan the region in `keyRange` and generate region jobs. +// It will retry internally when scan region meet error. +func (local *Backend) generateJobForRange( + ctx context.Context, + engine ingestData, + keyRange Range, + regionSplitSize, regionSplitKeys int64, +) ([]*regionJob, error) { + failpoint.Inject("fakeRegionJobs", func() { + if ctx.Err() != nil { + failpoint.Return(nil, ctx.Err()) + } + key := [2]string{string(keyRange.start), string(keyRange.end)} + injected := fakeRegionJobs[key] + // overwrite the stage to regionScanned, because some time same keyRange + // will be generated more than once. 
+ for _, job := range injected.jobs { + job.stage = regionScanned + } + failpoint.Return(injected.jobs, injected.err) + }) + + start, end := keyRange.start, keyRange.end + pairStart, pairEnd, err := engine.GetFirstAndLastKey(start, end) + if err != nil { + return nil, err + } + if pairStart == nil { + log.FromContext(ctx).Info("There is no pairs in range", + logutil.Key("start", start), + logutil.Key("end", end)) + return nil, nil + } + + startKey := codec.EncodeBytes([]byte{}, pairStart) + endKey := codec.EncodeBytes([]byte{}, nextKey(pairEnd)) + regions, err := split.PaginateScanRegion(ctx, local.splitCli, startKey, endKey, scanRegionLimit) + if err != nil { + log.FromContext(ctx).Error("scan region failed", + log.ShortError(err), zap.Int("region_len", len(regions)), + logutil.Key("startKey", startKey), + logutil.Key("endKey", endKey)) + return nil, err + } + + jobs := make([]*regionJob, 0, len(regions)) + for _, region := range regions { + log.FromContext(ctx).Debug("get region", + zap.Binary("startKey", startKey), + zap.Binary("endKey", endKey), + zap.Uint64("id", region.Region.GetId()), + zap.Stringer("epoch", region.Region.GetRegionEpoch()), + zap.Binary("start", region.Region.GetStartKey()), + zap.Binary("end", region.Region.GetEndKey()), + zap.Reflect("peers", region.Region.GetPeers())) + + jobs = append(jobs, ®ionJob{ + keyRange: intersectRange(region.Region, Range{start: start, end: end}), + region: region, + stage: regionScanned, + ingestData: engine, + regionSplitSize: regionSplitSize, + regionSplitKeys: regionSplitKeys, + metrics: local.metrics, + }) + } + return jobs, nil +} + +// startWorker creates a worker that reads from the job channel and processes. +// startWorker will return nil if it's expected to stop, where the only case is +// the context canceled. It will return not nil error when it actively stops. +// startWorker must Done the jobWg if it does not put the job into jobOutCh. +func (local *Backend) startWorker( + ctx context.Context, + jobInCh, jobOutCh chan *regionJob, + jobWg *sync.WaitGroup, +) error { + for { + select { + case <-ctx.Done(): + return nil + case job, ok := <-jobInCh: + if !ok { + // In fact we don't use close input channel to notify worker to + // exit, because there's a cycle in workflow. + return nil + } + + err := local.executeJob(ctx, job) + switch job.stage { + case regionScanned, wrote, ingested: + jobOutCh <- job + case needRescan: + jobs, err2 := local.generateJobForRange( + ctx, + job.ingestData, + job.keyRange, + job.regionSplitSize, + job.regionSplitKeys, + ) + if err2 != nil { + // Don't need to put the job back to retry, because generateJobForRange + // has done the retry internally. Here just done for the "needRescan" + // job and exit directly. + jobWg.Done() + return err2 + } + // 1 "needRescan" job becomes len(jobs) "regionScanned" jobs. + jobWg.Add(len(jobs) - 1) + for _, j := range jobs { + j.lastRetryableErr = job.lastRetryableErr + jobOutCh <- j + } + } + + if err != nil { + return err + } + } + } +} + +func (*Backend) isRetryableImportTiKVError(err error) bool { + err = errors.Cause(err) + // io.EOF is not retryable in normal case + // but on TiKV restart, if we're writing to TiKV(through GRPC) + // it might return io.EOF(it's GRPC Unavailable in most case), + // we need to retry on this error. 
+ // see SendMsg in https://pkg.go.dev/google.golang.org/grpc#ClientStream + if err == io.EOF { + return true + } + return common.IsRetryableError(err) +} + +// executeJob handles a regionJob and tries to convert it to ingested stage. +// If non-retryable error occurs, it will return the error. +// If retryable error occurs, it will return nil and caller should check the stage +// of the regionJob to determine what to do with it. +func (local *Backend) executeJob( + ctx context.Context, + job *regionJob, +) error { + failpoint.Inject("WriteToTiKVNotEnoughDiskSpace", func(_ failpoint.Value) { + failpoint.Return( + errors.New("the remaining storage capacity of TiKV is less than 10%%; please increase the storage capacity of TiKV and try again")) + }) + if local.ShouldCheckTiKV { + for _, peer := range job.region.Region.GetPeers() { + var ( + store *pdtypes.StoreInfo + err error + ) + for i := 0; i < maxRetryTimes; i++ { + store, err = local.pdCtl.GetStoreInfo(ctx, peer.StoreId) + if err != nil { + continue + } + if store.Status.Capacity > 0 { + // The available disk percent of TiKV + ratio := store.Status.Available * 100 / store.Status.Capacity + if ratio < 10 { + return errors.Errorf("the remaining storage capacity of TiKV(%s) is less than 10%%; please increase the storage capacity of TiKV and try again", store.Store.Address) + } + } + break + } + if err != nil { + log.FromContext(ctx).Error("failed to get StoreInfo from pd http api", zap.Error(err)) + } + } + } + + for { + err := local.writeToTiKV(ctx, job) + if err != nil { + if !local.isRetryableImportTiKVError(err) { + return err + } + // if it's retryable error, we retry from scanning region + log.FromContext(ctx).Warn("meet retryable error when writing to TiKV", + log.ShortError(err), zap.Stringer("job stage", job.stage)) + job.convertStageTo(needRescan) + job.lastRetryableErr = err + return nil + } + + err = local.ingest(ctx, job) + if err != nil { + if !local.isRetryableImportTiKVError(err) { + return err + } + log.FromContext(ctx).Warn("meet retryable error when ingesting", + log.ShortError(err), zap.Stringer("job stage", job.stage)) + job.lastRetryableErr = err + return nil + } + // if the job.stage successfully converted into "ingested", it means + // these data are ingested into TiKV so we handle remaining data. + // For other job.stage, the job should be sent back to caller to retry + // later. + if job.stage != ingested { + return nil + } + + if job.writeResult == nil || job.writeResult.remainingStartKey == nil { + return nil + } + job.keyRange.start = job.writeResult.remainingStartKey + job.convertStageTo(regionScanned) + } +} + +// ImportEngine imports an engine to TiKV. +func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { + lf := local.lockEngine(engineUUID, importMutexStateImport) + if lf == nil { + // skip if engine not exist. See the comment of `CloseEngine` for more detail. 
+ return nil + } + defer lf.unlock() + + lfTotalSize := lf.TotalSize.Load() + lfLength := lf.Length.Load() + if lfTotalSize == 0 { + // engine is empty, this is likes because it's a index engine but the table contains no index + log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID)) + return nil + } + kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls) + if err == nil { + if kvRegionSplitSize > regionSplitSize { + regionSplitSize = kvRegionSplitSize + } + if kvRegionSplitKeys > regionSplitKeys { + regionSplitKeys = kvRegionSplitKeys + } + } else { + log.FromContext(ctx).Warn("fail to get region split keys and size", zap.Error(err)) + } + + // split sorted file into range about regionSplitSize per file + regionRanges, err := local.readAndSplitIntoRange(ctx, lf, regionSplitSize, regionSplitKeys) + if err != nil { + return err + } + + if len(regionRanges) > 0 && local.PausePDSchedulerScope == config.PausePDSchedulerScopeTable { + log.FromContext(ctx).Info("pause pd scheduler of table scope") + subCtx, cancel := context.WithCancel(ctx) + defer cancel() + + var startKey, endKey []byte + if len(regionRanges[0].start) > 0 { + startKey = codec.EncodeBytes(nil, regionRanges[0].start) + } + if len(regionRanges[len(regionRanges)-1].end) > 0 { + endKey = codec.EncodeBytes(nil, regionRanges[len(regionRanges)-1].end) + } + done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey) + if err != nil { + return errors.Trace(err) + } + defer func() { + cancel() + <-done + }() + } + + if len(regionRanges) > 0 && local.BackendConfig.RaftKV2SwitchModeDuration > 0 { + log.FromContext(ctx).Info("switch import mode of ranges", + zap.String("startKey", hex.EncodeToString(regionRanges[0].start)), + zap.String("endKey", hex.EncodeToString(regionRanges[len(regionRanges)-1].end))) + subCtx, cancel := context.WithCancel(ctx) + defer cancel() + + done, err := local.SwitchModeByKeyRanges(subCtx, regionRanges) + if err != nil { + return errors.Trace(err) + } + defer func() { + cancel() + <-done + }() + } + + log.FromContext(ctx).Info("start import engine", zap.Stringer("uuid", engineUUID), + zap.Int("region ranges", len(regionRanges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize)) + + failpoint.Inject("ReadyForImportEngine", func() {}) + + err = local.doImport(ctx, lf, regionRanges, regionSplitSize, regionSplitKeys) + if err == nil { + log.FromContext(ctx).Info("import engine success", zap.Stringer("uuid", engineUUID), + zap.Int64("size", lfTotalSize), zap.Int64("kvs", lfLength), + zap.Int64("importedSize", lf.importedKVSize.Load()), zap.Int64("importedCount", lf.importedKVCount.Load())) + } + return err +} + +func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges []Range, regionSplitSize, regionSplitKeys int64) error { + /* + [prepareAndSendJob]-----jobToWorkerCh--->[workers] + ^ | + | jobFromWorkerCh + | | + | v + [regionJobRetryer]<--[dispatchJobGoroutine]-->done + */ + + var ( + ctx2, workerCancel = context.WithCancel(ctx) + // workerCtx.Done() means workflow is canceled by error. It may be caused + // by calling workerCancel() or workers in workGroup meets error. + workGroup, workerCtx = errgroup.WithContext(ctx2) + firstErr common.OnceError + // jobToWorkerCh and jobFromWorkerCh are unbuffered so jobs will not be + // owned by them. 
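+		// A send on them only completes when a receiver is ready, so ownership of a job passes directly from sender to receiver.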
+ jobToWorkerCh = make(chan *regionJob) + jobFromWorkerCh = make(chan *regionJob) + // jobWg tracks the number of jobs in this workflow. + // prepareAndSendJob, workers and regionJobRetryer can own jobs. + // When cancel on error, the goroutine of above three components have + // responsibility to Done jobWg of their owning jobs. + jobWg sync.WaitGroup + dispatchJobGoroutine = make(chan struct{}) + ) + defer workerCancel() + + retryer := startRegionJobRetryer(workerCtx, jobToWorkerCh, &jobWg) + + // dispatchJobGoroutine handles processed job from worker, it will only exit + // when jobFromWorkerCh is closed to avoid worker is blocked on sending to + // jobFromWorkerCh. + defer func() { + // use defer to close jobFromWorkerCh after all workers are exited + close(jobFromWorkerCh) + <-dispatchJobGoroutine + }() + go func() { + defer close(dispatchJobGoroutine) + for { + job, ok := <-jobFromWorkerCh + if !ok { + return + } + switch job.stage { + case regionScanned, wrote: + job.retryCount++ + if job.retryCount > maxWriteAndIngestRetryTimes { + firstErr.Set(job.lastRetryableErr) + workerCancel() + jobWg.Done() + continue + } + // max retry backoff time: 2+4+8+16+30*26=810s + sleepSecond := math.Pow(2, float64(job.retryCount)) + if sleepSecond > float64(maxRetryBackoffSecond) { + sleepSecond = float64(maxRetryBackoffSecond) + } + job.waitUntil = time.Now().Add(time.Second * time.Duration(sleepSecond)) + log.FromContext(ctx).Info("put job back to jobCh to retry later", + logutil.Key("startKey", job.keyRange.start), + logutil.Key("endKey", job.keyRange.end), + zap.Stringer("stage", job.stage), + zap.Int("retryCount", job.retryCount), + zap.Time("waitUntil", job.waitUntil)) + if !retryer.push(job) { + // retryer is closed by worker error + jobWg.Done() + } + case ingested: + jobWg.Done() + case needRescan: + panic("should not reach here") + } + } + }() + + for i := 0; i < local.WorkerConcurrency; i++ { + workGroup.Go(func() error { + return local.startWorker(workerCtx, jobToWorkerCh, jobFromWorkerCh, &jobWg) + }) + } + + err := local.prepareAndSendJob( + workerCtx, + engine, + regionRanges, + regionSplitSize, + regionSplitKeys, + jobToWorkerCh, + &jobWg, + ) + if err != nil { + firstErr.Set(err) + workerCancel() + _ = workGroup.Wait() + return firstErr.Get() + } + + jobWg.Wait() + workerCancel() + firstErr.Set(workGroup.Wait()) + firstErr.Set(ctx.Err()) + return firstErr.Get() +} + +// GetImportedKVCount returns the number of imported KV pairs of some engine. +func (local *Backend) GetImportedKVCount(engineUUID uuid.UUID) int64 { + v, ok := local.engines.Load(engineUUID) + if !ok { + // we get it after import, but before clean up, so this should not happen + // todo: return error + return 0 + } + e := v.(*Engine) + return e.importedKVCount.Load() +} + +// ResetEngine reset the engine and reclaim the space. 
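+// It closes and deletes the engine's local pebble DB, then reopens an empty one under the same UUID.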
+func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error { + // the only way to reset the engine + reclaim the space is to delete and reopen it 🤷 + localEngine := local.lockEngine(engineUUID, importMutexStateClose) + if localEngine == nil { + log.FromContext(ctx).Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID)) + return nil + } + defer localEngine.unlock() + if err := localEngine.Close(); err != nil { + return err + } + if err := localEngine.Cleanup(local.LocalStoreDir); err != nil { + return err + } + db, err := local.openEngineDB(engineUUID, false) + if err == nil { + localEngine.db.Store(db) + localEngine.engineMeta = engineMeta{} + if !common.IsDirExists(localEngine.sstDir) { + if err := os.Mkdir(localEngine.sstDir, 0o750); err != nil { + return errors.Trace(err) + } + } + if err = local.allocateTSIfNotExists(ctx, localEngine); err != nil { + return errors.Trace(err) + } + } + localEngine.pendingFileSize.Store(0) + + return err +} + +// CleanupEngine cleanup the engine and reclaim the space. +func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error { + localEngine := local.lockEngine(engineUUID, importMutexStateClose) + // release this engine after import success + if localEngine == nil { + log.FromContext(ctx).Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID)) + return nil + } + defer localEngine.unlock() + + // since closing the engine causes all subsequent operations on it panic, + // we make sure to delete it from the engine map before calling Close(). + // (note that Close() returning error does _not_ mean the pebble DB + // remains open/usable.) + local.engines.Delete(engineUUID) + err := localEngine.Close() + if err != nil { + return err + } + err = localEngine.Cleanup(local.LocalStoreDir) + if err != nil { + return err + } + localEngine.TotalSize.Store(0) + localEngine.Length.Store(0) + return nil +} + +// GetDupeController returns a new dupe controller. +func (local *Backend) GetDupeController(dupeConcurrency int, errorMgr *errormanager.ErrorManager) *DupeController { + return &DupeController{ + splitCli: local.splitCli, + tikvCli: local.tikvCli, + tikvCodec: local.tikvCodec, + errorMgr: errorMgr, + dupeConcurrency: dupeConcurrency, + duplicateDB: local.duplicateDB, + keyAdapter: local.keyAdapter, + importClientFactory: local.importClientFactory, + resourceGroupName: local.ResourceGroupName, + taskType: local.TaskType, + } +} + +// UnsafeImportAndReset forces the backend to import the content of an engine +// into the target and then reset the engine to empty. This method will not +// close the engine. Make sure the engine is flushed manually before calling +// this method. +func (local *Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { + // DO NOT call be.abstract.CloseEngine()! The engine should still be writable after + // calling UnsafeImportAndReset(). 
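+	// A transient ClosedEngine handle is built just for the Import call below, and the local engine is reset afterwards.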
+ logger := log.FromContext(ctx).With( + zap.String("engineTag", ""), + zap.Stringer("engineUUID", engineUUID), + ) + closedEngine := backend.NewClosedEngine(local, logger, engineUUID, 0) + if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil { + return err + } + return local.ResetEngine(ctx, engineUUID) +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) } func engineSSTDir(storeDir string, engineUUID uuid.UUID) string { diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index 92c8824c7dc91..61243c36175ec 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -63,9 +63,7 @@ var ( func (local *local) SplitAndScatterRegionInBatches( ctx context.Context, ranges []Range, - tableInfo *checkpoints.TidbTableInfo, needSplit bool, - regionSplitSize int64, batchCnt int, ) error { for i := 0; i < len(ranges); i += batchCnt { @@ -73,7 +71,7 @@ func (local *local) SplitAndScatterRegionInBatches( if len(batch) > batchCnt { batch = batch[:batchCnt] } - if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil { + if err := local.SplitAndScatterRegionByRanges(ctx, batch, needSplit); err != nil { return errors.Trace(err) } } @@ -87,10 +85,13 @@ func (local *local) SplitAndScatterRegionInBatches( func (local *local) SplitAndScatterRegionByRanges( ctx context.Context, ranges []Range, - tableInfo *checkpoints.TidbTableInfo, needSplit bool, +<<<<<<< HEAD regionSplitSize int64, ) error { +======= +) (err error) { +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) if len(ranges) == 0 { return nil } @@ -106,9 +107,14 @@ func (local *local) SplitAndScatterRegionByRanges( scatterRegions := make([]*split.RegionInfo, 0) var retryKeys [][]byte waitTime := splitRegionBaseBackOffTime +<<<<<<< HEAD skippedKeys := 0 for i := 0; i < SplitRetryTimes; i++ { log.L().Info("split and scatter region", +======= + for i := 0; i < splitRetryTimes; i++ { + log.FromContext(ctx).Info("split and scatter region", +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) logutil.Key("minKey", minKey), logutil.Key("maxKey", maxKey), zap.Int("retry", i), @@ -168,6 +174,7 @@ func (local *local) SplitAndScatterRegionByRanges( return nil } +<<<<<<< HEAD var tableRegionStats map[uint64]int64 if tableInfo != nil { tableRegionStats, err = fetchTableRegionSizeStats(ctx, db, tableInfo.ID) @@ -178,6 +185,8 @@ func (local *local) SplitAndScatterRegionByRanges( } } +======= +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) regionMap := make(map[uint64]*split.RegionInfo) for _, region := range regions { regionMap[region.Region.GetId()] = region @@ -287,6 +296,7 @@ func (local *local) SplitAndScatterRegionByRanges( } sendLoop: for regionID, keys := range splitKeyMap { +<<<<<<< HEAD // if region not in tableRegionStats, that means this region is newly split, so // we can skip split it again. 
regionSize, ok := tableRegionStats[regionID] @@ -296,6 +306,8 @@ func (local *local) SplitAndScatterRegionByRanges( if len(keys) == 1 && regionSize < regionSplitSize { skippedKeys++ } +======= +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) select { case ch <- &splitInfo{region: regionMap[regionID], keys: keys}: case <-ctx.Done(): @@ -338,12 +350,19 @@ func (local *local) SplitAndScatterRegionByRanges( scatterCount++ } if scatterCount == len(scatterRegions) { +<<<<<<< HEAD log.L().Info("waiting for scattering regions done", zap.Int("skipped_keys", skippedKeys), zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) } else { log.L().Info("waiting for scattering regions timeout", zap.Int("skipped_keys", skippedKeys), +======= + log.FromContext(ctx).Info("waiting for scattering regions done", + zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) + } else { + log.FromContext(ctx).Info("waiting for scattering regions timeout", +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) zap.Int("scatterCount", scatterCount), zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index d2bca60924d0f..9f05a721160e9 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -426,11 +426,18 @@ func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, h start = end } +<<<<<<< HEAD err = local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000) if len(errPat) == 0 { c.Assert(err, IsNil) } else { c.Assert(err, ErrorMatches, errPat) +======= + err = local.SplitAndScatterRegionByRanges(ctx, ranges, true) + if len(errPat) != 0 { + require.Error(t, err) + require.Regexp(t, errPat, err.Error()) +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) return } @@ -449,8 +456,98 @@ func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, h checkRegionRanges(c, regions, result) } +<<<<<<< HEAD func (s *localSuite) TestBatchSplitRegionByRanges(c *C) { s.doTestBatchSplitRegionByRanges(context.Background(), c, nil, "", nil) +======= +func TestBatchSplitRegionByRanges(t *testing.T) { + doTestBatchSplitRegionByRanges(context.Background(), t, nil, "", nil) +} + +type checkScatterClient struct { + *testSplitClient + + mu sync.Mutex + notFoundFirstTime map[uint64]struct{} + scatterCounter atomic.Int32 +} + +func newCheckScatterClient(inner *testSplitClient) *checkScatterClient { + return &checkScatterClient{ + testSplitClient: inner, + notFoundFirstTime: map[uint64]struct{}{}, + scatterCounter: atomic.Int32{}, + } +} + +func (c *checkScatterClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error { + c.scatterCounter.Add(1) + return nil +} + +func (c *checkScatterClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) { + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.notFoundFirstTime[regionID]; !ok { + c.notFoundFirstTime[regionID] = struct{}{} + return nil, nil + } + return c.testSplitClient.GetRegionByID(ctx, regionID) +} + +func TestMissingScatter(t *testing.T) { + ctx := context.Background() + splitHook := defaultHook{} + deferFunc := splitHook.setup(t) + defer deferFunc() + + keys := [][]byte{[]byte(""), []byte("aay"), 
[]byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} + client := initTestSplitClient(keys, nil) + checkClient := newCheckScatterClient(client) + local := &Backend{ + splitCli: checkClient, + logger: log.L(), + } + local.RegionSplitBatchSize = 4 + local.RegionSplitConcurrency = 4 + + // current region ranges: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) + rangeStart := codec.EncodeBytes([]byte{}, []byte("b")) + rangeEnd := codec.EncodeBytes([]byte{}, []byte("c")) + regions, err := split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5) + require.NoError(t, err) + // regions is: [aay, bba), [bba, bbh), [bbh, cca) + checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")}) + + // generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz) + ranges := make([]Range, 0) + start := []byte{'b'} + for i := byte('a'); i <= 'z'; i++ { + end := []byte{'b', i} + ranges = append(ranges, Range{start: start, end: end}) + start = end + } + + err = local.SplitAndScatterRegionByRanges(ctx, ranges, true) + require.NoError(t, err) + + splitHook.check(t, client) + + // check split ranges + regions, err = split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5) + require.NoError(t, err) + result := [][]byte{ + []byte("b"), []byte("ba"), []byte("bb"), []byte("bba"), []byte("bbh"), []byte("bc"), + []byte("bd"), []byte("be"), []byte("bf"), []byte("bg"), []byte("bh"), []byte("bi"), []byte("bj"), + []byte("bk"), []byte("bl"), []byte("bm"), []byte("bn"), []byte("bo"), []byte("bp"), []byte("bq"), + []byte("br"), []byte("bs"), []byte("bt"), []byte("bu"), []byte("bv"), []byte("bw"), []byte("bx"), + []byte("by"), []byte("bz"), []byte("cca"), + } + checkRegionRanges(t, regions, result) + + // the old regions will not be scattered. They are [..., bba), [bba, bbh), [..., cca) + require.Equal(t, len(result)-3, int(checkClient.scatterCounter.Load())) +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) } type batchSizeHook struct{} @@ -587,8 +684,13 @@ func (s *localSuite) TestSplitAndScatterRegionInBatches(c *C) { }) } +<<<<<<< HEAD err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4) c.Check(err, IsNil) +======= + err := local.SplitAndScatterRegionInBatches(ctx, ranges, true, 4) + require.NoError(t, err) +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) rangeStart := codec.EncodeBytes([]byte{}, []byte("a")) rangeEnd := codec.EncodeBytes([]byte{}, []byte("b")) @@ -683,8 +785,13 @@ func (s *localSuite) doTestBatchSplitByRangesWithClusteredIndex(c *C, hook clien start = e } +<<<<<<< HEAD err := local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000) c.Assert(err, IsNil) +======= + err := local.SplitAndScatterRegionByRanges(ctx, ranges, true) + require.NoError(t, err) +>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202)) startKey := codec.EncodeBytes([]byte{}, rangeKeys[0]) endKey := codec.EncodeBytes([]byte{}, rangeKeys[len(rangeKeys)-1])