From 314d79cb8089586dd7ac38071f2ff17849ac95cf Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Mon, 26 Dec 2022 18:48:16 +0800 Subject: [PATCH] sinkv2 (ticdc): limit batch dml size (#7960) close pingcap/tiflow#7959 --- cdc/sinkv2/eventsink/txn/mysql/mysql.go | 115 ++++++--- cdc/sinkv2/eventsink/txn/mysql/mysql_test.go | 236 +++++++++++++++++++ 2 files changed, 318 insertions(+), 33 deletions(-) diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go index b306a3caf17..4374a181a4d 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go @@ -273,72 +273,121 @@ func convertBinaryToString(row *model.RowChangedEvent) { } } -// TODO: Find a way to make batch delete dmls more efficient. -func groupRowsByType( +func (s *mysqlBackend) groupRowsByType( event *eventsink.TxnCallbackableEvent, tableInfo *timodel.TableInfo, spiltUpdate bool, -) (insertRows, updateRows, deleteRows []*sqlmodel.RowChange) { +) (insertRows, updateRows, deleteRows [][]*sqlmodel.RowChange) { + preAllocateSize := len(event.Event.Rows) + if preAllocateSize > s.cfg.MaxTxnRow { + preAllocateSize = s.cfg.MaxTxnRow + } + + insertRow := make([]*sqlmodel.RowChange, 0, preAllocateSize) + updateRow := make([]*sqlmodel.RowChange, 0, preAllocateSize) + deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize) + for _, row := range event.Event.Rows { convertBinaryToString(row) + if row.IsInsert() { - insertRows = append( - insertRows, + insertRow = append( + insertRow, convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert)) - } else if row.IsDelete() { - deleteRows = append( - deleteRows, + if len(insertRow) >= s.cfg.MaxTxnRow { + insertRows = append(insertRows, insertRow) + insertRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) + } + } + + if row.IsDelete() { + deleteRow = append( + deleteRow, convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete)) - } else if row.IsUpdate() { + if len(deleteRow) >= s.cfg.MaxTxnRow { + deleteRows = append(deleteRows, deleteRow) + deleteRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) + } + } + + if row.IsUpdate() { if spiltUpdate { - deleteRows = append( - deleteRows, + deleteRow = append( + deleteRow, convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete)) - insertRows = append( - insertRows, + if len(deleteRow) >= s.cfg.MaxTxnRow { + deleteRows = append(deleteRows, deleteRow) + deleteRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) + } + insertRow = append( + insertRow, convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert)) + if len(insertRow) >= s.cfg.MaxTxnRow { + insertRows = append(insertRows, insertRow) + insertRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) + } } else { - updateRows = append( - updateRows, + updateRow = append( + updateRow, convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)) + if len(updateRow) >= s.cfg.MaxTxnRow { + updateRows = append(updateRows, updateRow) + updateRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) + } } } } + + if len(insertRow) > 0 { + insertRows = append(insertRows, insertRow) + } + if len(updateRow) > 0 { + updateRows = append(updateRows, updateRow) + } + if len(deleteRow) > 0 { + deleteRows = append(deleteRows, deleteRow) + } + return } -func batchSingleTxnDmls( +func (s *mysqlBackend) batchSingleTxnDmls( event *eventsink.TxnCallbackableEvent, tableInfo *timodel.TableInfo, translateToInsert bool, ) (sqls []string, values [][]interface{}) { - insertRows, updateRows, deleteRows := groupRowsByType(event, tableInfo, !translateToInsert) + insertRows, updateRows, deleteRows := s.groupRowsByType(event, tableInfo, !translateToInsert) if len(deleteRows) > 0 { - sql, value := sqlmodel.GenDeleteSQL(deleteRows...) - sqls = append(sqls, sql) - values = append(values, value) + for _, rows := range deleteRows { + sql, value := sqlmodel.GenDeleteSQL(rows...) + sqls = append(sqls, sql) + values = append(values, value) + } } // handle insert if len(insertRows) > 0 { - if translateToInsert { - sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, insertRows...) - sqls = append(sqls, sql) - values = append(values, value) - } else { - sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLReplace, insertRows...) - sqls = append(sqls, sql) - values = append(values, value) + for _, rows := range insertRows { + if translateToInsert { + sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, rows...) + sqls = append(sqls, sql) + values = append(values, value) + } else { + sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLReplace, rows...) + sqls = append(sqls, sql) + values = append(values, value) + } } } // handle update if len(updateRows) > 0 { - // TODO: do a testing on update performance. - sql, value := sqlmodel.GenUpdateSQL(updateRows...) - sqls = append(sqls, sql) - values = append(values, value) + for _, rows := range updateRows { + sql, value := sqlmodel.GenUpdateSQL(rows...) + sqls = append(sqls, sql) + values = append(values, value) + } } return @@ -417,7 +466,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { if hasHandleKey(tableColumns) { // TODO(dongmen): find a better way to get table info. tableInfo := model.BuildTiDBTableInfo(tableColumns, firstRow.IndexColumns) - sql, value := batchSingleTxnDmls(event, tableInfo, translateToInsert) + sql, value := s.batchSingleTxnDmls(event, tableInfo, translateToInsert) sqls = append(sqls, sql...) values = append(values, value...) continue diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go index 48fe38f1f97..1f3a86065ce 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go @@ -1457,3 +1457,239 @@ func TestNetworkPartition(t *testing.T) { err := ms.execDMLWithMaxRetries(ctx, &preparedDMLs{}) require.Equal(t, context.Canceled, err) } + +func TestGroupRowsByType(t *testing.T) { + ctx := context.Background() + ms := newMySQLBackendWithoutDB(ctx) + testCases := []struct { + name string + input []*model.RowChangedEvent + maxTxnRow int + }{ + { + name: "delete", + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + }, + maxTxnRow: 2, + }, + { + name: "insert", + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }}, + IndexColumns: [][]int{{1, 1}}, + }, + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + }, + maxTxnRow: 4, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + event := &eventsink.TxnCallbackableEvent{ + Event: &model.SingleTableTxn{Rows: testCases[0].input}, + } + colums := tc.input[0].Columns + if len(colums) == 0 { + colums = tc.input[0].PreColumns + } + tableInfo := model.BuildTiDBTableInfo(colums, tc.input[0].IndexColumns) + ms.cfg.MaxTxnRow = tc.maxTxnRow + inserts, updates, deletes := ms.groupRowsByType(event, tableInfo, false) + for _, rows := range inserts { + require.LessOrEqual(t, len(rows), tc.maxTxnRow) + } + for _, rows := range updates { + require.LessOrEqual(t, len(rows), tc.maxTxnRow) + } + for _, rows := range deletes { + require.LessOrEqual(t, len(rows), tc.maxTxnRow) + } + }) + } +}