Skip to content

Commit

Permalink
config(ticdc): enable pull based sink (#8233)
Browse files Browse the repository at this point in the history
close #8232
  • Loading branch information
Rustin170506 authored Feb 14, 2023
1 parent 9dc7438 commit bdd1226
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 12 deletions.
21 changes: 13 additions & 8 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ func TestParseCfg(t *testing.T) {
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
EnableDBSorter: true,
EnableNewScheduler: true,
EnableDBSorter: true,
EnableNewScheduler: true,
EnablePullBasedSink: true,
DB: &config.DBConfig{
Count: 8,
Concurrency: 128,
Expand Down Expand Up @@ -255,6 +256,7 @@ region-retry-duration = "3s"
[debug]
enable-db-sorter = true
enable-pull-based-sink = true
[debug.db]
count = 5
concurrency = 6
Expand Down Expand Up @@ -336,8 +338,9 @@ check-balance-interval = "10s"
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
EnableDBSorter: true,
EnableNewScheduler: true,
EnableDBSorter: true,
EnablePullBasedSink: true,
EnableNewScheduler: true,
DB: &config.DBConfig{
Count: 5,
Concurrency: 6,
Expand Down Expand Up @@ -483,8 +486,9 @@ cert-allowed-cn = ["dd","ee"]
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
EnableDBSorter: true,
EnableNewScheduler: true,
EnableDBSorter: true,
EnableNewScheduler: true,
EnablePullBasedSink: true,
DB: &config.DBConfig{
Count: 8,
Concurrency: 128,
Expand Down Expand Up @@ -548,8 +552,9 @@ unknown3 = 3
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
EnableDBSorter: true,
EnableNewScheduler: true,
EnableDBSorter: true,
EnableNewScheduler: true,
EnablePullBasedSink: true,
DB: &config.DBConfig{
Count: 8,
Concurrency: 128,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ const (
"table-actor": {
"event-batch-size": 32
},
"enable-pull-based-sink": false,
"enable-pull-based-sink": true,
"enable-db-sorter": true,
"db": {
"count": 8,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
type DebugConfig struct {
TableActor *TableActorConfig `toml:"table-actor" json:"table-actor"`

// EnablePullBasedSink enables pull-based sink, false by default.
// EnablePullBasedSink enables pull-based sink, true by default.
//
// NOTE: currently it can only be enabled with EnableDBSorter, because unified
// sorter hasn't been transformed into the new interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ var defaultServerConfig = &ServerConfig{

Scheduler: NewDefaultSchedulerConfig(),
EnableNewSink: true,
EnablePullBasedSink: false,
EnablePullBasedSink: true,
},
ClusterID: "default",
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/processor_err_chan/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function run() {
run_sql "CREATE table processor_err_chan.t$i (id int primary key auto_increment)" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
done

export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/processor/pipeline/ProcessorAddTableError=1*return(true)'
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/processor/sourcemanager/puller/ProcessorAddTableError=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr

changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
Expand Down

0 comments on commit bdd1226

Please sign in to comment.