diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index f2ba9af992b..0b68be05b48 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -480,6 +480,12 @@ func TestVStreamCopyResume(t *testing.T) { } printEvents(evs) // for debugging ci failures } + if ev.Type == binlogdatapb.VEventType_VGTID { + // Validate that the vgtid event the client receives from the vstream copy has a complete TableLastPK proto message. + // Also, to ensure that the client can resume properly, make sure that + // the Fields value is present in the sqltypes.Result field and not missing. + require.Regexp(t, `type:VGTID vgtid:{shard_gtids:{keyspace:"ks" shard:"-80" gtid:".+" table_p_ks:{table_name:"t1_copy_resume" lastpk:{fields:{name:"id1" type:INT64} rows:{lengths:1 values:"[0-9]"}}}} shard_gtids:{keyspace:"ks" shard:"80-" gtid:".+" table_p_ks:{table_name:"t1_copy_resume" lastpk:{fields:{name:"id1" type:INT64} rows:{lengths:1 values:"[0-9]"}}}}} keyspace:"ks" shard:"(80-|-80)"`, ev.String()) + } } if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents { sort.Sort(VEventSorter(evs)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 0065555047d..62b93cf5063 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -262,7 +262,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { } newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{ - Fields: rows.Fields, + Fields: uvs.pkfields, Rows: []*querypb.Row{rows.Lastpk}, }) qrLastPK := sqltypes.ResultToProto3(newLastPK) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 610b9012f7f..8ca43f008b6 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -477,7 +477,7 @@ var expectedEvents = []string{ "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:2 values:\"880\"}}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:2 values:\"990\"}}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"10100\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{rows:{lengths:2 values:\"10\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{fields:{name:\"id11\" type:INT32} rows:{lengths:2 values:\"10\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\"} completed:true}", @@ -506,7 +506,7 @@ var expectedEvents = []string{ "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:1 lengths:3 values:\"9180\"}}}", "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"10200\"}}}", "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"11220\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2\" lastpk:{rows:{lengths:2 values:\"11\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2\" lastpk:{fields:{name:\"id21\" type:INT32} rows:{lengths:2 values:\"11\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2\"} completed:true}", @@ -534,7 +534,7 @@ var expectedEvents = []string{ "type:ROW row_event:{table_name:\"t3\" row_changes:{after:{lengths:1 lengths:3 values:\"8240\"}}}", "type:ROW row_event:{table_name:\"t3\" row_changes:{after:{lengths:1 lengths:3 values:\"9270\"}}}", "type:ROW row_event:{table_name:\"t3\" row_changes:{after:{lengths:2 lengths:3 values:\"10300\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\" lastpk:{rows:{lengths:2 values:\"10\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\" lastpk:{fields:{name:\"id31\" type:INT32} rows:{lengths:2 values:\"10\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\"} completed:true}", diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index c2e6f8cef55..37a62b9d7aa 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -506,7 +506,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id1\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63} fields:{name:\"id2\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", "type:GTID", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:1 values:\"12\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{rows:{lengths:1 values:\"1\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{fields:{name:\"id1\" type:INT32} rows:{lengths:1 values:\"1\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\"} completed:true}", @@ -514,7 +514,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:BEGIN", "type:FIELD field_event:{table_name:\"t2a\" fields:{name:\"id1\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63} fields:{name:\"id2\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", "type:ROW row_event:{table_name:\"t2a\" row_changes:{after:{lengths:1 lengths:1 values:\"14\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\" lastpk:{rows:{lengths:1 values:\"1\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\" lastpk:{fields:{name:\"id1\" type:INT32} rows:{lengths:1 values:\"1\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\"} completed:true}", @@ -523,7 +523,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { "type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:45} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"a5\"}}}", "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"b6\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{rows:{lengths:1 values:\"b\"}}}}", + "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{fields:{name:\"id1\" type:VARCHAR} rows:{lengths:1 values:\"b\"}}}}", "type:COMMIT", "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\"} completed:true}",