Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Dec 13, 2024
1 parent dd324ef commit fbd5648
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 49 deletions.
5 changes: 3 additions & 2 deletions go/mysql/binlog/binlog_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
diff.WriteByte('\'')

if opType == jsonDiffOpRemove { // No value for remove
diff.WriteString(")")
diff.WriteByte(')')
} else {
diff.WriteString(", ")
valueLen, readTo := readVariableLength(data, pos)
Expand All @@ -141,7 +141,8 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) {
if value.Type() == json.TypeString {
diff.WriteString(sqlparser.Utf8mb4Str)
}
diff.WriteString(fmt.Sprintf("%s)", value))
diff.Write(value.MarshalTo(nil))
diff.WriteByte(')')
}

outer = true
Expand Down
12 changes: 11 additions & 1 deletion go/test/utils/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ const (
BinlogRowImageCnf = "binlog-row-image.cnf"
)

// SetBinlogRowImageMode creates a temp cnf file to set binlog_row_image to noblob for vreplication unit tests.
// SetBinlogRowImageMode creates a temp cnf file to set binlog_row_image=NOBLOB and
// binlog-row-value-options=PARTIAL_JSON for vreplication unit tests.
// It adds it to the EXTRA_MY_CNF environment variable which appends text from them into my.cnf.
func SetBinlogRowImageMode(mode string, cnfDir string) error {
var newCnfs []string
Expand Down Expand Up @@ -55,6 +56,15 @@ func SetBinlogRowImageMode(mode string, cnfDir string) error {
if err != nil {
return err
}
lm := strings.ToLower(mode)
if lm == "noblob" || lm == "minimal" {
// We're testing partial binlog row images so let's also test partial
// JSON values in the images.
_, err = f.WriteString("\nbinlog_row_value_options=PARTIAL_JSON\n")
if err != nil {
return err
}
}
err = f.Close()
if err != nil {
return err
Expand Down
21 changes: 19 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
}
if !isBitSet(rowChange.DataColumns.Cols, i) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL,
"binary log event missing a needed value for %s.%s due to the usage of binlog-row-image=NOBLOB; you will need to re-run the workflow with binlog-row-image=FULL",
"binary log event missing a needed value for %s.%s due to not using binlog-row-image=FULL; you will need to re-run the workflow with binlog-row-image=FULL",
tp.TargetName, field.Name)
}
}
Expand Down Expand Up @@ -629,11 +629,28 @@ func (tp *TablePlan) applyBulkInsertChanges(rowInserts []*binlogdatapb.RowChange

newStmt := true
for _, rowInsert := range rowInserts {
var (
err error
bindVar *querypb.BindVariable
)
rowValues := &strings.Builder{}
bindvars := make(map[string]*querypb.BindVariable, len(tp.Fields))
vals := sqltypes.MakeRowTrusted(tp.Fields, rowInsert.After)
for n, field := range tp.Fields {
bindVar, err := tp.bindFieldVal(field, &vals[n])
if field.Type == querypb.Type_JSON {
var jsVal *sqltypes.Value
if vals[n].IsNull() { // An SQL NULL and not an actual JSON value
jsVal = &sqltypes.NULL
} else { // A JSON value (which may be a JSON null literal value)
jsVal, err = vjson.MarshalSQLValue(vals[n].Raw())
if err != nil {
return nil, err
}
}
bindVar, err = tp.bindFieldVal(field, jsVal)
} else {
bindVar, err = tp.bindFieldVal(field, &vals[n])
}
if err != nil {
return nil, err
}
Expand Down
145 changes: 101 additions & 44 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,13 +1519,20 @@ func TestPlayerRowMove(t *testing.T) {
validateQueryCountStat(t, "replicate", 3)
}

/* TODO: build this out and get it working
func TestPlayerUpdatePK(t *testing.T) {
defer deleteTablet(addTablet(100))
// TestPlayerPartialImagesUpdatePK tests the behavior of the vplayer when we
// have partial binlog images, meaning that binlog-row-image=NOBLOB and
// binlog-row-value-options=PARTIAL_JSON. These are both set together when
// running the unit tests with runNoBlobTest=true. So we skip the test if
// it's not set.
func TestPlayerPartialImagesUpdatePK(t *testing.T) {
if !runNoBlobTest {
t.Skip("Skipping test as runNoBlobTest is not set")
}

defer deleteTablet(addTablet(100))
execStatements(t, []string{
"create table src(id int, bd blob, jd json, primary key(id))",
fmt.Sprintf("create table %s.dst(id int, bd blob, jd json, primary key(id))", vrepldb),
"create table src (id int, jd json, bd blob, primary key(id))",
fmt.Sprintf("create table %s.dst (id int, jd json, bd blob, primary key(id))", vrepldb),
})
defer execStatements(t, []string{
"drop table src",
Expand All @@ -1547,38 +1554,73 @@ func TestPlayerUpdatePK(t *testing.T) {
cancel, _ := startVReplication(t, bls, "")
defer cancel()

execStatements(t, []string{
"insert into src values(1, 'blob data', _utf8mb4'{\"key1\":\"val1\"}'), (2, 'blob data2', _utf8mb4'{\"key2\":\"val2\"}'), (3, 'blob data3', _utf8mb4'{\"key3\":\"val3\"}')",
})
expectDBClientQueries(t, qh.Expect(
"begin",
"insert into dst(id,bd,jd) values (1,_binary'blob data','{\"key1\": \"val1\"}'), (2,_binary'blob data2','{\"key2\": \"val2\"}'), (3,_binary'blob data3','{\"key3\": \"val3\"}')",
"/update _vt.vreplication set pos=",
"commit",
))
expectData(t, "dst", [][]string{
{"1", "1", "1"},
{"2", "5", "2"},
})
validateQueryCountStat(t, "replicate", 1)
execStatements(t, []string{
"update src set val1=1, val2=4 where id=3",
})
expectDBClientQueries(t, qh.Expect(
"begin",
"update dst set sval2=sval2-ifnull(3, 0), rcount=rcount-1 where val1=2",
"insert into dst(val1,sval2,rcount) values (1,ifnull(4, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"/update _vt.vreplication set pos=",
"commit",
))
expectData(t, "dst", [][]string{
{"1", "5", "2"},
{"2", "2", "1"},
})
validateQueryCountStat(t, "replicate", 3)
testCases := []struct {
input string
output []string
data [][]string
error string
}{
{
input: "insert into src (id, jd, bd) values (1,'{\"key1\": \"val1\"}','blob data'), (2,'{\"key2\": \"val2\"}','blob data2'), (3,'{\"key3\": \"val3\"}','blob data3')",
output: []string{"insert into dst(id,jd,bd) values (1,JSON_OBJECT(_utf8mb4'key1', _utf8mb4'val1'),_binary'blob data'), (2,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'blob data2'), (3,JSON_OBJECT(_utf8mb4'key3', _utf8mb4'val3'),_binary'blob data3')"},
data: [][]string{
{"1", "{\"key1\": \"val1\"}", "blob data"},
{"2", "{\"key2\": \"val2\"}", "blob data2"},
{"3", "{\"key3\": \"val3\"}", "blob data3"},
},
},
{
input: `update src set jd=JSON_SET(jd, '$.color', 'red') where id = 1`,
output: []string{"update dst set jd=JSON_INSERT(`jd`, _utf8mb4'$.color', _utf8mb4\"red\") where id=1"},
data: [][]string{
{"1", "{\"key1\": \"val1\", \"color\": \"red\"}", "blob data"},
{"2", "{\"key2\": \"val2\"}", "blob data2"},
{"3", "{\"key3\": \"val3\"}", "blob data3"},
},
},
{
input: `update src set id = id+10, bd = 'new blob data' where id = 2`,
output: []string{
"delete from dst where id=2",
"insert into dst(id,jd,bd) values (12,JSON_OBJECT(_utf8mb4'key2', _utf8mb4'val2'),_binary'new blob data')",
},
data: [][]string{
{"1", "{\"key1\": \"val1\", \"color\": \"red\"}", "blob data"},
{"3", "{\"key3\": \"val3\"}", "blob data3"},
{"12", "{\"key2\": \"val2\"}", "new blob data"},
},
},
{
input: `update src set id = id+10 where id = 3`,
error: "binary log event missing a needed value for dst.bd due to not using binlog-row-image=FULL",
},
}

for _, tc := range testCases {
t.Run(tc.input, func(t *testing.T) {
execStatements(t, []string{tc.input})
var want qh.ExpectationSequencer
if tc.error != "" {
want = qh.Expect(
"rollback",
).Then(qh.Immediately(
fmt.Sprintf("/update _vt.vreplication set message=.*%s.*", tc.error),
))
expectDBClientQueries(t, want)
} else {
want = qh.Expect(
"begin",
tc.output...,
).Then(qh.Immediately(
"/update _vt.vreplication set pos=",
"commit",
))
expectDBClientQueries(t, want)
expectData(t, "dst", tc.data)
}
})
}
}
*/

func TestPlayerTypes(t *testing.T) {
defer deleteTablet(addTablet(100))
Expand Down Expand Up @@ -1714,15 +1756,30 @@ func TestPlayerTypes(t *testing.T) {
{"1", "", "{}", "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
{"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
}, {
input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4) where id=1",
output: "update vitess_json set val1=JSON_OBJECT(_utf8mb4'bar', _utf8mb4'foo'), val2=JSON_OBJECT(), val3=CAST(123 as JSON), val4=JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(98, 123)), val5=JSON_OBJECT() where id=1",
table: "vitess_json",
data: [][]string{
{"1", `{"bar": "foo"}`, "{}", "123", `{"a": [98, 123]}`, `{}`},
{"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
}}
if !runNoBlobTest {
testcases = append(testcases, testcase{
input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4) where id=1",
output: "update vitess_json set val1=JSON_OBJECT(_utf8mb4'bar', _utf8mb4'foo'), val2=JSON_OBJECT(), val3=CAST(123 as JSON), val4=JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(98, 123)), val5=JSON_OBJECT() where id=1",
table: "vitess_json",
data: [][]string{
{"1", `{"bar": "foo"}`, "{}", "123", `{"a": [98, 123]}`, `{}`},
{"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
})
} else {
// With partial JSON values we don't replicate the JSON columns that aren't
// actually updated.
testcases = append(testcases, testcase{
input: "update vitess_json set val1 = '{\"bar\": \"foo\"}', val4 = '{\"a\": [98, 123]}', val5 = convert(x'7b7d' using utf8mb4) where id=1",
output: "update vitess_json set val1=JSON_OBJECT(_utf8mb4'bar', _utf8mb4'foo'), val4=JSON_OBJECT(_utf8mb4'a', JSON_ARRAY(98, 123)), val5=JSON_OBJECT() where id=1",
table: "vitess_json",
data: [][]string{
{"1", `{"bar": "foo"}`, "{}", "123", `{"a": [98, 123]}`, `{}`},
{"2", "null", `{"name": null}`, "123", `{"a": [42, 100]}`, `{"foo": "bar"}`},
},
})
}

for _, tcases := range testcases {
execStatements(t, []string{tcases.input})
Expand Down

0 comments on commit fbd5648

Please sign in to comment.