Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-21.0] SwitchTraffic: use separate context while canceling a migration (#17340) #17366

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,30 +1126,45 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
return nil
}

// cancelMigration attempts to revert all changes made during the migration so that we can get back to the
// state when traffic switching (or reversing) was initiated.
func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
var err error

if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
// for forensics in case of failures.
ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
}

// We create a new context while canceling the migration, so that we are independent of the original
// context being cancelled prior to or during the cancel operation.
cmTimeout := 60 * time.Second
cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout)
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
err = ts.switchDeniedTables(ctx)
err = ts.switchDeniedTables(cmCtx)
} else {
err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
ts.Logger().Errorf("Cancel migration failed: %v", err)
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
}

sm.CancelStreamMigrations(ctx)
sm.CancelStreamMigrations(cmCtx)

err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
_, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query)
_, err := ts.TabletManagerClient().VReplicationExec(cmCtx, target.GetPrimary().Tablet, query)
return err
})
if err != nil {
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
}

err = ts.deleteReverseVReplication(ctx)
err = ts.deleteReverseVReplication(cmCtx)
if err != nil {
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
}
Expand Down
Loading