sorter(cdc): add config cache-size #9024

Merged · 4 commits · May 23, 2023
6 changes: 4 additions & 2 deletions cdc/processor/sourcemanager/engine/factory/pebble.go
@@ -34,6 +34,8 @@ func createPebbleDBs(
dbs := make([]*pebble.DB, 0, cfg.Count)
writeStalls := make([]writeStall, cfg.Count)

cache := pebble.NewCache(int64(memQuotaInBytes))
defer cache.Unref()
for id := 0; id < cfg.Count; id++ {
ws := writeStalls[id]
adjust := func(opts *pebble.Options) {
@@ -57,7 +59,7 @@ func createPebbleDBs(
}
}

db, err := epebble.OpenPebble(id, dir, cfg, memQuotaInBytes/uint64(cfg.Count), adjust)
db, err := epebble.OpenPebble(id, dir, cfg, cache, adjust)
if err != nil {
log.Error("create pebble fails", zap.String("dir", dir), zap.Int("id", id), zap.Error(err))
for _, db := range dbs {
Expand All @@ -67,7 +69,7 @@ func createPebbleDBs(
}
log.Info("create pebble instance success",
zap.Int("id", id+1),
zap.Uint64("cacheSize", memQuotaInBytes/uint64(cfg.Count)))
zap.Uint64("sharedCacheSize", memQuotaInBytes))
dbs = append(dbs, db)
}
return dbs, writeStalls, nil
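The substantive change in this file: rather than each pebble instance owning a private cache of `memQuotaInBytes/cfg.Count` bytes, a single shared block cache is created once and handed to every `OpenPebble` call. A minimal sketch of that lifecycle against the upstream `github.com/cockroachdb/pebble` API (the directory list and size below are illustrative, not from this PR):

```go
package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

// openShared mirrors the pattern above: one cache, many DBs.
func openShared(dirs []string, cacheBytes int64) ([]*pebble.DB, error) {
	cache := pebble.NewCache(cacheBytes)
	// Every Open below takes its own reference on the cache, so dropping
	// the creator's reference here is safe: the memory is released only
	// after the last DB using it has been closed.
	defer cache.Unref()

	dbs := make([]*pebble.DB, 0, len(dirs))
	for _, dir := range dirs {
		db, err := pebble.Open(dir, &pebble.Options{Cache: cache})
		if err != nil {
			for _, opened := range dbs { // roll back on failure, as the hunk does
				_ = opened.Close()
			}
			return nil, err
		}
		dbs = append(dbs, db)
	}
	return dbs, nil
}

func main() {
	dbs, err := openShared([]string{"/tmp/demo-0000", "/tmp/demo-0001"}, 128<<20)
	if err != nil {
		log.Fatal(err)
	}
	for _, db := range dbs {
		_ = db.Close()
	}
}
```

One design consequence worth noting: the log line now reports the shared total (`sharedCacheSize`) rather than a per-instance slice, because there is no longer a per-instance cache to size.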
8 changes: 2 additions & 6 deletions cdc/processor/sourcemanager/engine/pebble/db.go
@@ -89,7 +89,7 @@ func iterTable(
// OpenPebble opens a pebble.
func OpenPebble(
id int, path string, cfg *config.DBConfig,
cacheSize uint64,
cache *pebble.Cache,
adjusts ...func(*pebble.Options),
) (db *pebble.DB, err error) {
dbDir := filepath.Join(path, fmt.Sprintf("%04d", id))
@@ -99,11 +99,7 @@ func OpenPebble(
}

opts := buildPebbleOption(cfg)
if cacheSize > 0 {
opts.Cache = pebble.NewCache(int64(cacheSize))
defer opts.Cache.Unref()
}

opts.Cache = cache
for _, adjust := range adjusts {
adjust(opts)
}
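`OpenPebble` now assigns the caller's cache unconditionally, which also explains why the tests below can pass `nil`: pebble is expected to fall back to a small private cache when `Options.Cache` is nil at default-filling time (an assumption about upstream behavior, not something this diff enforces). A sketch:

```go
package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	var shared *pebble.Cache // nil, exactly as the updated tests pass it
	// Assumption: with a nil Cache, pebble fills in a small private cache
	// when it applies option defaults, so Open still succeeds.
	db, err := pebble.Open("/tmp/pebble-nil-cache-demo", &pebble.Options{Cache: shared})
	if err != nil {
		log.Fatal(err)
	}
	_ = db.Close()
}
```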
2 changes: 1 addition & 1 deletion cdc/processor/sourcemanager/engine/pebble/db_test.go
@@ -30,7 +30,7 @@ func TestIteratorWithTableFilter(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(
1, dbPath, &config.DBConfig{Count: 1},
1024*1024*10,
nil,
// Disable auto compactions to make the case more stable.
func(opts *pebble.Options) { opts.DisableAutomaticCompactions = true },
)
@@ -30,7 +30,7 @@ import (

func TestTableOperations(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

@@ -53,7 +53,7 @@ func TestTableOperations(t *testing.T) {
// TestNoResolvedTs tests resolved timestamps shouldn't be emitted.
func TestNoResolvedTs(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

@@ -84,7 +84,7 @@ func TestNoResolvedTs(t *testing.T) {
// TestEventFetch tests events can be sorted correctly.
func TestEventFetch(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

@@ -166,7 +166,7 @@ func TestEventFetch(t *testing.T) {

func TestCleanData(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), t.Name())
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10)
db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil)
require.Nil(t, err)
defer func() { _ = db.Close() }()

15 changes: 3 additions & 12 deletions cdc/server/server.go
@@ -181,9 +181,7 @@ func (s *server) prepare(ctx context.Context) error {
return errors.Trace(err)
}

if err := s.createSortEngineFactory(); err != nil {
return errors.Trace(err)
}
s.createSortEngineFactory()

if err := s.setMemoryLimit(); err != nil {
return errors.Trace(err)
@@ -213,7 +211,7 @@ func (s *server) setMemoryLimit() error {
return nil
}

func (s *server) createSortEngineFactory() error {
func (s *server) createSortEngineFactory() {
conf := config.GetGlobalServerConfig()
if s.sortEngineFactory != nil {
if err := s.sortEngineFactory.Close(); err != nil {
@@ -225,19 +223,12 @@ func (s *server) createSortEngineFactory() {
// Sorter dir has been set and checked when server starts.
// See https://github.com/pingcap/tiflow/blob/9dad09/cdc/server.go#L275
sortDir := config.GetGlobalServerConfig().Sorter.SortDir
totalMemory, err := util.GetMemoryLimit()
if err != nil {
return errors.Trace(err)
}
memPercentage := float64(conf.Sorter.MaxMemoryPercentage) / 100
memInBytes := uint64(float64(totalMemory) * memPercentage)
memInBytes := conf.Sorter.CacheSizeInMB * uint64(1<<20)
s.sortEngineFactory = factory.NewForPebble(sortDir, memInBytes, conf.Debug.DB)
log.Info("sorter engine memory limit",
zap.Uint64("bytes", memInBytes),
zap.String("memory", humanize.IBytes(memInBytes)),
)

return nil
}

// Run runs the server.
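With this hunk the server stops deriving the sorter budget from total system memory (`GetMemoryLimit` scaled by `MaxMemoryPercentage`) and converts the new knob directly — which is also why `createSortEngineFactory` no longer returns an error. A toy version of the arithmetic, with `128` standing in for `conf.Sorter.CacheSizeInMB` and `humanize.IBytes` used the same way as the log line above:

```go
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	cacheSizeInMB := uint64(128)                // conf.Sorter.CacheSizeInMB (the default)
	memInBytes := cacheSizeInMB * uint64(1<<20) // MiB -> bytes
	fmt.Println(memInBytes)                     // 134217728
	fmt.Println(humanize.IBytes(memInBytes))    // "128 MiB"
}
```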
26 changes: 0 additions & 26 deletions pkg/cmd/server/server.go
@@ -79,23 +79,7 @@ func (o *options) addFlags(cmd *cobra.Command) {
cmd.Flags().DurationVar((*time.Duration)(&o.serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(o.serverConfig.ProcessorFlushInterval), "processor flushes task status interval")
_ = cmd.Flags().MarkHidden("processor-flush-interval")

// sorter related parameters, hidden them since cannot be configured by TiUP easily.
cmd.Flags().IntVar(&o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter workerpool size")
_ = cmd.Flags().MarkHidden("sorter-num-workerpool-goroutine")

cmd.Flags().IntVar(&o.serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", o.serverConfig.Sorter.NumConcurrentWorker, "sorter concurrency level")
_ = cmd.Flags().MarkHidden("sorter-num-concurrent-worker")

cmd.Flags().Uint64Var(&o.serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", o.serverConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting")
_ = cmd.Flags().MarkHidden("sorter-chunk-size-limit")

// 80 is safe on most systems.
cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPercentage, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPercentage, "system memory usage threshold for forcing in-disk sort")
_ = cmd.Flags().MarkHidden("sorter-max-memory-percentage")
// We use 8GB as a safe default before we support local configuration file.
cmd.Flags().Uint64Var(&o.serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", o.serverConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort")
_ = cmd.Flags().MarkHidden("sorter-max-memory-consumption")

	// sort-dir is deprecated; hide it.
cmd.Flags().StringVar(&o.serverConfig.Sorter.SortDir, "sort-dir", o.serverConfig.Sorter.SortDir, "sorter's temporary file directory")
_ = cmd.Flags().MarkHidden("sort-dir")
@@ -205,16 +189,6 @@ func (o *options) complete(cmd *cobra.Command) error {
cfg.OwnerFlushInterval = o.serverConfig.OwnerFlushInterval
case "processor-flush-interval":
cfg.ProcessorFlushInterval = o.serverConfig.ProcessorFlushInterval
case "sorter-num-workerpool-goroutine":
cfg.Sorter.NumWorkerPoolGoroutine = o.serverConfig.Sorter.NumWorkerPoolGoroutine
case "sorter-num-concurrent-worker":
cfg.Sorter.NumConcurrentWorker = o.serverConfig.Sorter.NumConcurrentWorker
case "sorter-chunk-size-limit":
cfg.Sorter.ChunkSizeLimit = o.serverConfig.Sorter.ChunkSizeLimit
case "sorter-max-memory-percentage":
cfg.Sorter.MaxMemoryPercentage = o.serverConfig.Sorter.MaxMemoryPercentage
case "sorter-max-memory-consumption":
cfg.Sorter.MaxMemoryConsumption = o.serverConfig.Sorter.MaxMemoryConsumption
case "ca":
cfg.Security.CAPath = o.serverConfig.Security.CAPath
case "cert":
17 changes: 10 additions & 7 deletions pkg/cmd/server/server_test.go
@@ -124,7 +124,6 @@ func TestParseCfg(t *testing.T) {
"--cert", "bb",
"--key", "cc",
"--cert-allowed-cn", "dd,ee",
"--sorter-max-memory-percentage", "70",
"--sort-dir", "/tmp/just_a_test",
}))

@@ -152,8 +151,9 @@
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
MaxMemoryPercentage: 70,
SortDir: config.DefaultSortDir,
CacheSizeInMB: 128,
MaxMemoryPercentage: 10,
},
Security: &config.SecurityConfig{
CertPath: "bb",
@@ -232,8 +232,9 @@ max-days = 1
max-backups = 1

[sorter]
max-memory-percentage = 3
sort-dir = "/tmp/just_a_test"
cache-size-in-mb = 8
max-memory-percentage = 3

[kv-client]
region-retry-duration = "3s"
@@ -302,8 +303,9 @@ check-balance-interval = "10s"
OwnerFlushInterval: config.TomlDuration(600 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(600 * time.Millisecond),
Sorter: &config.SorterConfig{
MaxMemoryPercentage: 3,
SortDir: config.DefaultSortDir,
CacheSizeInMB: 8,
MaxMemoryPercentage: 3,
},
Security: &config.SecurityConfig{},
KVClient: &config.KVClientConfig{
@@ -377,8 +379,9 @@ max-days = 1
max-backups = 1

[sorter]
max-memory-percentage = 3
sort-dir = "/tmp/just_a_test"
cache-size-in-mb = 8
max-memory-percentage = 3

[security]
ca-path = "aa"
@@ -403,7 +406,6 @@ cert-allowed-cn = ["dd","ee"]
"--owner-flush-interval", "150ms",
"--processor-flush-interval", "150ms",
"--ca", "",
"--sorter-max-memory-percentage", "70",
"--config", configPath,
}))

@@ -431,8 +433,9 @@ cert-allowed-cn = ["dd","ee"]
OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond),
ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond),
Sorter: &config.SorterConfig{
MaxMemoryPercentage: 70,
SortDir: config.DefaultSortDir,
CacheSizeInMB: 8,
MaxMemoryPercentage: 3,
},
Security: &config.SecurityConfig{
CertPath: "bb",
3 changes: 2 additions & 1 deletion pkg/config/config_test_data.go
@@ -92,8 +92,9 @@ const (
"owner-flush-interval": 50000000,
"processor-flush-interval": 50000000,
"sorter": {
"max-memory-percentage": 10,
"sort-dir": "/tmp/sorter",
"cache-size-in-mb": 128,
"max-memory-percentage": 10,
"max-memory-consumption": 0,
"num-workerpool-goroutine": 0,
"num-concurrent-worker": 0,
5 changes: 2 additions & 3 deletions pkg/config/server_config.go
@@ -106,10 +106,9 @@ var defaultServerConfig = &ServerConfig{
OwnerFlushInterval: TomlDuration(50 * time.Millisecond),
ProcessorFlushInterval: TomlDuration(50 * time.Millisecond),
Sorter: &SorterConfig{
// Disable block-cache by default. TiCDC only scans events instead of
// accessing them randomly, so block-cache is unnecessary.
MaxMemoryPercentage: 10,
SortDir: DefaultSortDir,
CacheSizeInMB: 128, // By default use 128M memory as sorter cache.
MaxMemoryPercentage: 10, // Deprecated.
},
Security: &SecurityConfig{},
KVClient: &KVClientConfig{
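For reference, the new default corresponds to the following `[sorter]` stanza in a server configuration file. This sketch is assembled from the defaults above and the test fixtures elsewhere in this PR, not copied from the docs:

```toml
[sorter]
# Shared pebble block-cache for all sorter instances, in MiB.
# Validated to be at least 8; defaults to 128.
cache-size-in-mb = 128
sort-dir = "/tmp/sorter"
# max-memory-percentage and max-memory-consumption remain parseable
# but are deprecated by this change.
```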
21 changes: 14 additions & 7 deletions pkg/config/sorter.go
@@ -13,15 +13,24 @@

package config

import "github.com/pingcap/tiflow/pkg/errors"
import (
"math"

"github.com/pingcap/tiflow/pkg/errors"
)

// SorterConfig represents sorter config for a changefeed
type SorterConfig struct {
// the maximum memory use percentage that allows in-memory sorting
MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`
// the directory used to store the temporary files generated by the sorter
SortDir string `toml:"sort-dir" json:"sort-dir"`

// Cache size of sorter in MB.
CacheSizeInMB uint64 `toml:"cache-size-in-mb" json:"cache-size-in-mb"`

// the maximum memory use percentage that allows in-memory sorting
// Deprecated: use CacheSizeInMB instead.
MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"`

// the maximum memory consumption allowed for in-memory sorting
// Deprecated: we don't use this field anymore after introducing pull based sink.
MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"`
@@ -38,10 +47,8 @@ type SorterConfig struct {

// ValidateAndAdjust validates and adjusts the sorter configuration
func (c *SorterConfig) ValidateAndAdjust() error {
if c.MaxMemoryPercentage < 0 || c.MaxMemoryPercentage > 80 {
return errors.ErrIllegalSorterParameter.GenWithStackByArgs(
"max-memory-percentage should be a percentage and within (0, 80]")
if c.CacheSizeInMB < 8 || c.CacheSizeInMB*uint64(1<<20) > uint64(math.MaxInt64) {
return errors.ErrIllegalSorterParameter.GenWithStackByArgs("cache-size-in-mb should be greater than 8(MB)")
}

return nil
}
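The rewritten `ValidateAndAdjust` replaces the old `max-memory-percentage` range check with a floor of 8 MiB (8 itself passes, since the test is `< 8`) plus an upper bound that keeps the byte count representable as the `int64` that `pebble.NewCache` takes. A self-contained sketch of the same check, with a hypothetical `validateCacheSize` helper and plain `fmt` errors standing in for `errors.ErrIllegalSorterParameter`:

```go
package main

import (
	"fmt"
	"math"
)

// validateCacheSize mirrors the check in the hunk above. Note that, as in
// the diff, the MiB-to-bytes multiplication is unchecked and could wrap
// for absurdly large inputs.
func validateCacheSize(cacheSizeInMB uint64) error {
	if cacheSizeInMB < 8 || cacheSizeInMB*uint64(1<<20) > uint64(math.MaxInt64) {
		return fmt.Errorf("cache-size-in-mb %d out of range: want >= 8 MiB and <= MaxInt64 bytes", cacheSizeInMB)
	}
	return nil
}

func main() {
	fmt.Println(validateCacheSize(4))   // error: below the 8 MiB floor
	fmt.Println(validateCacheSize(8))   // <nil>: the floor itself is legal
	fmt.Println(validateCacheSize(128)) // <nil>: the default
}
```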
2 changes: 0 additions & 2 deletions tests/integration_tests/_utils/run_cdc_server
@@ -120,7 +120,6 @@ if [[ "$restart" == "true" ]]; then
GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
--log-file $workdir/cdc$logsuffix.log \
--log-level $log_level \
--sorter-num-workerpool-goroutine 4 \
--data-dir "$data_dir" \
--cluster-id "$cluster_id" \
$config_path \
@@ -137,7 +136,6 @@
GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
--log-file $workdir/cdc$logsuffix.log \
--log-level $log_level \
--sorter-num-workerpool-goroutine 4 \
--data-dir "$data_dir" \
--cluster-id "$cluster_id" \
$config_path \
@@ -0,0 +1,3 @@
[consistent]
level = "eventual"
storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/redo"
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["consistent_replicate_storage_file_large_value.usertable*","consistent_replicate_storage_file_large_value.t*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
@@ -0,0 +1,16 @@
threadcount=10
recordcount=100
operationcount=0
workload=core
fieldcount=1000
fieldlengthdistribution=constant
fieldlength=15000

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
@@ -0,0 +1,13 @@
use `consistent_replicate_storage_file_large_value`;
set @@global.tidb_enable_exchange_partition=on;

create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (1),(2),(3),(4),(5),(6);
insert into t1 values (7),(8),(9);
insert into t1 values (11),(12),(20);
alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40));
insert into t1 values (25),(29),(35); /*these values in p3,p4*/

create table t2 (a int primary key);

