Skip to content

Commit

Permalink
Properly handle PK changes
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Dec 12, 2024
1 parent 8171e14 commit 5b5c569
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 29 deletions.
10 changes: 7 additions & 3 deletions go/mysql/binlog/binlog_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/mysql/format"
"vitess.io/vitess/go/mysql/json"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -110,7 +111,7 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
diff.WriteString(innerStr)
diff.WriteString(", ")
} else { // Only the inner most function has the field name
diff.WriteString("`%s`, ") // This will later be replaced by the field name
diff.WriteString("%s, ") // This will later be replaced by the field name
}

pathLen, readTo := readVariableLength(data, pos)
Expand All @@ -120,7 +121,10 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
// We have to specify the unicode character set for the strings we
// use in the expression as the connection can be using a different
// character set (e.g. vreplication always uses set names binary).
diff.WriteString(fmt.Sprintf("_utf8mb4'%s'", path))
diff.WriteString(sqlparser.Utf8mb4Str)
diff.WriteByte('\'')
diff.Write(path)
diff.WriteByte('\'')

if opType == jsonDiffOpRemove { // No value for remove
diff.WriteString(")")
Expand All @@ -135,7 +139,7 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
}
pos += valueLen
if value.Type() == json.TypeString {
diff.WriteString("_utf8mb4")
diff.WriteString(sqlparser.Utf8mb4Str)
}
diff.WriteString(fmt.Sprintf("%s)", value))
}
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j3 = JSON_SET(JSON_REMOVE(JSON_REPLACE(j3, '$.day', 'tuesday'), '$.favorite_color'), '$.hobby', 'skiing') where id = 4")
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j3 = JSON_SET(JSON_SET(j3, '$.salary', 110), '$.role', 'IC') where id = 4")
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j3 = JSON_SET(j3, '$.misc', '{\"address\":\"1012 S Park St\", \"town\":\"Hastings\", \"state\":\"MI\"}') where id = 1")
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set id=id+1000, j3=JSON_SET(j3, '$.day', 'friday')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
dec80Replicated := false
for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} {
Expand Down
110 changes: 84 additions & 26 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vreplication

import (
"bytes"
"encoding/json"
"fmt"
"slices"
Expand All @@ -30,6 +31,7 @@ import (
vjson "vitess.io/vitess/go/mysql/json"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/ptr"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -365,7 +367,10 @@ func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*q

func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
// MakeRowTrusted is needed here because Proto3ToResult is not convenient.
var before, after bool
var (
before, after bool
afterVals []sqltypes.Value
)
bindvars := make(map[string]*querypb.BindVariable, len(tp.Fields))
if rowChange.Before != nil {
before = true
Expand All @@ -381,44 +386,45 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
if rowChange.After != nil {
jsonIndex := 0
after = true
vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.After)
afterVals = sqltypes.MakeRowTrusted(tp.Fields, rowChange.After)
for i, field := range tp.Fields {
var bindVar *querypb.BindVariable
var newVal *sqltypes.Value
var err error
var (
bindVar *querypb.BindVariable
newVal *sqltypes.Value
err error
)
if field.Type == querypb.Type_JSON {
switch {
case vals[i].IsNull(): // An SQL NULL and not an actual JSON value
case afterVals[i].IsNull(): // An SQL NULL and not an actual JSON value
newVal = &sqltypes.NULL
case rowChange.JsonPartialValues != nil && isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex) &&
!slices.Equal(vals[i].Raw(), sqltypes.NullBytes):
// An SQL expression that can be converted to a JSON value such
// as JSON_INSERT().
// This occurs when using partial JSON values as a result of
// mysqld using binlog-row-value-options=PARTIAL_JSON.
if len(vals[i].Raw()) == 0 {
!slices.Equal(afterVals[i].Raw(), sqltypes.NullBytes):
// An SQL expression that can be converted to a JSON value such as JSON_INSERT().
// This occurs when using partial JSON values as a result of mysqld using
// binlog-row-value-options=PARTIAL_JSON.
if len(afterVals[i].Raw()) == 0 {
// When using BOTH binlog_row_image=NOBLOB AND
// binlog_row_value_options=PARTIAL_JSON then the JSON
// column has the data bit set and the diff is empty. So
// we have to account for this by unsetting the data bit
// so that the current JSON value is not overwritten.
// binlog_row_value_options=PARTIAL_JSON then the JSON column has the data bit
// set and the diff is empty when it's not present. So we have to account for
// this by unsetting the data bit so that the current JSON value is not lost.
setBit(rowChange.DataColumns.Cols, i, false)
newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, nil))
} else {
escapedName := sqlescape.EscapeID(field.Name)
newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, []byte(
fmt.Sprintf(vals[i].RawStr(), field.Name),
fmt.Sprintf(afterVals[i].RawStr(), escapedName),
)))
}
default: // A JSON value (which may be a JSON null literal value)
newVal, err = vjson.MarshalSQLValue(vals[i].Raw())
newVal, err = vjson.MarshalSQLValue(afterVals[i].Raw())
if err != nil {
return nil, err
}
}
bindVar, err = tp.bindFieldVal(field, newVal)
jsonIndex++
} else {
bindVar, err = tp.bindFieldVal(field, &vals[i])
bindVar, err = tp.bindFieldVal(field, &afterVals[i])
}
if err != nil {
return nil, err
Expand All @@ -428,7 +434,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
}
switch {
case !before && after:
// only apply inserts for rows whose primary keys are within the range of rows already copied
// Only apply inserts for rows whose primary keys are within the range of rows already copied.
if tp.isOutsidePKRange(bindvars, before, after, "insert") {
return nil, nil
}
Expand Down Expand Up @@ -465,15 +471,67 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
return nil, err
}
}
// TODO: the INSERTs done here after deleting the row with the original PK
// need to use the values from the BEFORE image for the columns NOT present
// in the AFTER image due to being a partial image due to the source's usage
// of binlog-row-image=NOBLOB.
// For JSON columns when binlog-row-value-options=PARTIAL_JSON is used, we
// need to wrap the JSON diff function(s) around the BEFORE value.
if tp.isOutsidePKRange(bindvars, before, after, "insert") {
return nil, nil
}
if tp.isPartial(rowChange) {
// We need to use a combination of the values in the BEFORE and AFTER image to generate
// the new row.
jsonIndex := 0
for i, field := range tp.Fields {
if field.Type == querypb.Type_JSON && rowChange.JsonPartialValues != nil {
if isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex) { // Otherwise we use the full AFTER value
if len(afterVals[i].Raw()) == 0 {
// When using BOTH binlog_row_image=NOBLOB AND
// binlog_row_value_options=PARTIAL_JSON then the JSON column has the data bit
// set and the diff is empty when it's not present. So we want to use the
// BEFORE image value.
beforeVal, err := vjson.MarshalSQLValue(bindvars["b_"+field.Name].Value)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to convert JSON to SQL field value for %s.%s when building insert query",
tp.TargetName, field.Name)
}
bindvars["a_"+field.Name], err = tp.bindFieldVal(field, beforeVal)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to bind field value for %s.%s when building insert query",
tp.TargetName, field.Name)
}
} else {
// For JSON columns when binlog-row-value-options=PARTIAL_JSON is used, we
// need to wrap the JSON diff function(s) around the BEFORE value.
diff := bindvars["a_"+field.Name].Value
beforeVal := bindvars["b_"+field.Name].Value
afterVal := bytes.Buffer{}
afterVal.Grow(len(diff) + len(beforeVal) + len(sqlparser.Utf8mb4Str) + 2) // +2 is for the enclosing quotes
// If the JSON column is partial, we need to specify the BEFORE value as
// the input for the diff function(s).
afterVal.WriteString(sqlparser.Utf8mb4Str)
afterVal.WriteByte('\'')
afterVal.Write(beforeVal)
afterVal.WriteByte('\'')
newVal := sqltypes.MakeTrusted(querypb.Type_EXPRESSION, []byte(
fmt.Sprintf(afterVals[i].RawStr(), afterVal.String()),
))
bindVar, err := tp.bindFieldVal(field, &newVal)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to bind field value for %s.%s when building insert query",
tp.TargetName, field.Name)
}
bindvars["a_"+field.Name] = bindVar
}
}
jsonIndex++
} else {
if !isBitSet(rowChange.DataColumns.Cols, i) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL,
"binary log event missing a needed value for %s.%s due to the usage of binlog-row-image=NOBLOB; you will need to re-run the workflow with binlog-row-image=FULL",
tp.TargetName, field.Name)
}
// Use the BEFORE image value for the new row.
bindvars["a_"+field.Name] = bindvars["b_"+field.Name]
}
}
}
return execParsedQuery(tp.Insert, bindvars, executor)
}
// Unreachable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,7 @@ func (tp *TablePlan) getPartialUpdateQuery(dataColumns *binlogdatapb.RowChange_B
tp.Stats.PartialQueryCacheSize.Add([]string{"update"}, 1)
return upd, nil
}

