From 46676247bbead2e90d444436d5e9f71000be0e84 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 17 May 2023 11:58:53 +0800 Subject: [PATCH 1/4] fix ddl for redo --- cdc/owner/changefeed.go | 28 +++-- cdc/owner/ddl_manager.go | 114 +++++++++++------- cdc/owner/ddl_manager_test.go | 10 +- cdc/redo/manager.go | 7 ++ cdc/redo/meta_manager.go | 7 ++ .../conf/diff_config.toml | 1 + .../consistent_replicate_ddl/run.sh | 67 +++++++--- 7 files changed, 157 insertions(+), 77 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index ada6cd290df..20d2c947442 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -16,6 +16,7 @@ package owner import ( "context" "fmt" + "math" "strings" "sync" "time" @@ -305,7 +306,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* } // TODO: pass table checkpointTs when we support concurrent process ddl - allPhysicalTables, minTableBarrierTs, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil) + allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil) if err != nil { return errors.Trace(err) } @@ -325,11 +326,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* barrier.GlobalBarrierTs = otherBarrierTs } - if minTableBarrierTs > otherBarrierTs { + if barrier.minTableBarrierTs > otherBarrierTs { log.Debug("There are other barriers less than min table barrier, wait for them", zap.Uint64("otherBarrierTs", otherBarrierTs), zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs)) - minTableBarrierTs = otherBarrierTs + barrier.minTableBarrierTs = otherBarrierTs } log.Debug("owner handles barrier", @@ -338,7 +339,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", c.state.Status.ResolvedTs), zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs), - zap.Uint64("minTableBarrierTs", minTableBarrierTs), + zap.Uint64("minTableBarrierTs", barrier.minTableBarrierTs), zap.Any("tableBarrier", barrier.TableBarriers)) if barrier.GlobalBarrierTs < checkpointTs { @@ -350,7 +351,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* startTime := time.Now() newCheckpointTs, newResolvedTs, err := c.scheduler.Tick( - ctx, checkpointTs, allPhysicalTables, captures, barrier) + ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier) // metricsResolvedTs to store the min resolved ts among all tables and show it in metrics metricsResolvedTs := newResolvedTs costTime := time.Since(startTime) @@ -379,19 +380,22 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* // If the owner is just initialized, the newResolvedTs may be max uint64. // In this case, we should not update the resolved ts. - if newResolvedTs > barrier.GlobalBarrierTs { - newResolvedTs = barrier.GlobalBarrierTs + if newResolvedTs == math.MaxUint64 { + newResolvedTs = c.state.Status.ResolvedTs } // If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`. // In such case the `newCheckpointTs` may be larger than the minTableBarrierTs, // but it shouldn't be, so we need to handle it here. 
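+	// The same guard keeps a just-resumed changefeed from advancing its checkpoint
+	// past a DDL whose execution had not finished before the changefeed was stopped.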
- if newCheckpointTs > minTableBarrierTs { - newCheckpointTs = minTableBarrierTs + if newCheckpointTs > barrier.minTableBarrierTs { + newCheckpointTs = barrier.minTableBarrierTs } prevResolvedTs := c.state.Status.ResolvedTs if c.redoMetaMgr.Enabled() { + if newResolvedTs > barrier.physicalTableBarrierTs { + newResolvedTs = barrier.physicalTableBarrierTs + } // newResolvedTs can never exceed the barrier timestamp boundary. If redo is enabled, // we can only upload it to etcd after it has been flushed into redo meta. // NOTE: `UpdateMeta` handles regressed checkpointTs and resolvedTs internally. @@ -428,8 +432,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* } // MinTableBarrierTs should never regress - if minTableBarrierTs < c.state.Status.MinTableBarrierTs { - minTableBarrierTs = c.state.Status.MinTableBarrierTs + if barrier.minTableBarrierTs < c.state.Status.MinTableBarrierTs { + barrier.minTableBarrierTs = c.state.Status.MinTableBarrierTs } failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() { @@ -443,7 +447,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* } }) - c.updateStatus(newCheckpointTs, newResolvedTs, minTableBarrierTs) + c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minTableBarrierTs) c.updateMetrics(currentTs, newCheckpointTs, metricsResolvedTs) c.tickDownstreamObserver(ctx) diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index 6a7daf04d1f..fc8da931e1d 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -34,6 +34,21 @@ import ( // of tableBarrier in a single barrier. const tableBarrierNumberLimit = 256 +// The ddls below is globalDDLs, they affect all tables in the changefeed. +// we need to wait all tables checkpointTs reach the DDL commitTs +// before we can execute the DDL. +//timodel.ActionCreateSchema +//timodel.ActionDropSchema +//timodel.ActionModifySchemaCharsetAndCollate +//// We treat create table ddl as a global ddl, because before we execute the ddl, +//// there is no a tablePipeline for the new table. So we can't prevent the checkpointTs +//// from advancing. To solve this problem, we just treat create table ddl as a global ddl here. +//// TODO: Find a better way to handle create table ddl. +//timodel.ActionCreateTable +//timodel.ActionRenameTable +//timodel.ActionRenameTables +//timodel.ActionExchangeTablePartition + // nonGlobalDDLs are the DDLs that only affect related table // so that we should only block related table before execute them. var nonGlobalDDLs = map[timodel.ActionType]struct{}{ @@ -64,20 +79,24 @@ var nonGlobalDDLs = map[timodel.ActionType]struct{}{ timodel.ActionAlterTTLRemove: {}, } -// The ddls below is globalDDLs, they affect all tables in the changefeed. -// we need to wait all tables checkpointTs reach the DDL commitTs -// before we can execute the DDL. -//timodel.ActionCreateSchema -//timodel.ActionDropSchema -//timodel.ActionModifySchemaCharsetAndCollate -//// We treat create table ddl as a global ddl, because before we execute the ddl, -//// there is no a tablePipeline for the new table. So we can't prevent the checkpointTs -//// from advancing. To solve this problem, we just treat create table ddl as a global ddl here. -//// TODO: Find a better way to handle create table ddl. 
-//timodel.ActionCreateTable -//timodel.ActionRenameTable -//timodel.ActionRenameTables -//timodel.ActionExchangeTablePartition +var redoBarrierDDLs = map[timodel.ActionType]struct{}{ + timodel.ActionCreateTable: {}, + timodel.ActionTruncateTable: {}, + timodel.ActionAddTablePartition: {}, + timodel.ActionTruncateTablePartition: {}, + timodel.ActionRecoverTable: {}, +} + +type ddlBarrier struct { + *schedulepb.Barrier + // minTableBarrierTs is the minimum table barrier timestamp of all tables. + // It is only used when a changefeed is started to check whether there was + // a table's DDL job that had not finished when the changefeed was stopped. + minTableBarrierTs model.Ts + // physicalTableBarrierTs is the minimum ts of all ddl events that create + // a new physical table. + physicalTableBarrierTs model.Ts +} // ddlManager holds the pending DDL events of all tables and responsible for // executing them to downstream. @@ -169,16 +188,13 @@ func (m *ddlManager) tick( ctx context.Context, checkpointTs model.Ts, tableCheckpoint map[model.TableName]model.Ts, -) ([]model.TableID, model.Ts, *schedulepb.Barrier, error) { - minTableBarrierTs := model.Ts(0) - var barrier *schedulepb.Barrier +) ([]model.TableID, *ddlBarrier, error) { m.justSentDDL = nil - m.updateCheckpointTs(checkpointTs, tableCheckpoint) currentTables, err := m.allTables(ctx) if err != nil { - return nil, minTableBarrierTs, barrier, errors.Trace(err) + return nil, nil, errors.Trace(err) } if m.executingDDL == nil { @@ -187,7 +203,7 @@ func (m *ddlManager) tick( tableIDs, err := m.allPhysicalTables(ctx) if err != nil { - return nil, minTableBarrierTs, barrier, errors.Trace(err) + return nil, nil, errors.Trace(err) } // drain all ddl jobs from ddlPuller @@ -209,7 +225,7 @@ func (m *ddlManager) tick( ) events, err := m.schema.BuildDDLEvents(ctx, job) if err != nil { - return nil, minTableBarrierTs, barrier, err + return nil, nil, err } for _, event := range events { @@ -238,7 +254,7 @@ func (m *ddlManager) tick( for _, event := range events { err := m.redoDDLManager.EmitDDLEvent(ctx, event) if err != nil { - return nil, minTableBarrierTs, barrier, err + return nil, nil, err } } } @@ -251,7 +267,7 @@ func (m *ddlManager) tick( if m.redoDDLManager.Enabled() { err := m.redoDDLManager.UpdateResolvedTs(ctx, ddlRts) if err != nil { - return nil, minTableBarrierTs, barrier, err + return nil, nil, err } redoFlushedDDLRts := m.redoDDLManager.GetResolvedTs() if redoFlushedDDLRts < ddlRts { @@ -288,14 +304,12 @@ func (m *ddlManager) tick( err := m.executeDDL(ctx) if err != nil { - return nil, minTableBarrierTs, barrier, err + return nil, nil, err } } } - minTableBarrierTs, barrier = m.barrier() - - return tableIDs, minTableBarrierTs, barrier, nil + return tableIDs, m.barrier(), nil } func (m *ddlManager) shouldExecDDL(nextDDL *model.DDLEvent) bool { @@ -423,22 +437,34 @@ func (m *ddlManager) getAllTableNextDDL() []*model.DDLEvent { } // barrier returns ddlResolvedTs and tableBarrier -func (m *ddlManager) barrier() (model.Ts, *schedulepb.Barrier) { +func (m *ddlManager) barrier() *ddlBarrier { + barrier := &ddlBarrier{ + Barrier: &schedulepb.Barrier{ + GlobalBarrierTs: m.ddlResolvedTs, + }, + minTableBarrierTs: m.ddlResolvedTs, + physicalTableBarrierTs: m.ddlResolvedTs, + } tableBarrierMap := make(map[model.TableID]model.Ts) - var tableBarrier []*schedulepb.TableBarrier - minTableBarrierTs := m.ddlResolvedTs - globalBarrierTs := m.ddlResolvedTs - ddls := m.getAllTableNextDDL() if m.justSentDDL != nil { ddls = append(ddls, m.justSentDDL) } for 
_, ddl := range ddls { + if m.redoMetaManager.Enabled() && isRedoBerrierDDL(ddl) { + // The pipeline for a new table does not exist until the ddl is successfully + // executed, so the table's resolvedTs will not be calculated in redo. + // To solve this problem, resovedTs of redoDMLManager should not be greater + // than the min commitTs of create table DDL. + if ddl.CommitTs < barrier.physicalTableBarrierTs { + barrier.physicalTableBarrierTs = ddl.CommitTs + } + } // When there is a global DDL, we need to wait all tables // checkpointTs reach its commitTs before we can execute it. if isGlobalDDL(ddl) { - if ddl.CommitTs < globalBarrierTs { - globalBarrierTs = ddl.CommitTs + if ddl.CommitTs < barrier.GlobalBarrierTs { + barrier.GlobalBarrierTs = ddl.CommitTs } } else { ids := getPhysicalTableIDs(ddl) @@ -450,17 +476,18 @@ func (m *ddlManager) barrier() (model.Ts, *schedulepb.Barrier) { // minTableBarrierTs is the min commitTs of all tables DDLs, // it is used to prevent the checkpointTs from advancing too fast // when a changefeed is just resumed. - if ddl.CommitTs < minTableBarrierTs { - minTableBarrierTs = ddl.CommitTs + if ddl.CommitTs < barrier.minTableBarrierTs { + barrier.minTableBarrierTs = ddl.CommitTs } } for tb, barrierTs := range tableBarrierMap { - if barrierTs > globalBarrierTs { + if barrierTs > barrier.GlobalBarrierTs { delete(tableBarrierMap, tb) } } + var tableBarrier []*schedulepb.TableBarrier for tb, barrierTs := range tableBarrierMap { tableBarrier = append(tableBarrier, &schedulepb.TableBarrier{ TableID: tb, @@ -474,15 +501,13 @@ func (m *ddlManager) barrier() (model.Ts, *schedulepb.Barrier) { return tableBarrier[i].BarrierTs < tableBarrier[j].BarrierTs }) if len(tableBarrier) > tableBarrierNumberLimit { - globalBarrierTs = tableBarrier[tableBarrierNumberLimit].BarrierTs + barrier.GlobalBarrierTs = tableBarrier[tableBarrierNumberLimit].BarrierTs tableBarrier = tableBarrier[:tableBarrierNumberLimit] } m.justSentDDL = nil - return minTableBarrierTs, &schedulepb.Barrier{ - TableBarriers: tableBarrier, - GlobalBarrierTs: globalBarrierTs, - } + barrier.TableBarriers = tableBarrier + return barrier } // allTables returns all tables in the schema that @@ -600,3 +625,8 @@ func isGlobalDDL(ddl *model.DDLEvent) bool { _, ok := nonGlobalDDLs[ddl.Type] return !ok } + +func isRedoBerrierDDL(ddl *model.DDLEvent) bool { + _, ok := redoBarrierDDLs[ddl.Type] + return ok +} diff --git a/cdc/owner/ddl_manager_test.go b/cdc/owner/ddl_manager_test.go index b77c3a87917..1822df696ae 100644 --- a/cdc/owner/ddl_manager_test.go +++ b/cdc/owner/ddl_manager_test.go @@ -21,6 +21,7 @@ import ( timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" config2 "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" @@ -45,7 +46,8 @@ func createDDLManagerForTest(t *testing.T) *ddlManager { ddlSink, ddlPuller, schema, - nil, nil, + redo.NewDisabledDDLManager(), + redo.NewDisabledMetaManager(), model.DB, false) return res } @@ -117,7 +119,8 @@ func TestBarriers(t *testing.T) { } // advance the ddlResolvedTs dm.ddlResolvedTs = 6 - minTableBarrierTs, barrier := dm.barrier() + ddlBarrier := dm.barrier() + minTableBarrierTs, barrier := ddlBarrier.minTableBarrierTs, ddlBarrier.Barrier require.Equal(t, expectedMinTableBarrier, minTableBarrierTs) require.Equal(t, expectedBarrier, barrier) @@ -130,7 +133,8 @@ func 
TestBarriers(t *testing.T) { dm.pendingDDLs[tableName] = append(dm.pendingDDLs[tableName], newFakeDDLEvent(tableID, tableName.Table, timodel.ActionAddColumn, uint64(i))) } - minTableBarrierTs, barrier = dm.barrier() + ddlBarrier = dm.barrier() + minTableBarrierTs, barrier = ddlBarrier.minTableBarrierTs, ddlBarrier.Barrier require.Equal(t, uint64(0), minTableBarrierTs) require.Equal(t, uint64(256), barrier.GlobalBarrierTs) require.Equal(t, 256, len(barrier.TableBarriers)) diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index b25c5a30f38..7104cbd8a0a 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -57,6 +57,13 @@ type DDLManager interface { GetResolvedTs() model.Ts } +// NewDisabledDDLManager creates a disabled ddl Manager. +func NewDisabledDDLManager() *ddlManager { + return &ddlManager{ + logManager: &logManager{enabled: false}, + } +} + // NewDDLManager creates a new ddl Manager. func NewDDLManager( ctx context.Context, cfg *config.ConsistentConfig, ddlStartTs model.Ts, diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 05999a2b15e..12ba5e71b29 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -68,6 +68,13 @@ type metaManager struct { metricFlushLogDuration prometheus.Observer } +// NewDisabledMetaManager creates a disabled Meta Manager. +func NewDisabledMetaManager() *metaManager { + return &metaManager{ + enabled: false, + } +} + // NewMetaManagerWithInit creates a new Manager and initializes the meta. func NewMetaManagerWithInit( ctx context.Context, cfg *config.ConsistentConfig, startTs model.Ts, diff --git a/tests/integration_tests/consistent_replicate_ddl/conf/diff_config.toml b/tests/integration_tests/consistent_replicate_ddl/conf/diff_config.toml index 2566d298549..dcc769e5d3f 100644 --- a/tests/integration_tests/consistent_replicate_ddl/conf/diff_config.toml +++ b/tests/integration_tests/consistent_replicate_ddl/conf/diff_config.toml @@ -21,6 +21,7 @@ check-struct-only = false port = 4000 user = "root" password = "" + snapshot = "" [data-sources.mysql] host = "127.0.0.1" diff --git a/tests/integration_tests/consistent_replicate_ddl/run.sh b/tests/integration_tests/consistent_replicate_ddl/run.sh index c144c623819..b1e23abc5b5 100644 --- a/tests/integration_tests/consistent_replicate_ddl/run.sh +++ b/tests/integration_tests/consistent_replicate_ddl/run.sh @@ -13,7 +13,7 @@ mkdir -p "$WORK_DIR" stop() { # to distinguish whether the test failed in the DML synchronization phase or the DDL synchronization phase - echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "select count(*) from consistent_replicate_ddl.usertable;") + echo $(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -e "SELECT count(*) FROM consistent_replicate_ddl.usertable;") stop_tidb_cluster } @@ -33,45 +33,72 @@ function run() { SINK_URI="mysql://normal:123456@127.0.0.1:3306/" changefeed_id=$(cdc cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" 2>&1 | tail -n2 | head -n1 | awk '{print $2}') - run_sql "CREATE DATABASE consistent_replicate_ddl;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE DATABASE consistent_replicate_ddl CHARACTER SET utf8 COLLATE utf8_unicode_ci" ${UP_TIDB_HOST} ${UP_TIDB_PORT} go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=consistent_replicate_ddl - run_sql "create table consistent_replicate_ddl.usertable2 like consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "create 
table consistent_replicate_ddl.usertable3 like consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "consistent_replicate_ddl.usertable" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + run_sql "CREATE TABLE consistent_replicate_ddl.usertable1 like consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE TABLE consistent_replicate_ddl.usertable2 like consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE TABLE consistent_replicate_ddl.usertable3 like consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE TABLE consistent_replicate_ddl.usertable_bak like consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "consistent_replicate_ddl.usertable1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_table_exists "consistent_replicate_ddl.usertable2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 check_table_exists "consistent_replicate_ddl.usertable3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 + check_table_exists "consistent_replicate_ddl.usertable_bak" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 - sleep 20 + cleanup_process $CDC_BINARY # Inject the failpoint to prevent sink execution, but the global resolved can be moved forward. # Then we can apply redo log to reach an eventual consistent state in downstream. - cleanup_process $CDC_BINARY export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql/MySQLSinkExecDDLDelay=return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - run_sql "insert into consistent_replicate_ddl.usertable2 select * from consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + # case 1: + # global ddl tests -> ActionRenameTable + # table ddl tests -> ActionDropTable + run_sql "DROP TABLE consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "RENAME TABLE consistent_replicate_ddl.usertable_bak TO consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO consistent_replicate_ddl.usertable1 SELECT * FROM consistent_replicate_ddl.usertable limit 10" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + # run_sql "TRUNCATE TABLE consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "RENAME TABLE consistent_replicate_ddl.usertable1 TO consistent_replicate_ddl.usertable1_1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT IGNORE INTO consistent_replicate_ddl.usertable1_1 SELECT * FROM consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + # case 2: + # global ddl tests -> ActionCreateSchema, ActionModifySchemaCharsetAndCollate + # table ddl tests -> ActionMultiSchemaChange, ActionAddColumn, ActionDropColumn, ActionModifyTableCharsetAndCollate + run_sql "CREATE DATABASE consistent_replicate_ddl1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "ALTER DATABASE consistent_replicate_ddl CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "ALTER TABLE consistent_replicate_ddl.usertable2 CHARACTER SET utf8mb4 COLLATE utf8mb4_bin" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO consistent_replicate_ddl.usertable2 SELECT * FROM consistent_replicate_ddl.usertable limit 20" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "ALTER TABLE consistent_replicate_ddl.usertable2 DROP COLUMN FIELD0" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "ALTER TABLE consistent_replicate_ddl.usertable2 ADD COLUMN dummy varchar(30)" ${UP_TIDB_HOST} 
${UP_TIDB_PORT} - run_sql "insert into consistent_replicate_ddl.usertable3 select * from consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "ALTER TABLE consistent_replicate_ddl.usertable3 DROP COLUMN FIELD1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # enable this after global resolved ts is decoupled from globalBarrierTs - # run_sql "create table consistent_replicate_ddl.usertable3 like consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # run_sql "insert into consistent_replicate_ddl.usertable3 select * from consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # run_sql "CREATE table consistent_replicate_ddl.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + # case 3: + # global ddl tests -> ActionDropSchema, ActionRenameTables + # table ddl tests -> ActionModifyColumn + run_sql "DROP DATABASE consistent_replicate_ddl1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "ALTER TABLE consistent_replicate_ddl.usertable3 CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "RENAME TABLE consistent_replicate_ddl.usertable2 to consistent_replicate_ddl.usertable2_1, consistent_replicate_ddl.usertable3 TO consistent_replicate_ddl.usertable3_1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "ALTER TABLE consistent_replicate_ddl.usertable3_1 MODIFY COLUMN FIELD1 varchar(100)" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO consistent_replicate_ddl.usertable3_1 SELECT * FROM consistent_replicate_ddl.usertable limit 31" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + run_sql "CREATE table consistent_replicate_ddl.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} # to ensure row changed events have been replicated to TiCDC - sleep 20 + sleep 120 + cleanup_process $CDC_BINARY storage_path="file://$WORK_DIR/redo" tmp_download_path=$WORK_DIR/cdc_data/redo/$changefeed_id - current_tso=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) - ensure 50 check_redo_resolved_ts $changefeed_id $current_tso $storage_path $tmp_download_path/meta - cleanup_process $CDC_BINARY - export GO_FAILPOINTS='' + rts=$(cdc redo meta --storage="$storage_path" --tmp-dir="$tmp_download_path" | grep -oE "resolved-ts:[0-9]+" | awk -F: '{print $2}') + sed "s//$rts/g" $CUR/conf/diff_config.toml >$WORK_DIR/diff_config.toml + + cat $WORK_DIR/diff_config.toml cdc redo apply --tmp-dir="$tmp_download_path/apply" \ --storage="$storage_path" \ --sink-uri="mysql://normal:123456@127.0.0.1:3306/" - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + # sleep 6000000000000 + check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml } trap stop EXIT From 8e11bbdd1813846ec1448fd271adef75af5569e7 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 17 May 2023 14:42:18 +0800 Subject: [PATCH 2/4] fix syncpoint and finish --- cdc/owner/changefeed.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 20d2c947442..0a90deea80a 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -900,25 +900,19 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) { // 1. All data before the barrierTs was sent to downstream. // 2. No more data after barrierTs was sent to downstream. checkpointReachBarrier := barrierTs == c.state.Status.CheckpointTs - - // TODO: To check if we can remove the `barrierTs == c.state.Status.ResolvedTs` condition. 
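+	// Take no barrier action until the checkpoint itself has reached the barrier.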
- fullyBlocked := checkpointReachBarrier && barrierTs == c.state.Status.ResolvedTs + if !checkpointReachBarrier { + return barrierTs, nil + } switch barrierTp { case syncPointBarrier: - if !fullyBlocked { - return barrierTs, nil - } nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.Config.SyncPointInterval)) if err := c.ddlSink.emitSyncPoint(ctx, barrierTs); err != nil { return 0, errors.Trace(err) } c.barriers.Update(syncPointBarrier, nextSyncPointTs) case finishBarrier: - if fullyBlocked { - c.feedStateManager.MarkFinished() - } - return barrierTs, nil + c.feedStateManager.MarkFinished() default: log.Panic("Unknown barrier type", zap.Int("barrierType", int(barrierTp))) } From 53a3ef0286c28a0e7da7cc68f5750f639233389d Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 17 May 2023 16:29:01 +0800 Subject: [PATCH 3/4] refactor some code --- cdc/model/owner.go | 6 +-- cdc/owner/changefeed.go | 26 +++++------ cdc/owner/ddl_manager.go | 82 ++++++++++++++++------------------- cdc/owner/ddl_manager_test.go | 4 +- 4 files changed, 54 insertions(+), 64 deletions(-) diff --git a/cdc/model/owner.go b/cdc/model/owner.go index 65ae1a78ce2..e62b70b3cea 100644 --- a/cdc/model/owner.go +++ b/cdc/model/owner.go @@ -262,9 +262,9 @@ func (p ProcessorsInfos) String() string { type ChangeFeedStatus struct { ResolvedTs uint64 `json:"resolved-ts"` CheckpointTs uint64 `json:"checkpoint-ts"` - // MinTableBarrierTs is the minimum table barrier timestamp of all tables. - // It is only used when a changefeed is started to check whether there was - // a table's DDL job that had not finished when the changefeed was stopped. + // minTableBarrierTs is the minimum commitTs of all DDL events and is only + // used to check whether there is a pending DDL job at the checkpointTs when + // initializing the changefeed. 
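+	// It is persisted together with CheckpointTs and is never allowed to regress.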
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"` AdminJobType AdminJobType `json:"admin-job-type"` } diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 0a90deea80a..5e1f4a48eb0 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -326,11 +326,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* barrier.GlobalBarrierTs = otherBarrierTs } - if barrier.minTableBarrierTs > otherBarrierTs { + if barrier.minDDLBarrierTs > otherBarrierTs { log.Debug("There are other barriers less than min table barrier, wait for them", zap.Uint64("otherBarrierTs", otherBarrierTs), zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs)) - barrier.minTableBarrierTs = otherBarrierTs + barrier.minDDLBarrierTs = otherBarrierTs } log.Debug("owner handles barrier", @@ -339,7 +339,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", c.state.Status.ResolvedTs), zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs), - zap.Uint64("minTableBarrierTs", barrier.minTableBarrierTs), + zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs), zap.Any("tableBarrier", barrier.TableBarriers)) if barrier.GlobalBarrierTs < checkpointTs { @@ -352,8 +352,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* startTime := time.Now() newCheckpointTs, newResolvedTs, err := c.scheduler.Tick( ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier) - // metricsResolvedTs to store the min resolved ts among all tables and show it in metrics - metricsResolvedTs := newResolvedTs costTime := time.Since(startTime) if costTime > schedulerLogsWarnDuration { log.Warn("scheduler tick took too long", @@ -387,14 +385,14 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* // If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`. // In such case the `newCheckpointTs` may be larger than the minTableBarrierTs, // but it shouldn't be, so we need to handle it here. - if newCheckpointTs > barrier.minTableBarrierTs { - newCheckpointTs = barrier.minTableBarrierTs + if newCheckpointTs > barrier.minDDLBarrierTs { + newCheckpointTs = barrier.minDDLBarrierTs } prevResolvedTs := c.state.Status.ResolvedTs if c.redoMetaMgr.Enabled() { - if newResolvedTs > barrier.physicalTableBarrierTs { - newResolvedTs = barrier.physicalTableBarrierTs + if newResolvedTs > barrier.redoBarrierTs { + newResolvedTs = barrier.redoBarrierTs } // newResolvedTs can never exceed the barrier timestamp boundary. If redo is enabled, // we can only upload it to etcd after it has been flushed into redo meta. @@ -416,7 +414,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* } else { newResolvedTs = prevResolvedTs } - metricsResolvedTs = newResolvedTs } log.Debug("owner prepares to update status", zap.Uint64("prevResolvedTs", prevResolvedTs), @@ -428,12 +425,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* // been decreased when the owner is initialized. 
if newResolvedTs < prevResolvedTs { newResolvedTs = prevResolvedTs - metricsResolvedTs = newResolvedTs } // MinTableBarrierTs should never regress - if barrier.minTableBarrierTs < c.state.Status.MinTableBarrierTs { - barrier.minTableBarrierTs = c.state.Status.MinTableBarrierTs + if barrier.minDDLBarrierTs < c.state.Status.MinTableBarrierTs { + barrier.minDDLBarrierTs = c.state.Status.MinTableBarrierTs } failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() { @@ -447,8 +443,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* } }) - c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minTableBarrierTs) - c.updateMetrics(currentTs, newCheckpointTs, metricsResolvedTs) + c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minDDLBarrierTs) + c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs) c.tickDownstreamObserver(ctx) return nil diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index fc8da931e1d..6909d0b42e7 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -89,13 +89,12 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{ type ddlBarrier struct { *schedulepb.Barrier - // minTableBarrierTs is the minimum table barrier timestamp of all tables. - // It is only used when a changefeed is started to check whether there was - // a table's DDL job that had not finished when the changefeed was stopped. - minTableBarrierTs model.Ts - // physicalTableBarrierTs is the minimum ts of all ddl events that create - // a new physical table. - physicalTableBarrierTs model.Ts + // minDDLBarrierTs is the minimum commitTs of all DDL events and is only + // used to check whether there is a pending DDL job at the checkpointTs when + // initializing the changefeed. + minDDLBarrierTs model.Ts + // redoBarrierTs is the minimum ts of ddl events that create a new physical table. + redoBarrierTs model.Ts } // ddlManager holds the pending DDL events of all tables and responsible for @@ -442,71 +441,66 @@ func (m *ddlManager) barrier() *ddlBarrier { Barrier: &schedulepb.Barrier{ GlobalBarrierTs: m.ddlResolvedTs, }, - minTableBarrierTs: m.ddlResolvedTs, - physicalTableBarrierTs: m.ddlResolvedTs, + minDDLBarrierTs: m.ddlResolvedTs, + redoBarrierTs: m.ddlResolvedTs, } tableBarrierMap := make(map[model.TableID]model.Ts) ddls := m.getAllTableNextDDL() if m.justSentDDL != nil { ddls = append(ddls, m.justSentDDL) } + for _, ddl := range ddls { - if m.redoMetaManager.Enabled() && isRedoBerrierDDL(ddl) { + if ddl.CommitTs < barrier.minDDLBarrierTs { + barrier.minDDLBarrierTs = ddl.CommitTs + } + if m.redoMetaManager.Enabled() && isRedoBarrierDDL(ddl) { // The pipeline for a new table does not exist until the ddl is successfully // executed, so the table's resolvedTs will not be calculated in redo. - // To solve this problem, resovedTs of redoDMLManager should not be greater - // than the min commitTs of create table DDL. - if ddl.CommitTs < barrier.physicalTableBarrierTs { - barrier.physicalTableBarrierTs = ddl.CommitTs + // To solve this problem, resovedTs of redo manager should not be greater + // than the min commitTs of ddls that create a new physical table. + if ddl.CommitTs < barrier.redoBarrierTs { + barrier.redoBarrierTs = ddl.CommitTs } } - // When there is a global DDL, we need to wait all tables - // checkpointTs reach its commitTs before we can execute it. if isGlobalDDL(ddl) { + // When there is a global DDL, we need to wait all tables + // checkpointTs reach its commitTs before we can execute it. 
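+			// GlobalBarrierTs starts at ddlResolvedTs, so after this loop it is no
+			// larger than the smallest commitTs of any pending global DDL.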
if ddl.CommitTs < barrier.GlobalBarrierTs { barrier.GlobalBarrierTs = ddl.CommitTs } } else { - ids := getPhysicalTableIDs(ddl) + // barrier related physical tables + ids := getRelatedPhysicalTableIDs(ddl) for _, id := range ids { tableBarrierMap[id] = ddl.CommitTs } } - - // minTableBarrierTs is the min commitTs of all tables DDLs, - // it is used to prevent the checkpointTs from advancing too fast - // when a changefeed is just resumed. - if ddl.CommitTs < barrier.minTableBarrierTs { - barrier.minTableBarrierTs = ddl.CommitTs - } } - for tb, barrierTs := range tableBarrierMap { - if barrierTs > barrier.GlobalBarrierTs { - delete(tableBarrierMap, tb) + // calculate tableBarriers + var tableBarriers []*schedulepb.TableBarrier + for tableID, tableBarrierTs := range tableBarrierMap { + if tableBarrierTs > barrier.GlobalBarrierTs { + continue } - } - - var tableBarrier []*schedulepb.TableBarrier - for tb, barrierTs := range tableBarrierMap { - tableBarrier = append(tableBarrier, &schedulepb.TableBarrier{ - TableID: tb, - BarrierTs: barrierTs, + tableBarriers = append(tableBarriers, &schedulepb.TableBarrier{ + TableID: tableID, + BarrierTs: tableBarrierTs, }) } - // Limit the tableBarrier size to avoid too large barrier. Since it will // cause the scheduler to be slow. - sort.Slice(tableBarrier, func(i, j int) bool { - return tableBarrier[i].BarrierTs < tableBarrier[j].BarrierTs + sort.Slice(tableBarriers, func(i, j int) bool { + return tableBarriers[i].BarrierTs < tableBarriers[j].BarrierTs }) - if len(tableBarrier) > tableBarrierNumberLimit { - barrier.GlobalBarrierTs = tableBarrier[tableBarrierNumberLimit].BarrierTs - tableBarrier = tableBarrier[:tableBarrierNumberLimit] + if len(tableBarriers) > tableBarrierNumberLimit { + barrier.GlobalBarrierTs = tableBarriers[tableBarrierNumberLimit].BarrierTs + tableBarriers = tableBarriers[:tableBarrierNumberLimit] } m.justSentDDL = nil - barrier.TableBarriers = tableBarrier + barrier.TableBarriers = tableBarriers return barrier } @@ -597,9 +591,9 @@ func (m *ddlManager) cleanCache() { m.physicalTablesCache = nil } -// getPhysicalTableIDs get all related physical table ids of a ddl event. +// getRelatedPhysicalTableIDs get all related physical table ids of a ddl event. // It is a helper function to calculate tableBarrier. 
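+// IDs are collected from both PreTableInfo and TableInfo when a previous table info is present.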
-func getPhysicalTableIDs(ddl *model.DDLEvent) []model.TableID { +func getRelatedPhysicalTableIDs(ddl *model.DDLEvent) []model.TableID { res := make([]model.TableID, 0, 1) table := ddl.TableInfo if ddl.PreTableInfo != nil { @@ -626,7 +620,7 @@ func isGlobalDDL(ddl *model.DDLEvent) bool { return !ok } -func isRedoBerrierDDL(ddl *model.DDLEvent) bool { +func isRedoBarrierDDL(ddl *model.DDLEvent) bool { _, ok := redoBarrierDDLs[ddl.Type] return ok } diff --git a/cdc/owner/ddl_manager_test.go b/cdc/owner/ddl_manager_test.go index 1822df696ae..762e12e2d41 100644 --- a/cdc/owner/ddl_manager_test.go +++ b/cdc/owner/ddl_manager_test.go @@ -120,7 +120,7 @@ func TestBarriers(t *testing.T) { // advance the ddlResolvedTs dm.ddlResolvedTs = 6 ddlBarrier := dm.barrier() - minTableBarrierTs, barrier := ddlBarrier.minTableBarrierTs, ddlBarrier.Barrier + minTableBarrierTs, barrier := ddlBarrier.minDDLBarrierTs, ddlBarrier.Barrier require.Equal(t, expectedMinTableBarrier, minTableBarrierTs) require.Equal(t, expectedBarrier, barrier) @@ -134,7 +134,7 @@ func TestBarriers(t *testing.T) { newFakeDDLEvent(tableID, tableName.Table, timodel.ActionAddColumn, uint64(i))) } ddlBarrier = dm.barrier() - minTableBarrierTs, barrier = ddlBarrier.minTableBarrierTs, ddlBarrier.Barrier + minTableBarrierTs, barrier = ddlBarrier.minDDLBarrierTs, ddlBarrier.Barrier require.Equal(t, uint64(0), minTableBarrierTs) require.Equal(t, uint64(256), barrier.GlobalBarrierTs) require.Equal(t, 256, len(barrier.TableBarriers)) From 0f9f299f88716a7fb070e7db6b9acef2a133695d Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Thu, 18 May 2023 08:51:10 +0800 Subject: [PATCH 4/4] fix lint --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index b3bc017a2e7..83561c33ccb 100644 --- a/Makefile +++ b/Makefile @@ -158,6 +158,7 @@ storage_consumer: kafka_consumer_image: @which docker || (echo "docker not found in ${PATH}"; exit 1) DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/kafka-consumer.Dockerfile . -t ticdc:kafka-consumer --platform linux/amd64 + storage_consumer_image: @which docker || (echo "docker not found in ${PATH}"; exit 1) DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/storage-consumer.Dockerfile . -t ticdc:storage-consumer --platform linux/amd64
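
As a reading aid for the barrier plumbing changed above, the following is a minimal,
self-contained Go sketch of the owner-side clamping this series ends up with. It is not
part of the patch: ddlBarrierSketch and clampByBarrier are illustrative stand-ins, and the
real ddlBarrier embeds *schedulepb.Barrier rather than carrying GlobalBarrierTs as a plain
field. The idea it demonstrates is taken directly from changefeed.tick: the new checkpoint
never passes the smallest pending DDL commitTs, and, when redo is enabled, the new resolved
ts additionally never passes the smallest commitTs of a DDL that creates a new physical table.

package main

import "fmt"

// ddlBarrierSketch is a simplified stand-in for the ddlBarrier type added in this series.
type ddlBarrierSketch struct {
	GlobalBarrierTs uint64 // smallest commitTs of pending global DDLs, capped by ddlResolvedTs
	minDDLBarrierTs uint64 // smallest commitTs of all pending DDLs
	redoBarrierTs   uint64 // smallest commitTs of DDLs that create a new physical table
}

// clampByBarrier applies the two clamps performed in changefeed.tick: the checkpoint
// never passes an unfinished DDL, and with redo enabled the resolved ts never passes
// a DDL that creates a new physical table.
func clampByBarrier(checkpointTs, resolvedTs uint64, b ddlBarrierSketch, redoEnabled bool) (uint64, uint64) {
	if checkpointTs > b.minDDLBarrierTs {
		checkpointTs = b.minDDLBarrierTs
	}
	if redoEnabled && resolvedTs > b.redoBarrierTs {
		resolvedTs = b.redoBarrierTs
	}
	return checkpointTs, resolvedTs
}

func main() {
	b := ddlBarrierSketch{GlobalBarrierTs: 100, minDDLBarrierTs: 90, redoBarrierTs: 80}
	cp, rts := clampByBarrier(95, 120, b, true)
	fmt.Println(cp, rts) // prints: 90 80
}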