*: update dependencies tidb and pebbledb (pingcap#10899)
hicqu authored Apr 16, 2024
1 parent 72646f6 commit c8ed99f
Showing 29 changed files with 180 additions and 205 deletions.
6 changes: 5 additions & 1 deletion cdc/processor/sourcemanager/sorter/factory/factory.go
@@ -61,6 +61,7 @@ type SortEngineFactory struct {
// Following fields are valid if engineType is pebbleEngine.
pebbleConfig *config.DBConfig
dbs []*pebble.DB
cache *pebble.Cache
writeStalls []writeStall

// dbs is also read in the background metrics collector.
@@ -80,7 +81,7 @@ func (f *SortEngineFactory) Create(ID model.ChangeFeedID) (e sorter.SortEngine,
return e, nil
}
if len(f.dbs) == 0 {
f.dbs, f.writeStalls, err = createPebbleDBs(f.dir, f.pebbleConfig, f.memQuotaInBytes)
f.dbs, f.cache, f.writeStalls, err = createPebbleDBs(f.dir, f.pebbleConfig, f.memQuotaInBytes)
if err != nil {
return
}
@@ -125,6 +126,9 @@ func (f *SortEngineFactory) Close() (err error) {
for _, db := range f.dbs {
err = multierr.Append(err, db.Close())
}
if f.cache != nil {
f.cache.Unref()
}
return
}

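For context on the cache change above: pebble's block cache is reference-counted, so a cache shared by several DB instances must outlive all of them and be released exactly once by its creator. A minimal sketch of that lifecycle, assuming pebble v1.x; the helper below is illustrative, not tiflow's code:

```go
package sketch

import "github.com/cockroachdb/pebble"

// openShared opens several DBs against one shared block cache. Each
// pebble.Open takes its own reference on the cache; the creator keeps
// one reference and must drop it with Unref once the DBs are handed off.
func openShared(dirs []string, quotaBytes int64) ([]*pebble.DB, *pebble.Cache, error) {
	cache := pebble.NewCache(quotaBytes)
	dbs := make([]*pebble.DB, 0, len(dirs))
	for _, dir := range dirs {
		db, err := pebble.Open(dir, &pebble.Options{Cache: cache})
		if err != nil {
			for _, opened := range dbs {
				opened.Close()
			}
			cache.Unref() // release the creator's reference on failure too
			return nil, nil, err
		}
		dbs = append(dbs, db)
	}
	// Caller: Close every DB first, then cache.Unref(), mirroring
	// SortEngineFactory.Close above.
	return dbs, cache, nil
}
```
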
37 changes: 24 additions & 13 deletions cdc/processor/sourcemanager/sorter/factory/pebble.go
@@ -30,16 +30,30 @@ import (
func createPebbleDBs(
dir string, cfg *config.DBConfig,
memQuotaInBytes uint64,
) ([]*pebble.DB, []writeStall, error) {
dbs := make([]*pebble.DB, 0, cfg.Count)
writeStalls := make([]writeStall, cfg.Count)
) (dbs []*pebble.DB, cache *pebble.Cache, writeStalls []writeStall, err error) {
dbs = make([]*pebble.DB, 0, cfg.Count)
writeStalls = make([]writeStall, cfg.Count)
defer func() {
if err != nil {
for _, db := range dbs {
db.Close()
}
dbs = nil
if cache != nil {
cache.Unref()
cache = nil
}
writeStalls = nil
}
}()

cache := pebble.NewCache(int64(memQuotaInBytes))
defer cache.Unref()
cache = pebble.NewCache(int64(memQuotaInBytes))
for id := 0; id < cfg.Count; id++ {
ws := writeStalls[id]
adjust := func(opts *pebble.Options) {
opts.EventListener = pebble.MakeLoggingEventListener(&pebbleLogger{id: id})
listener := new(pebble.EventListener)
*listener = pebble.MakeLoggingEventListener(&pebbleLogger{id: id})
opts.EventListener = listener

opts.EventListener.WriteStallBegin = func(_ pebble.WriteStallBeginInfo) {
atomic.AddUint64(&ws.counter, 1)
@@ -59,20 +73,17 @@ func createPebbleDBs(
}
}

db, err := epebble.OpenPebble(id, dir, cfg, cache, adjust)
if err != nil {
var db *pebble.DB
if db, err = epebble.OpenPebble(id, dir, cfg, cache, adjust); err != nil {
log.Error("create pebble fails", zap.String("dir", dir), zap.Int("id", id), zap.Error(err))
for _, db := range dbs {
db.Close()
}
return nil, nil, err
return
}
log.Info("create pebble instance success",
zap.Int("id", id+1),
zap.Uint64("sharedCacheSize", memQuotaInBytes))
dbs = append(dbs, db)
}
return dbs, writeStalls, nil
return
}

type pebbleLogger struct{ id int }
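
The listener construction above tracks a pebble API change: Options.EventListener is now a *pebble.EventListener, so the logging listener is built as a value, stored through a pointer, and only then are individual hooks overridden. A hedged sketch of the same pattern, assuming pebble v1.x:

```go
package sketch

import (
	"sync/atomic"

	"github.com/cockroachdb/pebble"
)

// withStallCounter layers a write-stall counter on top of pebble's
// logging listener. MakeLoggingEventListener returns a value; newer
// pebble expects a pointer in Options, so hooks are patched through it.
func withStallCounter(opts *pebble.Options, counter *uint64) {
	listener := new(pebble.EventListener)
	*listener = pebble.MakeLoggingEventListener(pebble.DefaultLogger)
	listener.WriteStallBegin = func(_ pebble.WriteStallBeginInfo) {
		atomic.AddUint64(counter, 1)
	}
	opts.EventListener = listener
}
```
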
10 changes: 7 additions & 3 deletions cdc/processor/sourcemanager/sorter/pebble/db.go
@@ -73,7 +73,7 @@ func iterTable(
start := encoding.EncodeTsKey(uniqueID, uint64(tableID), lowerBound.CommitTs, lowerBound.StartTs)
end := encoding.EncodeTsKey(uniqueID, uint64(tableID), upperBoundNext.CommitTs, upperBoundNext.StartTs)

iter := db.NewIter(&pebble.IterOptions{
iter, err := db.NewIter(&pebble.IterOptions{
LowerBound: start,
UpperBound: end,
TableFilter: func(userProps map[string]string) bool {
@@ -82,6 +82,10 @@ func iterTable(
return uint64(tableMaxCRTs) >= lowerBound.CommitTs && uint64(tableMinCRTs) <= upperBound.CommitTs
},
})
if err != nil {
log.Panic("fail to create iterator")
return nil
}
iter.First()
return iter
}
@@ -113,11 +117,11 @@ func buildPebbleOption(cfg *config.DBConfig) (opts *pebble.Options) {
opts.ErrorIfExists = true
opts.DisableWAL = false // Delete range requires WAL.
opts.MaxOpenFiles = cfg.MaxOpenFiles / cfg.Count
opts.MaxConcurrentCompactions = 6
opts.MaxConcurrentCompactions = func() int { return 6 }
opts.L0CompactionThreshold = cfg.CompactionL0Trigger
opts.L0StopWritesThreshold = cfg.WriteL0PauseTrigger
opts.LBaseMaxBytes = 64 << 20 // 64 MB
opts.MemTableSize = cfg.WriterBufferSize
opts.MemTableSize = uint64(cfg.WriterBufferSize)
opts.MemTableStopWritesThreshold = 4
opts.Levels = make([]pebble.LevelOptions, 7)
opts.TablePropertyCollectors = append(opts.TablePropertyCollectors,
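
Three signature changes from the pebble upgrade surface in this file: NewIter now returns an error alongside the iterator, MaxConcurrentCompactions became a function so the value can vary at runtime, and MemTableSize widened to uint64. A sketch under those assumptions:

```go
package sketch

import "github.com/cockroachdb/pebble"

// newBoundedIter positions an iterator over [start, end); with newer
// pebble the error from NewIter must be checked before the iterator
// is used. Older pebble returned only the iterator.
func newBoundedIter(db *pebble.DB, start, end []byte) (*pebble.Iterator, error) {
	iter, err := db.NewIter(&pebble.IterOptions{LowerBound: start, UpperBound: end})
	if err != nil {
		return nil, err
	}
	iter.First()
	return iter, nil
}

func tunedOptions(writerBufferSize int) *pebble.Options {
	opts := &pebble.Options{}
	opts.MaxConcurrentCompactions = func() int { return 6 } // was a plain int field
	opts.MemTableSize = uint64(writerBufferSize)            // was an int
	return opts
}
```
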
4 changes: 3 additions & 1 deletion cdc/processor/sourcemanager/sorter/pebble/event_sorter.go
@@ -319,13 +319,15 @@ func (s *EventIter) Next() (event *model.PolymorphicEvent, txnFinished sorter.Po
// Thus, we need to fetch the next event and compare the commitTs and startTs with it
for valid {
nextStart := time.Now()
value, valid = s.iter.Value(), s.iter.Next()
value = s.iter.Value()
s.nextDuration.Observe(time.Since(nextStart).Seconds())

nextEvent = &model.PolymorphicEvent{}
if _, err = s.serde.Unmarshal(nextEvent, value); err != nil {
return
}
valid = s.iter.Next()

if s.currentEvent != nil {
break
}
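
The reordering above matters because the byte slice returned by a pebble iterator's Value() is only guaranteed valid until the iterator is repositioned, so the event must be decoded before Next() is called. A minimal sketch of the safe loop shape, with decode standing in for the serde.Unmarshal call:

```go
package sketch

import "github.com/cockroachdb/pebble"

// drain decodes each value before advancing; calling Next() first could
// let pebble reuse the buffer backing the slice returned by Value().
// The caller remains responsible for iter.Close().
func drain(iter *pebble.Iterator, decode func([]byte) error) error {
	for valid := iter.First(); valid; valid = iter.Next() {
		if err := decode(iter.Value()); err != nil {
			return err
		}
	}
	return nil
}
```
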
4 changes: 4 additions & 0 deletions cmd/dm-worker/main.go
@@ -22,6 +22,7 @@ import (
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tiflow/dm/ctl/common"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
@@ -32,6 +33,9 @@ import (
)

func main() {
// NOTE: the line is removed from TiDB repo in https://github.com/pingcap/tidb/pull/52191#issuecomment-2024836481.
collate.SetNewCollationEnabledForTest(false)

cfg := worker.NewConfig()
err := cfg.Parse(os.Args[1:])
switch errors.Cause(err) {
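
The NOTE above is the crux: TiDB's own startup used to disable the new collation framework, and pingcap/tidb#52191 removed that call, so embedders that rely on the old behavior must now opt out themselves. In isolation the call looks like this:

```go
package main

import "github.com/pingcap/tidb/pkg/util/collate"

func main() {
	// Must run before any collation-sensitive code path is exercised;
	// previously TiDB made this call itself during initialization.
	collate.SetNewCollationEnabledForTest(false)
}
```
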
12 changes: 6 additions & 6 deletions dm/checker/checker.go
@@ -24,13 +24,13 @@ import (
"time"

_ "github.com/go-sql-driver/mysql" // for mysql
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/importer"
"github.com/pingcap/tidb/br/pkg/lightning/importer/opts"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/precheck"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/lightning/pkg/importer"
"github.com/pingcap/tidb/lightning/pkg/importer/opts"
"github.com/pingcap/tidb/lightning/pkg/precheck"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/dbutil"
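
The import churn in this file (and in most DM files below) is one mechanical migration: the old github.com/pingcap/tidb/br/pkg/lightning tree was split, with library packages moving under pkg/lightning and the importer/precheck/server layers moving under lightning/pkg. An illustrative before/after, using blank imports so the snippet compiles on its own:

```go
package sketch

import (
	// was "github.com/pingcap/tidb/br/pkg/lightning/importer":
	_ "github.com/pingcap/tidb/lightning/pkg/importer"
	// was "github.com/pingcap/tidb/br/pkg/lightning" (the server entry point):
	_ "github.com/pingcap/tidb/lightning/pkg/server"
	// was "github.com/pingcap/tidb/br/pkg/lightning/checkpoints":
	_ "github.com/pingcap/tidb/pkg/lightning/checkpoints"
)
```
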
2 changes: 1 addition & 1 deletion dm/config/task.go
@@ -30,7 +30,7 @@ import (
"github.com/dustin/go-humanize"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/util/filter"
router "github.com/pingcap/tidb/pkg/util/table-router"
Expand Down
2 changes: 1 addition & 1 deletion dm/docs/RFCS/20220308_support_dump_files_store_in_s3.md
@@ -6,7 +6,7 @@ When DM synchronizes data at the full stage, it requires a dynamic space which i

### User config change

We still use loader's `dir` to config S3 url, but it should be noted that loader's `import-mode` should not be `loader` and it can be `sql` now, because we use [Lightning](https://github.com/pingcap/tidb/tree/master/br/pkg/lightning) to load files in S3.
We still use loader's `dir` to config S3 url, but it should be noted that loader's `import-mode` should not be `loader` and it can be `sql` now, because we use [Lightning](https://github.com/pingcap/tidb/tree/master/pkg/lightning) to load files in S3.
```
loaders:
global:
40 changes: 19 additions & 21 deletions dm/loader/lightning.go
@@ -24,12 +24,12 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
"github.com/pingcap/tidb/dumpling/export"
lserver "github.com/pingcap/tidb/lightning/pkg/server"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/errormanager"
"github.com/pingcap/tidb/pkg/parser/mysql"
tidbpromutil "github.com/pingcap/tidb/pkg/util/promutil"
"github.com/pingcap/tiflow/dm/config"
@@ -68,7 +68,7 @@ type LightningLoader struct {

logger log.Logger
cli *clientv3.Client
core *lightning.Lightning
core *lserver.Lightning
cancel context.CancelFunc // for per task context, which maybe different from lightning context

toDB *conn.BaseDB
@@ -96,7 +96,7 @@ func NewLightning(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName st
cli: cli,
workerName: workerName,
lightningGlobalConfig: lightningCfg,
core: lightning.New(lightningCfg),
core: lserver.New(lightningCfg),
logger: logger.WithFields(zap.String("task", cfg.Name), zap.String("unit", "lightning-load")),
speedRecorder: export.NewSpeedRecorder(),
}
@@ -234,50 +234,50 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) (e
return err
}

var opts []lightning.Option
var opts []lserver.Option
if l.cfg.MetricsFactory != nil {
// this branch means dataflow engine has set a Factory, the Factory itself
// will register and deregister metrics, but lightning will expect the
// register and deregister at the beginning and end of its lifetime.
// So we use dataflow engine's Factory to register, and use dataflow engine's
// global metrics to manually deregister.
opts = append(opts,
lightning.WithPromFactory(
lserver.WithPromFactory(
promutil.NewWrappingFactory(
l.cfg.MetricsFactory,
"",
prometheus.Labels{"task": l.cfg.Name, "source_id": l.cfg.SourceID},
)),
lightning.WithPromRegistry(promutil.GetGlobalMetricRegistry()))
lserver.WithPromRegistry(promutil.GetGlobalMetricRegistry()))
} else {
registry := prometheus.DefaultGatherer.(prometheus.Registerer)
failpoint.Inject("DontUnregister", func() {
registry = promutil.NewOnlyRegRegister(registry)
})

opts = append(opts,
lightning.WithPromFactory(
lserver.WithPromFactory(
promutil.NewWrappingFactory(
tidbpromutil.NewDefaultFactory(),
"",
prometheus.Labels{"task": l.cfg.Name, "source_id": l.cfg.SourceID},
),
),
lightning.WithPromRegistry(registry))
lserver.WithPromRegistry(registry))
}
if l.cfg.ExtStorage != nil {
opts = append(opts,
lightning.WithDumpFileStorage(l.cfg.ExtStorage))
lserver.WithDumpFileStorage(l.cfg.ExtStorage))
}
if l.cfg.FrameworkLogger != nil {
opts = append(opts, lightning.WithLogger(l.cfg.FrameworkLogger))
opts = append(opts, lserver.WithLogger(l.cfg.FrameworkLogger))
} else {
opts = append(opts, lightning.WithLogger(l.logger.Logger))
opts = append(opts, lserver.WithLogger(l.logger.Logger))
}

var hasDup atomic.Bool
if l.cfg.LoaderConfig.ImportMode == config.LoadModePhysical {
opts = append(opts, lightning.WithDupIndicator(&hasDup))
opts = append(opts, lserver.WithDupIndicator(&hasDup))
}

err = l.core.RunOnceWithOptions(taskCtx, cfg, opts...)
@@ -366,10 +366,8 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
}
if cfg.TikvImporter.Backend == lcfg.BackendLocal {
cfg.TikvImporter.IncrementalImport = true
} else {
if err := cfg.TikvImporter.OnDuplicate.FromStringValue(string(subtaskCfg.OnDuplicateLogical)); err != nil {
return nil, err
}
} else if err := cfg.TikvImporter.OnDuplicate.FromStringValue(string(subtaskCfg.OnDuplicateLogical)); err != nil {
return nil, err
}
switch subtaskCfg.OnDuplicatePhysical {
case config.OnDuplicateManual:
@@ -605,7 +603,7 @@ func (l *LightningLoader) Resume(ctx context.Context, pr chan pb.ProcessResult)
l.logger.Warn("try to resume, but already closed")
return
}
l.core = lightning.New(l.lightningGlobalConfig)
l.core = lserver.New(l.lightningGlobalConfig)
// continue the processing
l.Process(ctx, pr)
}
4 changes: 2 additions & 2 deletions dm/loader/lightning_test.go
@@ -17,9 +17,9 @@ import (
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/prometheus/client_golang/prometheus"
4 changes: 2 additions & 2 deletions dm/pkg/checker/lightning.go
@@ -18,8 +18,8 @@ import (
"fmt"

"github.com/docker/go-units"
"github.com/pingcap/tidb/br/pkg/lightning/importer"
"github.com/pingcap/tidb/br/pkg/lightning/precheck"
"github.com/pingcap/tidb/lightning/pkg/importer"
"github.com/pingcap/tidb/lightning/pkg/precheck"
"github.com/pingcap/tiflow/dm/pkg/log"
)

2 changes: 1 addition & 1 deletion dm/pkg/checker/lightning_test.go
@@ -17,7 +17,7 @@ import (
"context"
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/precheck"
"github.com/pingcap/tidb/lightning/pkg/precheck"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
)
3 changes: 2 additions & 1 deletion dm/pkg/checker/table_structure.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/util/dbutil"
"github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tidb/pkg/util/schemacmp"
"github.com/pingcap/tiflow/dm/pkg/conn"
@@ -890,7 +891,7 @@ func (c *OptimisticShardingTablesChecker) checkTable(ctx context.Context, r *Res
c.reMu.Unlock()
}

ti, err := dbutil.GetTableInfoBySQL(statement, p)
ti, err := dbutiltest.GetTableInfoBySQL(statement, p)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion dm/pkg/log/log.go
@@ -20,7 +20,7 @@ import (

"github.com/pingcap/errors"
pclog "github.com/pingcap/log"
lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log"
lightningLog "github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tiflow/dm/pkg/helper"
"github.com/pingcap/tiflow/dm/pkg/terror"
2 changes: 1 addition & 1 deletion dm/pkg/log/log_test.go
@@ -25,7 +25,7 @@ import (

"github.com/pingcap/errors"
pclog "github.com/pingcap/log"
lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log"
lightningLog "github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tiflow/pkg/version"
"github.com/stretchr/testify/require"
4 changes: 4 additions & 0 deletions dm/pkg/schema/tracker.go
@@ -91,6 +91,10 @@ type executorContext struct {

var _ sqlexec.RestrictedSQLExecutor = executorContext{}

func (se executorContext) GetRestrictedSQLExecutor() sqlexec.RestrictedSQLExecutor {
return se
}

func (se executorContext) ParseWithParams(context.Context, string, ...interface{}) (ast.StmtNode, error) {
return nil, nil
}
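
The new method exists because newer TiDB adds GetRestrictedSQLExecutor() to the sqlexec.RestrictedSQLExecutor interface, and a type that is its own executor can simply return itself. An abbreviated sketch of the pattern; the interface below is a stand-in, not TiDB's full definition:

```go
package sketch

// restrictedExecutor abbreviates sqlexec.RestrictedSQLExecutor down to
// the single method this change is about.
type restrictedExecutor interface {
	GetRestrictedSQLExecutor() restrictedExecutor
}

// stubExecutor mirrors executorContext above: a lightweight value that
// satisfies the widened interface by returning itself.
type stubExecutor struct{}

func (s stubExecutor) GetRestrictedSQLExecutor() restrictedExecutor { return s }

var _ restrictedExecutor = stubExecutor{}
```
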
(diff truncated; the remaining changed files are not shown)
