sorter(cdc): add config cache-size (#9024)
ref #8974
hicqu committed May 24, 2023
1 parent 616dc38 commit 7f84fb4
Showing 17 changed files with 188 additions and 72 deletions.
6 changes: 4 additions & 2 deletions cdc/processor/sourcemanager/engine/factory/pebble.go
@@ -34,6 +34,8 @@ func createPebbleDBs(
dbs := make([]*pebble.DB, 0, cfg.Count)
writeStalls := make([]writeStall, cfg.Count)

cache := pebble.NewCache(int64(memQuotaInBytes))
defer cache.Unref()
for id := 0; id < cfg.Count; id++ {
ws := writeStalls[id]
adjust := func(opts *pebble.Options) {
@@ -57,7 +59,7 @@ func createPebbleDBs(
}
}

db, err := epebble.OpenPebble(id, dir, cfg, memQuotaInBytes/uint64(cfg.Count), adjust)
db, err := epebble.OpenPebble(id, dir, cfg, cache, adjust)
if err != nil {
log.Error("create pebble fails", zap.String("dir", dir), zap.Int("id", id), zap.Error(err))
for _, db := range dbs {
@@ -67,7 +69,7 @@ }
}
log.Info("create pebble instance success",
zap.Int("id", id+1),
zap.Uint64("cacheSize", memQuotaInBytes/uint64(cfg.Count)))
zap.Uint64("sharedCacheSize", memQuotaInBytes))
dbs = append(dbs, db)
}
return dbs, writeStalls, nil
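The hunk above replaces the old per-instance cache quota (memQuotaInBytes/cfg.Count) with a single block cache shared by every pebble instance of the sorter. The snippet below is a minimal standalone sketch of that shared-cache pattern, not the TiCDC code itself; the directory layout and the 128 MiB size are illustrative assumptions. pebble's cache is reference-counted, so the creator can drop its own reference once each opened DB holds one.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/cockroachdb/pebble"
)

// openShared opens one pebble.DB per directory; all of them share a single block cache.
func openShared(dirs []string, cacheBytes int64) ([]*pebble.DB, error) {
	cache := pebble.NewCache(cacheBytes) // shared, reference-counted block cache
	defer cache.Unref()                  // each opened DB keeps its own reference
	dbs := make([]*pebble.DB, 0, len(dirs))
	for _, dir := range dirs {
		db, err := pebble.Open(dir, &pebble.Options{Cache: cache})
		if err != nil {
			for _, opened := range dbs {
				_ = opened.Close()
			}
			return nil, fmt.Errorf("open %s: %w", dir, err)
		}
		dbs = append(dbs, db)
	}
	return dbs, nil
}

func main() {
	base, err := os.MkdirTemp("", "pebble-shared-cache")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(base)

	dirs := []string{filepath.Join(base, "0000"), filepath.Join(base, "0001")}
	dbs, err := openShared(dirs, 128<<20) // 128 MiB shared by both instances
	if err != nil {
		log.Fatal(err)
	}
	for _, db := range dbs {
		_ = db.Close()
	}
}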
8 changes: 2 additions & 6 deletions cdc/processor/sourcemanager/engine/pebble/db.go
@@ -89,7 +89,7 @@ func iterTable(
// OpenPebble opens a pebble.
func OpenPebble(
id int, path string, cfg *config.DBConfig,
cacheSize uint64,
cache *pebble.Cache,
adjusts ...func(*pebble.Options),
) (db *pebble.DB, err error) {
dbDir := filepath.Join(path, fmt.Sprintf("%04d", id))
@@ -99,11 +99,7 @@ func OpenPebble(
}

opts := buildPebbleOption(cfg)
if cacheSize > 0 {
opts.Cache = pebble.NewCache(int64(cacheSize))
defer opts.Cache.Unref()
}

opts.Cache = cache
for _, adjust := range adjusts {
adjust(opts)
}
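OpenPebble now takes the shared *pebble.Cache directly and simply assigns it to opts.Cache; the tests below pass nil. That works because, to my understanding, pebble substitutes its own small default block cache when Options.Cache is nil. The sketch below illustrates that assumption with a throwaway directory; it is not part of the change.

package main

import (
	"log"
	"os"

	"github.com/cockroachdb/pebble"
)

func main() {
	dir, err := os.MkdirTemp("", "pebble-default-cache")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Options.Cache is deliberately left nil; pebble falls back to a small default
	// block cache, which is why the unit tests can call OpenPebble(..., nil, ...).
	db, err := pebble.Open(dir, &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = db.Close() }()
}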
2 changes: 1 addition & 1 deletion cdc/processor/sourcemanager/engine/pebble/db_test.go
@@ -30,7 +30,7 @@ func TestIteratorWithTableFilter(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(
1, dbPath, &config.DBConfig{Count: 1},
1024*1024*10,
nil,
// Disable auto compactions to make the case more stable.
func(opts *pebble.Options) { opts.DisableAutomaticCompactions = true },
)
@@ -30,7 +30,7 @@ import (

func TestTableOperations(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

@@ -53,7 +53,7 @@ func TestTableOperations(t *testing.T) {
// TestNoResolvedTs tests resolved timestamps shouldn't be emitted.
func TestNoResolvedTs(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

@@ -84,7 +84,7 @@ func TestNoResolvedTs(t *testing.T) {
// TestEventFetch tests events can be sorted correctly.
func TestEventFetch(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

@@ -166,7 +166,7 @@ func TestEventFetch(t *testing.T) {

func TestCleanData(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

15 changes: 3 additions & 12 deletions cdc/server/server.go
@@ -181,9 +181,7 @@ func (s *server) prepare(ctx context.Context) error {
return errors.Trace(err)
}

if err := s.createSortEngineFactory(); err != nil {
return errors.Trace(err)
}
s.createSortEngineFactory()

if err := s.setMemoryLimit(); err != nil {
return errors.Trace(err)
@@ -213,7 +211,7 @@ func (s *server) setMemoryLimit() error {
return nil
}

func (s *server) createSortEngineFactory() error {
func (s *server) createSortEngineFactory() {
conf := config.GetGlobalServerConfig()
if s.sortEngineFactory != nil {
if err := s.sortEngineFactory.Close(); err != nil {
@@ -225,19 +223,12 @@ func (s *server) createSortEngineFactory() error {
// Sorter dir has been set and checked when server starts.
// See https://github.com/pingcap/tiflow/blob/9dad09/cdc/server.go#L275
sortDir := config.GetGlobalServerConfig().Sorter.SortDir
totalMemory, err := util.GetMemoryLimit()
if err != nil {
return errors.Trace(err)
}
memPercentage := float64(conf.Sorter.MaxMemoryPercentage) / 100
memInBytes := uint64(float64(totalMemory) * memPercentage)
memInBytes := conf.Sorter.CacheSizeInMB * uint64(1<<20)
s.sortEngineFactory = factory.NewForPebble(sortDir, memInBytes, conf.Debug.DB)
log.Info("sorter engine memory limit",
zap.Uint64("bytes", memInBytes),
zap.String("memory", humanize.IBytes(memInBytes)),
)

return nil
}

// Run runs the server.
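With this hunk the sorter's memory quota is derived directly from the new cache-size-in-mb setting instead of from total system memory and max-memory-percentage. A small sketch of the MB-to-bytes conversion and the human-readable value that gets logged; the 128 here is just the new default set in pkg/config/server_config.go further down, not a required value.

package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	cacheSizeInMB := uint64(128)                // hypothetical value of sorter.cache-size-in-mb
	memInBytes := cacheSizeInMB * uint64(1<<20) // MB -> bytes, as in createSortEngineFactory
	fmt.Println(memInBytes)                     // 134217728
	fmt.Println(humanize.IBytes(memInBytes))    // "128 MiB"
}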
26 changes: 0 additions & 26 deletions pkg/cmd/server/server.go
@@ -79,23 +79,7 @@ func (o *options) addFlags(cmd *cobra.Command) {
cmd.Flags().DurationVar((*time.Duration)(&o.serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(o.serverConfig.ProcessorFlushInterval), "processor flushes task status interval")
_ = cmd.Flags().MarkHidden("processor-flush-interval")

// sorter related parameters, hidden them since cannot be configured by TiUP easily.
cmd.Flags().IntVar(&o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter workerpool size")
_ = cmd.Flags().MarkHidden("sorter-num-workerpool-goroutine")

cmd.Flags().IntVar(&o.serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", o.serverConfig.Sorter.NumConcurrentWorker, "sorter concurrency level")
_ = cmd.Flags().MarkHidden("sorter-num-concurrent-worker")

cmd.Flags().Uint64Var(&o.serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", o.serverConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting")
_ = cmd.Flags().MarkHidden("sorter-chunk-size-limit")

// 80 is safe on most systems.
cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPercentage, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPercentage, "system memory usage threshold for forcing in-disk sort")
_ = cmd.Flags().MarkHidden("sorter-max-memory-percentage")
// We use 8GB as a safe default before we support local configuration file.
cmd.Flags().Uint64Var(&o.serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", o.serverConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort")
_ = cmd.Flags().MarkHidden("sorter-max-memory-consumption")

// sort-dir id deprecate, hidden it.
cmd.Flags().StringVar(&o.serverConfig.Sorter.SortDir, "sort-dir", o.serverConfig.Sorter.SortDir, "sorter's temporary file directory")
_ = cmd.Flags().MarkHidden("sort-dir")
@@ -205,16 +189,6 @@ func (o *options) complete(cmd *cobra.Command) error {
cfg.OwnerFlushInterval = o.serverConfig.OwnerFlushInterval
case "processor-flush-interval":
cfg.ProcessorFlushInterval = o.serverConfig.ProcessorFlushInterval
case "sorter-num-workerpool-goroutine":
cfg.Sorter.NumWorkerPoolGoroutine = o.serverConfig.Sorter.NumWorkerPoolGoroutine
case "sorter-num-concurrent-worker":
cfg.Sorter.NumConcurrentWorker = o.serverConfig.Sorter.NumConcurrentWorker
case "sorter-chunk-size-limit":
cfg.Sorter.ChunkSizeLimit = o.serverConfig.Sorter.ChunkSizeLimit
case "sorter-max-memory-percentage":
cfg.Sorter.MaxMemoryPercentage = o.serverConfig.Sorter.MaxMemoryPercentage
case "sorter-max-memory-consumption":
cfg.Sorter.MaxMemoryConsumption = o.serverConfig.Sorter.MaxMemoryConsumption
case "ca":
cfg.Security.CAPath = o.serverConfig.Security.CAPath
case "cert":
17 changes: 10 additions & 7 deletions pkg/cmd/server/server_test.go
@@ -124,7 +124,6 @@ func TestParseCfg(t *testing.T) {
"--cert", "bb",
"--key", "cc",
"--cert-allowed-cn", "dd,ee",
"--sorter-max-memory-percentage", "70",
"--sort-dir", "/tmp/just_a_test",
}))

@@ -152,8 +151,9 @@
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
MaxMemoryPercentage: 70,
SortDir: config.DefaultSortDir,
CacheSizeInMB: 128,
MaxMemoryPercentage: 10,
},
Security: &config.SecurityConfig{
CertPath: "bb",
@@ -232,8 +232,9 @@ max-days = 1
max-backups = 1
[sorter]
max-memory-percentage = 3
sort-dir = "/tmp/just_a_test"
cache-size-in-mb = 8
max-memory-percentage = 3
[kv-client]
region-retry-duration = "3s"
@@ -302,8 +303,9 @@ check-balance-interval = "10s"
OwnerFlushInterval: config.TomlDuration(600 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(600 * time.Millisecond),
Sorter: &config.SorterConfig{
MaxMemoryPercentage: 3,
SortDir: config.DefaultSortDir,
CacheSizeInMB: 8,
MaxMemoryPercentage: 3,
},
Security: &config.SecurityConfig{},
KVClient: &config.KVClientConfig{
@@ -377,8 +379,9 @@ max-days = 1
max-backups = 1
[sorter]
max-memory-percentage = 3
sort-dir = "/tmp/just_a_test"
cache-size-in-mb = 8
max-memory-percentage = 3
[security]
ca-path = "aa"
@@ -403,7 +406,6 @@ cert-allowed-cn = ["dd","ee"]
"--owner-flush-interval", "150ms",
"--processor-flush-interval", "150ms",
"--ca", "",
"--sorter-max-memory-percentage", "70",
"--config", configPath,
}))

@@ -431,8 +433,9 @@ cert-allowed-cn = ["dd","ee"]
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
MaxMemoryPercentage: 70,
SortDir: config.DefaultSortDir,
CacheSizeInMB: 8,
MaxMemoryPercentage: 3,
},
Security: &config.SecurityConfig{
CertPath: "bb",
3 changes: 2 additions & 1 deletion pkg/config/config_test_data.go
@@ -92,8 +92,9 @@ const (
"owner-flush-interval": 50000000,
"processor-flush-interval": 50000000,
"sorter": {
"max-memory-percentage": 0,
"sort-dir": "/tmp/sorter",
"cache-size-in-mb": 128,
"max-memory-percentage": 10,
"max-memory-consumption": 0,
"num-workerpool-goroutine": 0,
"num-concurrent-worker": 0,
5 changes: 2 additions & 3 deletions pkg/config/server_config.go
@@ -106,10 +106,9 @@ var defaultServerConfig = &ServerConfig{
OwnerFlushInterval: TomlDuration(50 * time.Millisecond),
ProcessorFlushInterval: TomlDuration(50 * time.Millisecond),
Sorter: &SorterConfig{
// Disable block-cache by default. TiCDC only scans events instead of
// accessing them randomly, so block-cache is unnecessary.
MaxMemoryPercentage: 0,
SortDir: DefaultSortDir,
CacheSizeInMB: 128, // By default use 128M memory as sorter cache.
MaxMemoryPercentage: 10, // Deprecated.
},
Security: &SecurityConfig{},
KVClient: &KVClientConfig{
21 changes: 14 additions & 7 deletions pkg/config/sorter.go
@@ -13,15 +13,24 @@

package config

import "github.com/pingcap/tiflow/pkg/errors"
import (
"math"

"github.com/pingcap/tiflow/pkg/errors"
)

// SorterConfig represents sorter config for a changefeed
type SorterConfig struct {
// the maximum memory use percentage that allows in-memory sorting
MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`
// the directory used to store the temporary files generated by the sorter
SortDir string `toml:"sort-dir" json:"sort-dir"`

// Cache size of sorter in MB.
CacheSizeInMB uint64 `toml:"cache-size-in-mb" json:"cache-size-in-mb"`

// the maximum memory use percentage that allows in-memory sorting
// Deprecated: use CacheSizeInMB instead.
MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`

// the maximum memory consumption allowed for in-memory sorting
// Deprecated: we don't use this field anymore after introducing pull based sink.
MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"`
@@ -38,10 +47,8 @@ type SorterConfig struct {

// ValidateAndAdjust validates and adjusts the sorter configuration
func (c *SorterConfig) ValidateAndAdjust() error {
if c.MaxMemoryPercentage < 0 || c.MaxMemoryPercentage > 80 {
return errors.ErrIllegalSorterParameter.GenWithStackByArgs(
"max-memory-percentage should be a percentage and within (0, 80]")
if c.CacheSizeInMB < 8 || c.CacheSizeInMB*uint64(1<<20) > uint64(math.MaxInt64) {
return errors.ErrIllegalSorterParameter.GenWithStackByArgs("cache-size-in-mb should be greater than 8(MB)")
}

return nil
}
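ValidateAndAdjust now bounds the new field instead of max-memory-percentage: the cache must be at least 8 MB and, once converted to bytes, must still fit in an int64, since the shared cache is created with pebble.NewCache(int64). A standalone sketch of an equivalent check follows; the function name is illustrative, not the TiCDC API.

package main

import (
	"errors"
	"fmt"
	"math"
)

// validateCacheSize mirrors the bound enforced above: at least 8 MB, and small
// enough that the value converted to bytes still fits in an int64.
func validateCacheSize(cacheSizeInMB uint64) error {
	if cacheSizeInMB < 8 || cacheSizeInMB*uint64(1<<20) > uint64(math.MaxInt64) {
		return errors.New("cache-size-in-mb must be at least 8 MB and fit in an int64 when in bytes")
	}
	return nil
}

func main() {
	fmt.Println(validateCacheSize(4))   // error: below the 8 MB floor
	fmt.Println(validateCacheSize(128)) // <nil>
}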
2 changes: 0 additions & 2 deletions tests/integration_tests/_utils/run_cdc_server
@@ -120,7 +120,6 @@ if [[ "$restart" == "true" ]]; then
GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
--log-file $workdir/cdc$logsuffix.log \
--log-level $log_level \
--sorter-num-workerpool-goroutine 4 \
--data-dir "$data_dir" \
--cluster-id "$cluster_id" \
$config_path \
@@ -137,7 +136,6 @@ else
GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
--log-file $workdir/cdc$logsuffix.log \
--log-level $log_level \
--sorter-num-workerpool-goroutine 4 \
--data-dir "$data_dir" \
--cluster-id "$cluster_id" \
$config_path \
@@ -0,0 +1,3 @@
[consistent]
level = "eventual"
storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/redo"
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["consistent_replicate_storage_file_large_value.usertable*","consistent_replicate_storage_file_large_value.t*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
@@ -0,0 +1,16 @@
threadcount=10
recordcount=100
operationcount=0
workload=core
fieldcount=1000
fieldlengthdistribution=constant
fieldlength=15000

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
@@ -0,0 +1,13 @@
use `consistent_replicate_storage_file_large_value`;
set @@global.tidb_enable_exchange_partition=on;

create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (1),(2),(3),(4),(5),(6);
insert into t1 values (7),(8),(9);
insert into t1 values (11),(12),(20);
alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
insert into t1 values (25),(29),(35); /*these values in p3,p4*/

create table t2 (a int primary key);

