Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-16.0] VReplication: Ensure ROW events are sent within a transaction (#13547) #13580

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"regexp"
"sort"
"sync"
"testing"
Expand Down Expand Up @@ -520,6 +521,136 @@ func TestVStreamSharded(t *testing.T) {

}

// TestVStreamCopyTransactions tests that we are properly wrapping
// ROW events in the stream with BEGIN and COMMIT events.
func TestVStreamCopyTransactions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
keyspace := "ks"
shards := []string{"-80", "80-"}
table := "t1_copy_basic"
beginEventSeen, commitEventSeen := false, false
numResultInTrx := 0
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: keyspace,
Shard: shards[0],
Gtid: "", // Start a vstream copy
},
{
Keyspace: keyspace,
Shard: shards[1],
Gtid: "", // Start a vstream copy
},
},
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: table,
Filter: fmt.Sprintf("select * from %s", table),
}},
}

gconn, conn, _, closeConnections := initialize(ctx, t)
defer closeConnections()

// Clear any existing data.
q := fmt.Sprintf("delete from %s", table)
_, err := conn.ExecuteFetch(q, -1, false)
require.NoError(t, err, "error clearing data: %v", err)

// Generate some test data. Enough to cross the default
// vstream_packet_size threshold.
for i := 1; i <= 100000; i++ {
values := fmt.Sprintf("(%d, %d)", i, i)
q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values)
_, err := conn.ExecuteFetch(q, 1, false)
require.NoError(t, err, "error inserting data: %v", err)
}

// Start a vstream.
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil)
require.NoError(t, err, "error starting vstream: %v", err)

recvLoop:
for {
vevents, err := reader.Recv()
numResultInTrx++
eventCount := len(vevents)
t.Logf("------------------ Received %d events in response #%d for the transaction ------------------\n",
eventCount, numResultInTrx)
switch err {
case nil:
for _, event := range vevents {
switch event.Type {
case binlogdatapb.VEventType_BEGIN:
require.False(t, beginEventSeen, "received a second BEGIN event within the transaction: numResultInTrx=%d\n",
numResultInTrx)
beginEventSeen = true
t.Logf("Found BEGIN event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx)
require.False(t, commitEventSeen, "received a BEGIN event when expecting a COMMIT event: numResultInTrx=%d\n",
numResultInTrx)
case binlogdatapb.VEventType_VGTID:
t.Logf("Found VGTID event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
case binlogdatapb.VEventType_FIELD:
t.Logf("Found FIELD event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
case binlogdatapb.VEventType_ROW:
// Uncomment if you need to do more debugging.
// t.Logf("Found ROW event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
// beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
case binlogdatapb.VEventType_COMMIT:
commitEventSeen = true
t.Logf("Found COMMIT event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
require.True(t, beginEventSeen, "received COMMIT event before receiving BEGIN event: numResultInTrx=%d\n",
numResultInTrx)
case binlogdatapb.VEventType_COPY_COMPLETED:
t.Logf("Finished vstream copy\n")
t.Logf("-------------------------------------------------------------------\n\n")
cancel()
break recvLoop
default:
t.Logf("Found extraneous event: %+v\n", event)
}
if beginEventSeen && commitEventSeen {
t.Logf("Received both BEGIN and COMMIT, so resetting transactional state\n")
beginEventSeen = false
commitEventSeen = false
numResultInTrx = 0
}
}
case io.EOF:
t.Logf("vstream ended\n")
t.Logf("-------------------------------------------------------------------\n\n")
cancel()
return
default:
require.FailNowf(t, "unexpected error", "encountered error in vstream: %v", err)
return
}
}
// The last response, when the vstream copy completes, does not
// typically contain ROW events.
if beginEventSeen || commitEventSeen {
require.True(t, (beginEventSeen && commitEventSeen), "did not receive both BEGIN and COMMIT events in the final ROW event set")
}
}

func removeAnyDeprecatedDisplayWidths(orig string) string {
var adjusted string
baseIntType := "int"
intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`)
adjusted = intRE.ReplaceAllString(orig, baseIntType)
baseYearType := "year"
yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`)
adjusted = yearRE.ReplaceAllString(adjusted, baseYearType)
return adjusted
}

var printMu sync.Mutex

func printEvents(evs []*binlogdatapb.VEvent) {
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,26 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendFieldEvent returned error %v", err)
return err
}
// sendFieldEvent() sends a BEGIN event first.
uvs.inTransaction = true
}

if len(rows.Rows) == 0 {
log.V(2).Infof("0 rows returned for table %s", tableName)
return nil
}

// We are about to send ROW events, so we need to ensure
// that we do so within a transaction. The COMMIT event
// will be sent in sendEventsForRows() below.
if !uvs.inTransaction {
evs := []*binlogdatapb.VEvent{{
Type: binlogdatapb.VEventType_BEGIN,
}}
uvs.send(evs)
uvs.inTransaction = true
}

newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{
Fields: rows.Fields,
Rows: []*querypb.Row{rows.Lastpk},
Expand All @@ -271,6 +285,8 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendEventsForRows returned error %v", err)
return err
}
// sendEventsForRows() sends a COMMIT event last.
uvs.inTransaction = false

uvs.setCopyState(tableName, qrLastPK)
log.V(2).Infof("NewLastPK: %v", qrLastPK)
Expand Down
18 changes: 11 additions & 7 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ type uvstreamer struct {
cancel func()

// input parameters
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
// Are we currently in an explicit transaction?
// If we are not, and we're about to send ROW
// events, then we need to send a BEGIN event first.
inTransaction bool
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK

vschema *localVSchema

Expand Down