Skip to content

Commit

Permalink
sinkv2 (ticdc): limit batch dml size (#7960)
Browse files (browse the repository at this point in the history)
close #7959
  • Loading branch information
asddongmen authored Dec 26, 2022
1 parent 2a6f44f commit 314d79c
Show file tree
Hide file tree
Showing 2 changed files with 318 additions and 33 deletions.
115 changes: 82 additions & 33 deletions cdc/sinkv2/eventsink/txn/mysql/mysql.go
Original file line number | Diff line number | Diff line change
Expand Up
@@ -273,72 +273,121 @@ func convertBinaryToString(row *model.RowChangedEvent) {
}
}

// TODO: Find a way to make batch delete dmls more efficient.
func groupRowsByType(
func (s *mysqlBackend) groupRowsByType(
event *eventsink.TxnCallbackableEvent,
tableInfo *timodel.TableInfo,
spiltUpdate bool,
) (insertRows, updateRows, deleteRows []*sqlmodel.RowChange) {
) (insertRows, updateRows, deleteRows [][]*sqlmodel.RowChange) {
preAllocateSize := len(event.Event.Rows)
if preAllocateSize > s.cfg.MaxTxnRow {
preAllocateSize = s.cfg.MaxTxnRow
}

insertRow := make([]*sqlmodel.RowChange, 0, preAllocateSize)
updateRow := make([]*sqlmodel.RowChange, 0, preAllocateSize)
deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize)

for _, row := range event.Event.Rows {
convertBinaryToString(row)

if row.IsInsert() {
insertRows = append(
insertRows,
insertRow = append(
insertRow,
convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert))
} else if row.IsDelete() {
deleteRows = append(
deleteRows,
if len(insertRow) >= s.cfg.MaxTxnRow {
insertRows = append(insertRows, insertRow)
insertRow = make([]*sqlmodel.RowChange, 0, preAllocateSize)
}
}

if row.IsDelete() {
deleteRow = append(
deleteRow,
convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete))
} else if row.IsUpdate() {
if len(deleteRow) >= s.cfg.MaxTxnRow {
deleteRows = append(deleteRows, deleteRow)
deleteRow = make([]*sqlmodel.RowChange, 0, preAllocateSize)
}
}

if row.IsUpdate() {
if spiltUpdate {
deleteRows = append(
deleteRows,
deleteRow = append(
deleteRow,
convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete))
insertRows = append(
insertRows,
if len(deleteRow) >= s.cfg.MaxTxnRow {
deleteRows = append(deleteRows, deleteRow)
deleteRow = make([]*sqlmodel.RowChange, 0, preAllocateSize)
}
insertRow = append(
insertRow,
convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert))
if len(insertRow) >= s.cfg.MaxTxnRow {
insertRows = append(insertRows, insertRow)
insertRow = make([]*sqlmodel.RowChange, 0, preAllocateSize)
}
} else {
updateRows = append(
updateRows,
updateRow = append(
updateRow,
convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate))
if len(updateRow) >= s.cfg.MaxTxnRow {
updateRows = append(updateRows, updateRow)
updateRow = make([]*sqlmodel.RowChange, 0, preAllocateSize)
}
}
}
}

if len(insertRow) > 0 {
insertRows = append(insertRows, insertRow)
}
if len(updateRow) > 0 {
updateRows = append(updateRows, updateRow)
}
if len(deleteRow) > 0 {
deleteRows = append(deleteRows, deleteRow)
}

return
}

func batchSingleTxnDmls(
func (s *mysqlBackend) batchSingleTxnDmls(
event *eventsink.TxnCallbackableEvent,
tableInfo *timodel.TableInfo,
translateToInsert bool,
) (sqls []string, values [][]interface{}) {
insertRows, updateRows, deleteRows := groupRowsByType(event, tableInfo, !translateToInsert)
insertRows, updateRows, deleteRows := s.groupRowsByType(event, tableInfo, !translateToInsert)

if len(deleteRows) > 0 {
sql, value := sqlmodel.GenDeleteSQL(deleteRows...)
sqls = append(sqls, sql)
values = append(values, value)
for _, rows := range deleteRows {
sql, value := sqlmodel.GenDeleteSQL(rows...)
sqls = append(sqls, sql)
values = append(values, value)
}
}

// handle insert
if len(insertRows) > 0 {
if translateToInsert {
sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, insertRows...)
sqls = append(sqls, sql)
values = append(values, value)
} else {
sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLReplace, insertRows...)
sqls = append(sqls, sql)
values = append(values, value)
for _, rows := range insertRows {
if translateToInsert {
sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, rows...)
sqls = append(sqls, sql)
values = append(values, value)
} else {
sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLReplace, rows...)
sqls = append(sqls, sql)
values = append(values, value)
}
}
}

// handle update
if len(updateRows) > 0 {
// TODO: do a testing on update performance.
sql, value := sqlmodel.GenUpdateSQL(updateRows...)
sqls = append(sqls, sql)
values = append(values, value)
for _, rows := range updateRows {
sql, value := sqlmodel.GenUpdateSQL(rows...)
sqls = append(sqls, sql)
values = append(values, value)
}
}

return
Expand Down
Expand Up
@@ -417,7 +466,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
if hasHandleKey(tableColumns) {
// TODO(dongmen): find a better way to get table info.
tableInfo := model.BuildTiDBTableInfo(tableColumns, firstRow.IndexColumns)
sql, value := batchSingleTxnDmls(event, tableInfo, translateToInsert)
sql, value := s.batchSingleTxnDmls(event, tableInfo, translateToInsert)
sqls = append(sqls, sql...)
values = append(values, value...)
continue
Expand Down
Loading

0 comments on commit 314d79c

Please sign in to comment.