-
Notifications
You must be signed in to change notification settings - Fork 286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pkg(ticdc): add a faster raw string rendering version of GenUpdateSQL #8069
Changes from 2 commits
099b674
5168a5a
616a838
985de55
89b7b23
7eee5e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -122,6 +122,152 @@ func GenDeleteSQL(changes ...*RowChange) (string, []interface{}) { | |||||||||||||||||||||||||
return buf.String(), args | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// GenUpdateSQLFast generates the UPDATE SQL and its arguments. | ||||||||||||||||||||||||||
// Input `changes` should have same target table and same columns for WHERE | ||||||||||||||||||||||||||
// (typically same PK/NOT NULL UK), otherwise the behaviour is undefined. | ||||||||||||||||||||||||||
// It is a faster version compared with GenUpdateSQL. | ||||||||||||||||||||||||||
func GenUpdateSQLFast(changes ...*RowChange) (string, []interface{}) { | ||||||||||||||||||||||||||
amyangfei marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||
if len(changes) == 0 { | ||||||||||||||||||||||||||
log.L().DPanic("row changes is empty") | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems we always use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It keeps the same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's OK to use log.Panic. @hi-rustin in DM when the log level is debug, we turn on Development flag. Lines 107 to 118 in 171e21b
and DM integration tests always use debug level log, so we can detect the misuse in developement and control the blast radius when the bug is really happened at user's side. But it's not important. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed to |
||||||||||||||||||||||||||
return "", nil | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
var buf strings.Builder | ||||||||||||||||||||||||||
buf.Grow(1024) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Generate UPDATE `db`.`table` SET | ||||||||||||||||||||||||||
first := changes[0] | ||||||||||||||||||||||||||
buf.WriteString("UPDATE ") | ||||||||||||||||||||||||||
buf.WriteString(first.targetTable.QuoteString()) | ||||||||||||||||||||||||||
buf.WriteString(" SET ") | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Pre-generate essential sub statements used after WHEN, WHERE and IN. | ||||||||||||||||||||||||||
var ( | ||||||||||||||||||||||||||
whereCaseStmt string | ||||||||||||||||||||||||||
whenCaseStmt string | ||||||||||||||||||||||||||
inCaseStmt string | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
whereColumns, _ := first.whereColumnsAndValues() | ||||||||||||||||||||||||||
if len(whereColumns) == 1 { | ||||||||||||||||||||||||||
// one field PK or UK, use `field`=? directly. | ||||||||||||||||||||||||||
whereCaseStmt = quotes.QuoteName(whereColumns[0]) | ||||||||||||||||||||||||||
whenCaseStmt = whereCaseStmt + "=?" | ||||||||||||||||||||||||||
inCaseStmt = valuesHolder(len(changes)) | ||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||
// multiple fields PK or UK, use ROW(...fields) expression. | ||||||||||||||||||||||||||
whereValuesHolder := valuesHolder(len(whereColumns)) | ||||||||||||||||||||||||||
whereCaseStmt = "ROW(" | ||||||||||||||||||||||||||
for i, column := range whereColumns { | ||||||||||||||||||||||||||
whereCaseStmt += quotes.QuoteName(column) | ||||||||||||||||||||||||||
if i != len(whereColumns)-1 { | ||||||||||||||||||||||||||
whereCaseStmt += "," | ||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||
whereCaseStmt += ")" | ||||||||||||||||||||||||||
whenCaseStmt = whereCaseStmt + "=ROW" + whereValuesHolder | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
var inCaseStmtBuf strings.Builder | ||||||||||||||||||||||||||
// inCaseStmt sample: IN (ROW(?,?,?),ROW(?,?,?)) | ||||||||||||||||||||||||||
// ^ ^ | ||||||||||||||||||||||||||
// Buffer size count between |---------------------| | ||||||||||||||||||||||||||
// equals to 3 * len(changes) for each `ROW` | ||||||||||||||||||||||||||
// plus 1 * len(changes) - 1 for each `,` between every two ROW(?,?,?) | ||||||||||||||||||||||||||
// plus len(whereValuesHolder) * len(changes) | ||||||||||||||||||||||||||
// plus 2 for `(` and `)` | ||||||||||||||||||||||||||
inCaseStmtBuf.Grow((4+len(whereValuesHolder))*len(changes) + 1) | ||||||||||||||||||||||||||
inCaseStmtBuf.WriteString("(") | ||||||||||||||||||||||||||
for i := range changes { | ||||||||||||||||||||||||||
inCaseStmtBuf.WriteString("ROW") | ||||||||||||||||||||||||||
inCaseStmtBuf.WriteString(whereValuesHolder) | ||||||||||||||||||||||||||
if i != len(changes)-1 { | ||||||||||||||||||||||||||
inCaseStmtBuf.WriteString(",") | ||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||
inCaseStmtBuf.WriteString(")") | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
inCaseStmt = inCaseStmtBuf.String() | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Generate `ColumnName`=CASE WHEN .. THEN .. END | ||||||||||||||||||||||||||
// Use this value in order to identify which is the first CaseWhenThen line, | ||||||||||||||||||||||||||
// because generated column can happen any where and it will be skipped. | ||||||||||||||||||||||||||
isFirstCaseWhenThenLine := true | ||||||||||||||||||||||||||
for _, column := range first.targetTableInfo.Columns { | ||||||||||||||||||||||||||
if isGenerated(first.targetTableInfo.Columns, column.Name) { | ||||||||||||||||||||||||||
continue | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
if !isFirstCaseWhenThenLine { | ||||||||||||||||||||||||||
// insert ", " after END of each lines except for the first line. | ||||||||||||||||||||||||||
buf.WriteString(", ") | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
buf.WriteString(quotes.QuoteName(column.Name.String()) + "=CASE") | ||||||||||||||||||||||||||
for range changes { | ||||||||||||||||||||||||||
buf.WriteString(" WHEN ") | ||||||||||||||||||||||||||
buf.WriteString(whenCaseStmt) | ||||||||||||||||||||||||||
buf.WriteString(" THEN ?") | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
buf.WriteString(" END") | ||||||||||||||||||||||||||
isFirstCaseWhenThenLine = false | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Generate WHERE .. IN .. | ||||||||||||||||||||||||||
buf.WriteString(" WHERE ") | ||||||||||||||||||||||||||
buf.WriteString(whereCaseStmt) | ||||||||||||||||||||||||||
buf.WriteString(" IN ") | ||||||||||||||||||||||||||
buf.WriteString(inCaseStmt) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// Build args of the UPDATE SQL | ||||||||||||||||||||||||||
var assignValueColumnCount int | ||||||||||||||||||||||||||
var skipColIdx []int | ||||||||||||||||||||||||||
for i, col := range first.sourceTableInfo.Columns { | ||||||||||||||||||||||||||
if isGenerated(first.targetTableInfo.Columns, col.Name) { | ||||||||||||||||||||||||||
skipColIdx = append(skipColIdx, i) | ||||||||||||||||||||||||||
continue | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
assignValueColumnCount++ | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
args := make([]interface{}, 0, | ||||||||||||||||||||||||||
assignValueColumnCount*len(changes)*(len(whereColumns)+1)+len(changes)*len(whereColumns)) | ||||||||||||||||||||||||||
argsPerCol := make([][]interface{}, assignValueColumnCount) | ||||||||||||||||||||||||||
for i := 0; i < assignValueColumnCount; i++ { | ||||||||||||||||||||||||||
argsPerCol[i] = make([]interface{}, 0, len(changes)*(len(whereColumns)+1)) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
whereValuesAtTheEnd := make([]interface{}, 0, len(changes)*len(whereColumns)) | ||||||||||||||||||||||||||
for _, change := range changes { | ||||||||||||||||||||||||||
_, whereValues := change.whereColumnsAndValues() | ||||||||||||||||||||||||||
// a simple check about different number of WHERE values, not trying to | ||||||||||||||||||||||||||
// cover all cases | ||||||||||||||||||||||||||
if len(whereValues) != len(whereColumns) { | ||||||||||||||||||||||||||
log.L().DPanic("len(whereValues) != len(whereColumns)", | ||||||||||||||||||||||||||
zap.Int("len(whereValues)", len(whereValues)), | ||||||||||||||||||||||||||
zap.Int("len(whereColumns)", len(whereColumns)), | ||||||||||||||||||||||||||
zap.Any("whereValues", whereValues), | ||||||||||||||||||||||||||
zap.Stringer("sourceTable", change.sourceTable)) | ||||||||||||||||||||||||||
return "", nil | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
whereValuesAtTheEnd = append(whereValuesAtTheEnd, whereValues...) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
i := 0 // used as index of skipColIdx | ||||||||||||||||||||||||||
writeableCol := 0 | ||||||||||||||||||||||||||
for j, val := range change.postValues { | ||||||||||||||||||||||||||
if i < len(skipColIdx) && skipColIdx[i] == j { | ||||||||||||||||||||||||||
i++ | ||||||||||||||||||||||||||
continue | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
argsPerCol[writeableCol] = append(argsPerCol[writeableCol], whereValues...) | ||||||||||||||||||||||||||
argsPerCol[writeableCol] = append(argsPerCol[writeableCol], val) | ||||||||||||||||||||||||||
writeableCol++ | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
for _, a := range argsPerCol { | ||||||||||||||||||||||||||
args = append(args, a...) | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
args = append(args, whereValuesAtTheEnd...) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return buf.String(), args | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
// GenUpdateSQL generates the UPDATE SQL and its arguments. | ||||||||||||||||||||||||||
// Input `changes` should have same target table and same columns for WHERE | ||||||||||||||||||||||||||
// (typically same PK/NOT NULL UK), otherwise the behaviour is undefined. | ||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
// Copyright 2023 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package sqlmodel | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
cdcmodel "github.com/pingcap/tiflow/cdc/model" | ||
) | ||
|
||
func prepareDataOneColoumnPK(t *testing.T, batch int) []*RowChange { | ||
source := &cdcmodel.TableName{Schema: "db", Table: "tb"} | ||
target := &cdcmodel.TableName{Schema: "db", Table: "tb"} | ||
|
||
sourceTI := mockTableInfo(t, `CREATE TABLE tb (c INT, c2 INT, c3 INT, | ||
c4 VARCHAR(10), c5 VARCHAR(100), c6 VARCHAR(1000), PRIMARY KEY (c))`) | ||
targetTI := mockTableInfo(t, `CREATE TABLE tb (c INT, c2 INT, c3 INT, | ||
c4 VARCHAR(10), c5 VARCHAR(100), c6 VARCHAR(1000), PRIMARY KEY (c))`) | ||
|
||
changes := make([]*RowChange, 0, batch) | ||
for i := 0; i < batch; i++ { | ||
change := NewRowChange(source, target, | ||
[]interface{}{i + 1, i + 2, i + 3, "c4", "c5", "c6"}, | ||
[]interface{}{i + 10, i + 20, i + 30, "c4", "c5", "c6"}, | ||
sourceTI, targetTI, nil) | ||
changes = append(changes, change) | ||
} | ||
return changes | ||
} | ||
|
||
func prepareDataMultiColumnsPK(t *testing.T, batch int) []*RowChange { | ||
source := &cdcmodel.TableName{Schema: "db", Table: "tb"} | ||
target := &cdcmodel.TableName{Schema: "db", Table: "tb"} | ||
|
||
sourceTI := mockTableInfo(t, `CREATE TABLE tb (c1 INT, c2 INT, c3 INT, c4 INT, | ||
c5 VARCHAR(10), c6 VARCHAR(100), c7 VARCHAR(1000), c8 timestamp, c9 timestamp, | ||
PRIMARY KEY (c1, c2, c3, c4))`) | ||
targetTI := mockTableInfo(t, `CREATE TABLE tb (c1 INT, c2 INT, c3 INT, c4 INT, | ||
c5 VARCHAR(10), c6 VARCHAR(100), c7 VARCHAR(1000), c8 timestamp, c9 timestamp, | ||
PRIMARY KEY (c1, c2, c3, c4))`) | ||
|
||
changes := make([]*RowChange, 0, batch) | ||
for i := 0; i < batch; i++ { | ||
change := NewRowChange(source, target, | ||
[]interface{}{i + 1, i + 2, i + 3, i + 4, "c4", "c5", "c6", "c7", time.Time{}, time.Time{}}, | ||
[]interface{}{i + 10, i + 20, i + 30, i + 40, "c4", "c5", "c6", "c7", time.Time{}, time.Time{}}, | ||
sourceTI, targetTI, nil) | ||
changes = append(changes, change) | ||
} | ||
return changes | ||
} | ||
|
||
// bench cmd: go test -run='^$' -benchmem -bench '^(BenchmarkGenUpdate)$' github.com/pingcap/tiflow/pkg/sqlmodel | ||
func BenchmarkGenUpdate(b *testing.B) { | ||
t := &testing.T{} | ||
type genCase struct { | ||
name string | ||
fn genSQLFunc | ||
prepare func(t *testing.T, batch int) []*RowChange | ||
} | ||
batchSizes := []int{ | ||
1, 2, 4, 8, 16, 32, 64, 128, | ||
} | ||
benchCases := []genCase{ | ||
{ | ||
name: "OneColumnPK-GenUpdateSQL", | ||
fn: GenUpdateSQL, | ||
prepare: prepareDataOneColoumnPK, | ||
}, | ||
{ | ||
name: "OneColumnPK-GenUpdateSQLFast", | ||
fn: GenUpdateSQLFast, | ||
prepare: prepareDataOneColoumnPK, | ||
}, | ||
{ | ||
name: "MultiColumnsPK-GenUpdateSQL", | ||
fn: GenUpdateSQL, | ||
prepare: prepareDataMultiColumnsPK, | ||
}, | ||
{ | ||
name: "MultiColumnsPK-GenUpdateSQLFast", | ||
fn: GenUpdateSQLFast, | ||
prepare: prepareDataMultiColumnsPK, | ||
}, | ||
} | ||
for _, bc := range benchCases { | ||
for _, batch := range batchSizes { | ||
name := fmt.Sprintf("%s-Batch%d", bc.name, batch) | ||
b.Run(name, func(b *testing.B) { | ||
changes := prepareDataOneColoumnPK(t, batch) | ||
for i := 0; i < b.N; i++ { | ||
bc.fn(changes...) | ||
} | ||
}) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the batch size be limited ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have summited a related issue #8084, and this limit will be added in another PR.