func (tp *TablePlan) getInsertForPartialRow(dataColumns *binlogdatapb.RowChange_Bitmap, jsonPartialValues *binlogdatapb.RowChange_Bitmap) (*sqlparser.ParsedQuery, error) {
return nil, nil
}
61 changes: 61 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,67 @@ func TestPlayerRowMove(t *testing.T) {
validateQueryCountStat(t, "replicate", 3)
}

/* TODO: build this out and get it working
func TestPlayerUpdatePK(t *testing.T) {
defer deleteTablet(addTablet(100))
execStatements(t, []string{
"create table src(id int, bd blob, jd json, primary key(id))",
fmt.Sprintf("create table %s.dst(id int, bd blob, jd json, primary key(id))", vrepldb),
})
defer execStatements(t, []string{
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "dst",
Filter: "select * from src",
}},
}
bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName,
Shard: env.ShardName,
Filter: filter,
OnDdl: binlogdatapb.OnDDLAction_IGNORE,
}
cancel, _ := startVReplication(t, bls, "")
defer cancel()
execStatements(t, []string{
"insert into src values(1, 'blob data', _utf8mb4'{\"key1\":\"val1\"}'), (2, 'blob data2', _utf8mb4'{\"key2\":\"val2\"}'), (3, 'blob data3', _utf8mb4'{\"key3\":\"val3\"}')",
})
expectDBClientQueries(t, qh.Expect(
"begin",
"insert into dst(id,bd,jd) values (1,_binary'blob data','{\"key1\": \"val1\"}'), (2,_binary'blob data2','{\"key2\": \"val2\"}'), (3,_binary'blob data3','{\"key3\": \"val3\"}')",
"/update _vt.vreplication set pos=",
"commit",
))
expectData(t, "dst", [][]string{
{"1", "1", "1"},
{"2", "5", "2"},
})
validateQueryCountStat(t, "replicate", 1)
execStatements(t, []string{
"update src set val1=1, val2=4 where id=3",
})
expectDBClientQueries(t, qh.Expect(
"begin",
"update dst set sval2=sval2-ifnull(3, 0), rcount=rcount-1 where val1=2",
"insert into dst(val1,sval2,rcount) values (1,ifnull(4, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"/update _vt.vreplication set pos=",
"commit",
))
expectData(t, "dst", [][]string{
{"1", "5", "2"},
{"2", "2", "1"},
})
validateQueryCountStat(t, "replicate", 3)
}
*/

func TestPlayerTypes(t *testing.T) {
defer deleteTablet(addTablet(100))
execStatements(t, []string{
Expand Down

0 comments on commit 5b5c569

Please sign in to comment.