-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
copy over existing vreplication rows copied to local counter if resuming from another tablet #13949
Changes from 1 commit
8c3a59e
25d67cb
3fa1d4f
51b91b6
9151498
bc2905d
b0706f6
2f39e45
ff34271
f53d052
96c1647
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -142,7 +142,7 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer | |
log.Warningf("The supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d", | ||
vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval) | ||
} | ||
return &vreplicator{ | ||
v := &vreplicator{ | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
vre: vre, | ||
id: id, | ||
source: source, | ||
|
@@ -151,6 +151,8 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer | |
dbClient: newVDBClient(dbClient, stats), | ||
mysqld: mysqld, | ||
} | ||
v.readExistingRowsCopiedAndUpdateSelfIfNeeded() | ||
return v | ||
} | ||
|
||
// Replicate starts a vreplication stream. It can be in one of three phases: | ||
|
@@ -1030,3 +1032,36 @@ func (vr *vreplicator) newClientConnection(ctx context.Context) (*vdbClient, err | |
} | ||
return dbClient, nil | ||
} | ||
|
||
func (vr *vreplicator) readExistingRowsCopiedAndUpdateSelfIfNeeded() { | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if vr.stats.CopyRowCount.Get() == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A large table can take 5 days (as an example). Let's say the shard has 1 primary and 1 replica tablet at any time. The copy starts on the original replica, gets resumed on the original primary, then resumed again on the original replica. Would this value be non-zero but still incorrect? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thankfully no, it seems that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, great! |
||
rowsCopiedExisting, err := vr.readExistingRowsCopied(vr.id) | ||
if err != nil { | ||
log.Warningf("failed to read existing rows copied: %v", err) | ||
olyazavr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else if rowsCopiedExisting != 0 { | ||
log.Infof("resuming vreplication from another tablet, setting rows copied counter to %v", rowsCopiedExisting) | ||
olyazavr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
vr.stats.CopyRowCount.Set(rowsCopiedExisting) | ||
} | ||
} | ||
} | ||
|
||
func (vr *vreplicator) readExistingRowsCopied(uid int32) (int64, error) { | ||
query, err := sqlparser.ParseAndBind(`SELECT rows_copied FROM _vt.vreplication WHERE id=%a`, | ||
sqltypes.Int32BindVariable(uid), | ||
) | ||
if err != nil { | ||
return 0, err | ||
} | ||
r, err := vr.dbClient.Execute(query) | ||
if err != nil { | ||
return 0, err | ||
} | ||
if len(r.Rows) == 0 { | ||
return 0, nil | ||
} | ||
row := r.Named().Row() | ||
if row == nil { | ||
return 0, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Cannot find unique workflow for UUID: %+v", uid) | ||
} | ||
return row.AsInt64("rows_copied", 0), nil | ||
} | ||
olyazavr marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -628,6 +628,53 @@ func TestCancelledDeferSecondaryKeys(t *testing.T) { | |
require.Equal(t, 1, len(res.Rows)) | ||
} | ||
|
||
// TestResumingFromPreviousWorkflowKeepingRowsCopied tests that when you | ||
// resume a workflow started by another tablet (eg. a reparent occurred), the | ||
// rows_copied does not reset to zero but continues along from where it left off | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another annoying nit about using sentence punctuation. 🙂 |
||
func TestResumingFromPreviousWorkflowKeepingRowsCopied(t *testing.T) { | ||
_, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
tablet := addTablet(100) | ||
defer deleteTablet(tablet) | ||
filter := &binlogdatapb.Filter{ | ||
Rules: []*binlogdatapb.Rule{{ | ||
Match: "t1", | ||
}}, | ||
} | ||
bls := &binlogdatapb.BinlogSource{ | ||
Keyspace: env.KeyspaceName, | ||
Shard: env.ShardName, | ||
Filter: filter, | ||
} | ||
// The test env uses the same factory for both dba and | ||
// filtered connections. | ||
dbconfigs.GlobalDBConfigs.Filtered.User = "vt_dba" | ||
id := int32(1) | ||
vsclient := newTabletConnector(tablet) | ||
olyazavr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
stats := binlogplayer.NewStats() | ||
defer stats.Stop() | ||
dbaconn := playerEngine.dbClientFactoryDba() | ||
olyazavr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
err := dbaconn.Connect() | ||
require.NoError(t, err) | ||
defer dbaconn.Close() | ||
dbClient := playerEngine.dbClientFactoryFiltered() | ||
olyazavr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
err = dbClient.Connect() | ||
require.NoError(t, err) | ||
defer dbClient.Close() | ||
dbName := dbClient.DBName() | ||
olyazavr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
rowsCopied := int64(500000) | ||
// Ensure there's an existing vreplication workflow | ||
_, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, rows_copied) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s', %v) on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s', rows_copied=%v", | ||
id, dbName, rowsCopied, dbName, rowsCopied), 1) | ||
require.NoError(t, err) | ||
defer func() { | ||
_, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1) | ||
require.NoError(t, err) | ||
}() | ||
vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine) | ||
assert.Equal(t, rowsCopied, vr.stats.CopyRowCount.Get()) | ||
} | ||
|
||
// stripCruft removes all whitespace unicode chars and backticks. | ||
func stripCruft(in string) string { | ||
out := strings.Builder{} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should update the unit tests rather than ignore this functionality altering query.
I know that it's a pain because there are so many of them. I can help with that if you want, just let me know. It might be tricky since the query probably won't be executed in most tests (because there was no reparent).