From 50d404df4794400a6e1010779793517696721158 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 23 May 2023 19:23:38 +0800 Subject: [PATCH] sorter(cdc): add config cache-size (#9024) ref pingcap/tiflow#8974 --- .../sourcemanager/engine/factory/pebble.go | 6 +- .../sourcemanager/engine/pebble/db.go | 8 +- .../sourcemanager/engine/pebble/db_test.go | 2 +- .../engine/pebble/event_sorter_test.go | 8 +- cdc/server/server.go | 15 +--- pkg/cmd/server/server.go | 26 ------ pkg/cmd/server/server_test.go | 17 ++-- pkg/config/config_test_data.go | 3 +- pkg/config/server_config.go | 5 +- pkg/config/sorter.go | 21 +++-- tests/integration_tests/_utils/run_cdc_server | 2 - .../conf/changefeed.toml | 3 + .../conf/diff_config.toml | 29 +++++++ .../conf/workload | 16 ++++ .../data/prepare.sql | 13 +++ .../run.sh | 84 +++++++++++++++++++ tests/integration_tests/run_group.sh | 2 +- 17 files changed, 188 insertions(+), 72 deletions(-) create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql create mode 100644 tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh diff --git a/cdc/processor/sourcemanager/engine/factory/pebble.go b/cdc/processor/sourcemanager/engine/factory/pebble.go index d3a71a35305..31c85fbb6a3 100644 --- a/cdc/processor/sourcemanager/engine/factory/pebble.go +++ b/cdc/processor/sourcemanager/engine/factory/pebble.go @@ -34,6 +34,8 @@ func createPebbleDBs( dbs := make([]*pebble.DB, 0, cfg.Count) writeStalls := make([]writeStall, cfg.Count) + cache := pebble.NewCache(int64(memQuotaInBytes)) + defer cache.Unref() for id := 0; id < cfg.Count; id++ { ws := writeStalls[id] adjust := func(opts *pebble.Options) { @@ -57,7 +59,7 @@ func createPebbleDBs( } } - db, err := epebble.OpenPebble(id, dir, cfg, memQuotaInBytes/uint64(cfg.Count), adjust) + db, err := epebble.OpenPebble(id, dir, cfg, cache, adjust) if err != nil { log.Error("create pebble fails", zap.String("dir", dir), zap.Int("id", id), zap.Error(err)) for _, db := range dbs { @@ -67,7 +69,7 @@ func createPebbleDBs( } log.Info("create pebble instance success", zap.Int("id", id+1), - zap.Uint64("cacheSize", memQuotaInBytes/uint64(cfg.Count))) + zap.Uint64("sharedCacheSize", memQuotaInBytes)) dbs = append(dbs, db) } return dbs, writeStalls, nil diff --git a/cdc/processor/sourcemanager/engine/pebble/db.go b/cdc/processor/sourcemanager/engine/pebble/db.go index b299ecbd8e0..695fc210df6 100644 --- a/cdc/processor/sourcemanager/engine/pebble/db.go +++ b/cdc/processor/sourcemanager/engine/pebble/db.go @@ -89,7 +89,7 @@ func iterTable( // OpenPebble opens a pebble. 
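// With this patch the caller owns the block cache: createPebbleDBs above
// builds one shared pebble.Cache from the whole memory quota and hands it to
// every instance, instead of splitting the quota per instance. A minimal
// sketch of the intended call pattern, reusing names from the factory code
// in this patch (error handling elided):
//
//	cache := pebble.NewCache(int64(memQuotaInBytes)) // one cache shared by all DBs
//	defer cache.Unref()                              // each opened DB retains its own reference
//	for id := 0; id < cfg.Count; id++ {
//		db, err := OpenPebble(id, dir, cfg, cache, adjust)
//		...
//	}
//
// A nil cache is also accepted (the updated tests below pass nil), in which
// case pebble falls back to its internal default cache.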
func OpenPebble( id int, path string, cfg *config.DBConfig, - cacheSize uint64, + cache *pebble.Cache, adjusts ...func(*pebble.Options), ) (db *pebble.DB, err error) { dbDir := filepath.Join(path, fmt.Sprintf("%04d", id)) @@ -99,11 +99,7 @@ func OpenPebble( } opts := buildPebbleOption(cfg) - if cacheSize > 0 { - opts.Cache = pebble.NewCache(int64(cacheSize)) - defer opts.Cache.Unref() - } - + opts.Cache = cache for _, adjust := range adjusts { adjust(opts) } diff --git a/cdc/processor/sourcemanager/engine/pebble/db_test.go b/cdc/processor/sourcemanager/engine/pebble/db_test.go index 16fbcc938a1..af65f2d331d 100644 --- a/cdc/processor/sourcemanager/engine/pebble/db_test.go +++ b/cdc/processor/sourcemanager/engine/pebble/db_test.go @@ -30,7 +30,7 @@ func TestIteratorWithTableFilter(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) db, err := OpenPebble( 1, dbPath, &config.DBConfig{Count: 1}, - 1024*1024*10, + nil, // Disable auto compactions to make the case more stable. func(opts *pebble.Options) { opts.DisableAutomaticCompactions = true }, ) diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go index 6f3b7f14155..dd316f1f4d0 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go @@ -30,7 +30,7 @@ import ( func TestTableOperations(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() @@ -53,7 +53,7 @@ func TestTableOperations(t *testing.T) { // TestNoResolvedTs tests resolved timestamps shouldn't be emitted. func TestNoResolvedTs(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() @@ -84,7 +84,7 @@ func TestNoResolvedTs(t *testing.T) { // TestEventFetch tests events can be sorted correctly. 
func TestEventFetch(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() @@ -166,7 +166,7 @@ func TestEventFetch(t *testing.T) { func TestCleanData(t *testing.T) { dbPath := filepath.Join(t.TempDir(), t.Name()) - db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, 1024*1024*10) + db, err := OpenPebble(1, dbPath, &config.DBConfig{Count: 1}, nil) require.Nil(t, err) defer func() { _ = db.Close() }() diff --git a/cdc/server/server.go b/cdc/server/server.go index 39673d28719..8cce44c2ba9 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -181,9 +181,7 @@ func (s *server) prepare(ctx context.Context) error { return errors.Trace(err) } - if err := s.createSortEngineFactory(); err != nil { - return errors.Trace(err) - } + s.createSortEngineFactory() if err := s.setMemoryLimit(); err != nil { return errors.Trace(err) @@ -213,7 +211,7 @@ func (s *server) setMemoryLimit() error { return nil } -func (s *server) createSortEngineFactory() error { +func (s *server) createSortEngineFactory() { conf := config.GetGlobalServerConfig() if s.sortEngineFactory != nil { if err := s.sortEngineFactory.Close(); err != nil { @@ -225,19 +223,12 @@ func (s *server) createSortEngineFactory() error { // Sorter dir has been set and checked when server starts. // See https://github.com/pingcap/tiflow/blob/9dad09/cdc/server.go#L275 sortDir := config.GetGlobalServerConfig().Sorter.SortDir - totalMemory, err := util.GetMemoryLimit() - if err != nil { - return errors.Trace(err) - } - memPercentage := float64(conf.Sorter.MaxMemoryPercentage) / 100 - memInBytes := uint64(float64(totalMemory) * memPercentage) + memInBytes := conf.Sorter.CacheSizeInMB * uint64(1<<20) s.sortEngineFactory = factory.NewForPebble(sortDir, memInBytes, conf.Debug.DB) log.Info("sorter engine memory limit", zap.Uint64("bytes", memInBytes), zap.String("memory", humanize.IBytes(memInBytes)), ) - - return nil } // Run runs the server. diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 64d03691ffe..4d5749e888a 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -79,23 +79,7 @@ func (o *options) addFlags(cmd *cobra.Command) { cmd.Flags().DurationVar((*time.Duration)(&o.serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(o.serverConfig.ProcessorFlushInterval), "processor flushes task status interval") _ = cmd.Flags().MarkHidden("processor-flush-interval") - // sorter related parameters, hidden them since cannot be configured by TiUP easily. - cmd.Flags().IntVar(&o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter workerpool size") - _ = cmd.Flags().MarkHidden("sorter-num-workerpool-goroutine") - - cmd.Flags().IntVar(&o.serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", o.serverConfig.Sorter.NumConcurrentWorker, "sorter concurrency level") - _ = cmd.Flags().MarkHidden("sorter-num-concurrent-worker") - - cmd.Flags().Uint64Var(&o.serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", o.serverConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting") - _ = cmd.Flags().MarkHidden("sorter-chunk-size-limit") - // 80 is safe on most systems. 
- cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPercentage, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPercentage, "system memory usage threshold for forcing in-disk sort") - _ = cmd.Flags().MarkHidden("sorter-max-memory-percentage") - // We use 8GB as a safe default before we support local configuration file. - cmd.Flags().Uint64Var(&o.serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", o.serverConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort") - _ = cmd.Flags().MarkHidden("sorter-max-memory-consumption") - // sort-dir id deprecate, hidden it. cmd.Flags().StringVar(&o.serverConfig.Sorter.SortDir, "sort-dir", o.serverConfig.Sorter.SortDir, "sorter's temporary file directory") _ = cmd.Flags().MarkHidden("sort-dir") @@ -205,16 +189,6 @@ func (o *options) complete(cmd *cobra.Command) error { cfg.OwnerFlushInterval = o.serverConfig.OwnerFlushInterval case "processor-flush-interval": cfg.ProcessorFlushInterval = o.serverConfig.ProcessorFlushInterval - case "sorter-num-workerpool-goroutine": - cfg.Sorter.NumWorkerPoolGoroutine = o.serverConfig.Sorter.NumWorkerPoolGoroutine - case "sorter-num-concurrent-worker": - cfg.Sorter.NumConcurrentWorker = o.serverConfig.Sorter.NumConcurrentWorker - case "sorter-chunk-size-limit": - cfg.Sorter.ChunkSizeLimit = o.serverConfig.Sorter.ChunkSizeLimit - case "sorter-max-memory-percentage": - cfg.Sorter.MaxMemoryPercentage = o.serverConfig.Sorter.MaxMemoryPercentage - case "sorter-max-memory-consumption": - cfg.Sorter.MaxMemoryConsumption = o.serverConfig.Sorter.MaxMemoryConsumption case "ca": cfg.Security.CAPath = o.serverConfig.Security.CAPath case "cert": diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 2029faa34ec..44210c1af91 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -124,7 +124,6 @@ func TestParseCfg(t *testing.T) { "--cert", "bb", "--key", "cc", "--cert-allowed-cn", "dd,ee", - "--sorter-max-memory-percentage", "70", "--sort-dir", "/tmp/just_a_test", })) @@ -152,8 +151,9 @@ func TestParseCfg(t *testing.T) { OwnerFlushInterval: config.TomlDuration(150 * time.Millisecond), ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond), Sorter: &config.SorterConfig{ - MaxMemoryPercentage: 70, SortDir: config.DefaultSortDir, + CacheSizeInMB: 128, + MaxMemoryPercentage: 10, }, Security: &config.SecurityConfig{ CertPath: "bb", @@ -232,8 +232,9 @@ max-days = 1 max-backups = 1 [sorter] -max-memory-percentage = 3 sort-dir = "/tmp/just_a_test" +cache-size-in-mb = 8 +max-memory-percentage = 3 [kv-client] region-retry-duration = "3s" @@ -302,8 +303,9 @@ check-balance-interval = "10s" OwnerFlushInterval: config.TomlDuration(600 * time.Millisecond), ProcessorFlushInterval: config.TomlDuration(600 * time.Millisecond), Sorter: &config.SorterConfig{ - MaxMemoryPercentage: 3, SortDir: config.DefaultSortDir, + CacheSizeInMB: 8, + MaxMemoryPercentage: 3, }, Security: &config.SecurityConfig{}, KVClient: &config.KVClientConfig{ @@ -377,8 +379,9 @@ max-days = 1 max-backups = 1 [sorter] -max-memory-percentage = 3 sort-dir = "/tmp/just_a_test" +cache-size-in-mb = 8 +max-memory-percentage = 3 [security] ca-path = "aa" @@ -403,7 +406,6 @@ cert-allowed-cn = ["dd","ee"] "--owner-flush-interval", "150ms", "--processor-flush-interval", "150ms", "--ca", "", - "--sorter-max-memory-percentage", "70", "--config", configPath, })) @@ -431,8 +433,9 @@ cert-allowed-cn = ["dd","ee"] OwnerFlushInterval: config.TomlDuration(150 * 
time.Millisecond), ProcessorFlushInterval: config.TomlDuration(150 * time.Millisecond), Sorter: &config.SorterConfig{ - MaxMemoryPercentage: 70, SortDir: config.DefaultSortDir, + CacheSizeInMB: 8, + MaxMemoryPercentage: 3, }, Security: &config.SecurityConfig{ CertPath: "bb", diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index d12ad516186..14c94f2884d 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -92,8 +92,9 @@ const ( "owner-flush-interval": 50000000, "processor-flush-interval": 50000000, "sorter": { - "max-memory-percentage": 10, "sort-dir": "/tmp/sorter", + "cache-size-in-mb": 128, + "max-memory-percentage": 10, "max-memory-consumption": 0, "num-workerpool-goroutine": 0, "num-concurrent-worker": 0, diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 6c08c0da1bd..7e00b4bb209 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -106,10 +106,9 @@ var defaultServerConfig = &ServerConfig{ OwnerFlushInterval: TomlDuration(50 * time.Millisecond), ProcessorFlushInterval: TomlDuration(50 * time.Millisecond), Sorter: &SorterConfig{ - // Disable block-cache by default. TiCDC only scans events instead of - // accessing them randomly, so block-cache is unnecessary. - MaxMemoryPercentage: 10, SortDir: DefaultSortDir, + CacheSizeInMB: 128, // By default use 128M memory as sorter cache. + MaxMemoryPercentage: 10, // Deprecated. }, Security: &SecurityConfig{}, KVClient: &KVClientConfig{ diff --git a/pkg/config/sorter.go b/pkg/config/sorter.go index 8ec1ede0dab..2310dc2fa07 100644 --- a/pkg/config/sorter.go +++ b/pkg/config/sorter.go @@ -13,15 +13,24 @@ package config -import "github.com/pingcap/tiflow/pkg/errors" +import ( + "math" + + "github.com/pingcap/tiflow/pkg/errors" +) // SorterConfig represents sorter config for a changefeed type SorterConfig struct { - // the maximum memory use percentage that allows in-memory sorting - MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"` // the directory used to store the temporary files generated by the sorter SortDir string `toml:"sort-dir" json:"sort-dir"` + // Cache size of sorter in MB. + CacheSizeInMB uint64 `toml:"cache-size-in-mb" json:"cache-size-in-mb"` + + // the maximum memory use percentage that allows in-memory sorting + // Deprecated: use CacheSizeInMB instead. + MaxMemoryPercentage int `toml:"max-memory-percentage" json:"max-memory-percentage"` + // the maximum memory consumption allowed for in-memory sorting // Deprecated: we don't use this field anymore after introducing pull based sink. 
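	// (A cross-reference for the knob above, as this patch wires it up: the
	// server consumes CacheSizeInMB in cdc/server/server.go as
	//	memInBytes := conf.Sorter.CacheSizeInMB * uint64(1<<20)
	// and passes that byte quota to factory.NewForPebble, which sizes the
	// single pebble.Cache shared by all sorter instances.)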
MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"`
@@ -38,10 +47,8 @@ type SorterConfig struct {
 
 // ValidateAndAdjust validates and adjusts the sorter configuration
 func (c *SorterConfig) ValidateAndAdjust() error {
-	if c.MaxMemoryPercentage < 0 || c.MaxMemoryPercentage > 80 {
-		return errors.ErrIllegalSorterParameter.GenWithStackByArgs(
-			"max-memory-percentage should be a percentage and within (0, 80]")
+	if c.CacheSizeInMB < 8 || c.CacheSizeInMB > uint64(math.MaxInt64)>>20 { // compare in MB to avoid uint64 overflow
+		return errors.ErrIllegalSorterParameter.GenWithStackByArgs("cache-size-in-mb should be at least 8(MB)")
 	}
-
 	return nil
 }
diff --git a/tests/integration_tests/_utils/run_cdc_server b/tests/integration_tests/_utils/run_cdc_server
index d0ea0f3b82c..6b450f08175 100755
--- a/tests/integration_tests/_utils/run_cdc_server
+++ b/tests/integration_tests/_utils/run_cdc_server
@@ -120,7 +120,6 @@ if [[ "$restart" == "true" ]]; then
 	GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
 		--log-file $workdir/cdc$logsuffix.log \
 		--log-level $log_level \
-		--sorter-num-workerpool-goroutine 4 \
 		--data-dir "$data_dir" \
 		--cluster-id "$cluster_id" \
 		$config_path \
@@ -137,7 +136,6 @@ else
 	GO_FAILPOINTS=$failpoint $binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server \
 		--log-file $workdir/cdc$logsuffix.log \
 		--log-level $log_level \
-		--sorter-num-workerpool-goroutine 4 \
 		--data-dir "$data_dir" \
 		--cluster-id "$cluster_id" \
 		$config_path \
diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml
new file mode 100644
index 00000000000..14e7fd80c09
--- /dev/null
+++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/changefeed.toml
@@ -0,0 +1,3 @@
+[consistent]
+level = "eventual"
+storage = "file:///tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/redo"
diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml
new file mode 100644
index 00000000000..1fab13aa043
--- /dev/null
+++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/diff_config.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/consistent_replicate_storage_file_large_value/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["consistent_replicate_storage_file_large_value.usertable*","consistent_replicate_storage_file_large_value.t*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload new file mode 100644 index 00000000000..144a0e04a53 --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/conf/workload @@ -0,0 +1,16 @@ +threadcount=10 +recordcount=100 +operationcount=0 +workload=core +fieldcount=1000 +fieldlengthdistribution=constant +fieldlength=15000 + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql b/tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql new file mode 100644 index 00000000000..93dc7cd7a1c --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/data/prepare.sql @@ -0,0 +1,13 @@ +use `consistent_replicate_storage_file_large_value`; +set @@global.tidb_enable_exchange_partition=on; + +create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21)); +insert into t1 values (1),(2),(3),(4),(5),(6); +insert into t1 values (7),(8),(9); +insert into t1 values (11),(12),(20); +alter table t1 add partition (partition p3 values less than (30), partition p4 values less than (40)); +insert into t1 values (25),(29),(35); /*these values in p3,p4*/ + +create table t2 (a int primary key); + + diff --git a/tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh b/tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh new file mode 100644 index 00000000000..591db8bfb38 --- /dev/null +++ b/tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh @@ -0,0 +1,84 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +rm -rf "$WORK_DIR" +mkdir -p "$WORK_DIR" + +stop() { + # to distinguish whether the test failed in the DML synchronization phase or the DDL synchronization phase + echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_storage_file_large_value.usertable;") + stop_tidb_cluster +} + +function run() { + # we only support eventually consistent replication with MySQL sink + if [ "$SINK_TYPE" != "mysql" ]; then + return + fi + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + SINK_URI="mysql://normal:123456@127.0.0.1:3306/" + changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 
2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+
+	run_sql "CREATE DATABASE consistent_replicate_storage_file_large_value;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_storage_file_large_value
+	run_sql "CREATE table consistent_replicate_storage_file_large_value.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	check_table_exists "consistent_replicate_storage_file_large_value.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_table_exists "consistent_replicate_storage_file_large_value.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
+	check_table_exists "consistent_replicate_storage_file_large_value.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
+
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	# Inject the failpoint to block sink execution while the global resolved ts can still advance.
+	# Then we can apply the redo log to reach an eventually consistent state downstream.
+	cleanup_process $CDC_BINARY
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+	run_sql "create table consistent_replicate_storage_file_large_value.usertable2 like consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "ALTER TABLE consistent_replicate_storage_file_large_value.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file_large_value.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "insert into consistent_replicate_storage_file_large_value.t2 values (100),(101),(102),(103),(104),(105);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "insert into consistent_replicate_storage_file_large_value.t1 values (25),(29);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "insert into consistent_replicate_storage_file_large_value.usertable2 select * from consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# Wait to ensure row changed events have been replicated to TiCDC.
+	sleep 20
+
+	storage_path="file://$WORK_DIR/redo"
+	tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id
+	current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
+	ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta
+	cleanup_process $CDC_BINARY
+
+	export GO_FAILPOINTS=''
+
+	# This value is generated by:
+	#   echo -n '123456' | base64
+	#   MTIzNDU2
+	# Use it here to test that redo apply works well with a
+	# base64-encoded password.
+	ENPASSWORD="MTIzNDU2"
+
+	cdc redo apply --tmp-dir="$tmp_download_path/apply" \
+		--storage="$storage_path" \
+		--sink-uri="mysql://normal:${ENPASSWORD}@127.0.0.1:3306/"
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+}
+
+trap stop EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index c1cc7f10b81..5bcda2d60f2 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -12,7 +12,7 @@ group=$2 # multi_cdc_cluster capture_suicide_while_balance_table mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint" mysql_only_http="http_api http_api_tls api_v2" -mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_s3" +mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3" # Tests that need to support kafka: bank kill_owner_with_ddl owner_remove_table_error # owner_resign processor_etcd_worker_delay processor_resolved_ts_fallback