Skip to content

Commit

Permalink
Ensure that ROW events are sent within a trx
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jul 19, 2023
1 parent f560d00 commit 561a949
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
8 changes: 7 additions & 1 deletion go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,13 @@ func TestVStreamCopyTransactions(t *testing.T) {
gconn, conn, _, closeConnections := initialize(ctx, t)
defer closeConnections()

// Generate some test data.
// Clear any existing data.
q := fmt.Sprintf("delete from %s", table)
_, err := conn.ExecuteFetch(q, -1, false)
require.NoError(t, err, "error clearing data: %v", err)

// Generate some test data. Enough to cross the vstream_packet_size
// threshold.
for i := 1; i <= 100000; i++ {
values := fmt.Sprintf("(%d, %d)", i, i)
q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values)
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,26 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendFieldEvent returned error %v", err)
return err
}
// sendFieldEvent sends a BEGIN event first.
uvs.inTransaction = true
}

if len(rows.Rows) == 0 {
log.V(2).Infof("0 rows returned for table %s", tableName)
return nil
}

// We are about to send ROW events, so we need to ensure
// that we do so within a transaction. The COMMIT event
// will be sent in sendEventsForRows() below.
if !uvs.inTransaction {
evs := []*binlogdatapb.VEvent{{
Type: binlogdatapb.VEventType_BEGIN,
}}
uvs.send(evs)
uvs.inTransaction = true
}

newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{
Fields: uvs.pkfields,
Rows: []*querypb.Row{rows.Lastpk},
Expand All @@ -271,6 +285,8 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendEventsForRows returned error %v", err)
return err
}
// sendEventsForRows sends a COMMIT event last.
uvs.inTransaction = false

uvs.setCopyState(tableName, qrLastPK)
log.V(2).Infof("NewLastPK: %v", qrLastPK)
Expand Down
20 changes: 12 additions & 8 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,18 @@ type uvstreamer struct {
cancel func()

// input parameters
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK
throttlerApp throttlerapp.Name
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
// Are we currently in an explicit transaction?
// If we are not, and we're about to send ROW
// events, then we need to send a BEGIN event first.
inTransaction bool
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK
throttlerApp throttlerapp.Name

vschema *localVSchema

Expand Down

0 comments on commit 561a949

Please sign in to comment.