Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OnlineDDL: support @@migration_context in vtgate session. Use if non-empty #13675

Merged
merged 5 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
384 changes: 198 additions & 186 deletions go/vt/proto/vtgate/vtgate.pb.go

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions go/vt/proto/vtgate/vtgate_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
onlineDdlUUIDRegexp = regexp.MustCompile(`^[0-f]{8}_[0-f]{4}_[0-f]{4}_[0-f]{4}_[0-f]{12}$`)
onlineDDLGeneratedTableNameRegexp = regexp.MustCompile(`^_[0-f]{8}_[0-f]{4}_[0-f]{4}_[0-f]{4}_[0-f]{12}_([0-9]{14})_(gho|ghc|del|new|vrepl)$`)
ptOSCGeneratedTableNameRegexp = regexp.MustCompile(`^_.*_old$`)
migrationContextValidatorRegexp = regexp.MustCompile(`^[\w:-]*$`)
)

var (
Expand All @@ -52,6 +53,14 @@ const (
RevertActionStr = "revert"
)

// ValidateMigrationContext validates that the given migration context only uses valid characters
func ValidateMigrationContext(migrationContext string) error {
if migrationContextValidatorRegexp.MatchString(migrationContext) {
return nil
}
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid characters in migration_context %v. Use alphanumeric, dash, underscore and colon only", migrationContext)
}

// when validateWalk returns true, then the child nodes are also visited
func validateWalk(node sqlparser.SQLNode, allowForeignKeys bool) (kontinue bool, err error) {
switch node.(type) {
Expand Down
27 changes: 27 additions & 0 deletions go/vt/schema/online_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,30 @@ func TestOnlineDDLFromCommentedStatement(t *testing.T) {
})
}
}

func TestValidateMigrationContext(t *testing.T) {
tcases := []struct {
m string
expectError bool
}{
{"", false},
{"abc", false},
{"abc-def", false},
{"abc-DEF", false},
{"abc-def-123", false},
{"under_score:abc-DEF-123", false},
{"~", true},
{",", true},
{"abc^def", true},
}
for _, tcase := range tcases {
t.Run(tcase.m, func(t *testing.T) {
err := ValidateMigrationContext(tcase.m)
if tcase.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
1 change: 1 addition & 0 deletions go/vt/sqlparser/ast_rewriting.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ func (er *astRewriter) sysVarRewrite(cursor *Cursor, node *Variable) {
sysvars.Charset.Name,
sysvars.ClientFoundRows.Name,
sysvars.DDLStrategy.Name,
sysvars.MigrationContext.Name,
sysvars.Names.Name,
sysvars.TransactionMode.Name,
sysvars.ReadAfterWriteGTID.Name,
Expand Down
21 changes: 14 additions & 7 deletions go/vt/sqlparser/ast_rewriting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ type testCaseSysVar struct {
}

type myTestCase struct {
in, expected string
liid, db, foundRows, rowCount, rawGTID, rawTimeout, sessTrackGTID bool
ddlStrategy, sessionUUID, sessionEnableSystemSettings bool
udv int
autocommit, clientFoundRows, skipQueryPlanCache, socket, queryTimeout bool
sqlSelectLimit, transactionMode, workload, version, versionComment bool
txIsolation bool
in, expected string
liid, db, foundRows, rowCount, rawGTID, rawTimeout, sessTrackGTID bool
ddlStrategy, migrationContext, sessionUUID, sessionEnableSystemSettings bool
udv int
autocommit, clientFoundRows, skipQueryPlanCache, socket, queryTimeout bool
sqlSelectLimit, transactionMode, workload, version, versionComment bool
txIsolation bool
}

func TestRewrites(in *testing.T) {
Expand Down Expand Up @@ -189,6 +189,10 @@ func TestRewrites(in *testing.T) {
in: `select * from user where col = @@ddl_strategy`,
expected: "select * from user where col = :__vtddl_strategy",
ddlStrategy: true,
}, {
in: `select * from user where col = @@migration_context`,
expected: "select * from user where col = :__vtmigration_context",
migrationContext: true,
}, {
in: `select * from user where col = @@read_after_write_gtid OR col = @@read_after_write_timeout OR col = @@session_track_gtids`,
expected: "select * from user where col = :__vtread_after_write_gtid or col = :__vtread_after_write_timeout or col = :__vtsession_track_gtids",
Expand Down Expand Up @@ -304,6 +308,7 @@ func TestRewrites(in *testing.T) {
version: true,
versionComment: true,
ddlStrategy: true,
migrationContext: true,
sessionUUID: true,
sessionEnableSystemSettings: true,
rawGTID: true,
Expand All @@ -323,6 +328,7 @@ func TestRewrites(in *testing.T) {
version: true,
versionComment: true,
ddlStrategy: true,
migrationContext: true,
sessionUUID: true,
sessionEnableSystemSettings: true,
rawGTID: true,
Expand Down Expand Up @@ -367,6 +373,7 @@ func TestRewrites(in *testing.T) {
assert.Equal(tc.workload, result.NeedsSysVar(sysvars.Workload.Name), "should need :__vtworkload")
assert.Equal(tc.queryTimeout, result.NeedsSysVar(sysvars.QueryTimeout.Name), "should need :__vtquery_timeout")
assert.Equal(tc.ddlStrategy, result.NeedsSysVar(sysvars.DDLStrategy.Name), "should need ddlStrategy")
assert.Equal(tc.migrationContext, result.NeedsSysVar(sysvars.MigrationContext.Name), "should need migrationContext")
assert.Equal(tc.sessionUUID, result.NeedsSysVar(sysvars.SessionUUID.Name), "should need sessionUUID")
assert.Equal(tc.sessionEnableSystemSettings, result.NeedsSysVar(sysvars.SessionEnableSystemSettings.Name), "should need sessionEnableSystemSettings")
assert.Equal(tc.rawGTID, result.NeedsSysVar(sysvars.ReadAfterWriteGTID.Name), "should need rawGTID")
Expand Down
6 changes: 5 additions & 1 deletion go/vt/sysvars/sysvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ var (
QueryTimeout = SystemVariable{Name: "query_timeout"}

// Online DDL
DDLStrategy = SystemVariable{Name: "ddl_strategy", IdentifierAsString: true}
DDLStrategy = SystemVariable{Name: "ddl_strategy", IdentifierAsString: true}
MigrationContext = SystemVariable{Name: "migration_context", IdentifierAsString: true}

// Version
Version = SystemVariable{Name: "version"}
VersionComment = SystemVariable{Name: "version_comment"}

Expand All @@ -95,6 +98,7 @@ var (
Charset,
Names,
SessionUUID,
MigrationContext,
SessionEnableSystemSettings,
ReadAfterWriteGTID,
ReadAfterWriteTimeOut,
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ func (t *noopVCursor) GetDDLStrategy() string {
panic("implement me")
}

func (t *noopVCursor) SetMigrationContext(migrationContext string) {
panic("implement me")
}

func (t *noopVCursor) GetMigrationContext() string {
panic("implement me")
}

func (t *noopVCursor) GetSessionUUID() string {
panic("implement me")
}
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vtgate/engine/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,13 @@ func (v *OnlineDDL) TryExecute(ctx context.Context, vcursor VCursor, bindVars ma
},
Rows: [][]sqltypes.Value{},
}
migrationContext := vcursor.Session().GetMigrationContext()
if migrationContext == "" {
// default to @@session_uuid
migrationContext = fmt.Sprintf("vtgate:%s", vcursor.Session().GetSessionUUID())
}
onlineDDLs, err := schema.NewOnlineDDLs(v.GetKeyspaceName(), v.SQL, v.DDL,
v.DDLStrategySetting, fmt.Sprintf("vtgate:%s", vcursor.Session().GetSessionUUID()), "",
v.DDLStrategySetting, migrationContext, "",
)
if err != nil {
return result, err
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ type (

SetDDLStrategy(string)
GetDDLStrategy() string
SetMigrationContext(string)
GetMigrationContext() string

GetSessionUUID() string

Expand Down
9 changes: 9 additions & 0 deletions go/vt/vtgate/engine/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,15 @@ func (svss *SysVarSetAware) Execute(ctx context.Context, vcursor VCursor, env *e
return vterrors.NewErrorf(vtrpcpb.Code_INVALID_ARGUMENT, vterrors.WrongValueForVar, "invalid DDL strategy: %s", str)
}
vcursor.Session().SetDDLStrategy(str)
case sysvars.MigrationContext.Name:
str, err := svss.evalAsString(env, vcursor)
if err != nil {
return err
}
if err := schema.ValidateMigrationContext(str); err != nil {
return vterrors.NewErrorf(vtrpcpb.Code_INVALID_ARGUMENT, vterrors.WrongValueForVar, "invalid migration_context: %s", str)
}
vcursor.Session().SetMigrationContext(str)
case sysvars.QueryTimeout.Name:
queryTimeout, err := svss.evalAsInt64(env, vcursor)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ func (e *Executor) addNeededBindVars(vcursor *vcursorImpl, bindVarNeeds *sqlpars
bindVars[key] = sqltypes.StringBindVariable(v)
case sysvars.DDLStrategy.Name:
bindVars[key] = sqltypes.StringBindVariable(session.DDLStrategy)
case sysvars.MigrationContext.Name:
bindVars[key] = sqltypes.StringBindVariable(session.MigrationContext)
case sysvars.SessionUUID.Name:
bindVars[key] = sqltypes.StringBindVariable(session.SessionUUID)
case sysvars.SessionEnableSystemSettings.Name:
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func TestSelectSystemVariables(t *testing.T) {

sql := "select @@autocommit, @@client_found_rows, @@skip_query_plan_cache, @@enable_system_settings, " +
"@@sql_select_limit, @@transaction_mode, @@workload, @@read_after_write_gtid, " +
"@@read_after_write_timeout, @@session_track_gtids, @@ddl_strategy, @@socket, @@query_timeout"
"@@read_after_write_timeout, @@session_track_gtids, @@ddl_strategy, @@migration_context, @@socket, @@query_timeout"

result, err := executorExec(executor, sql, map[string]*querypb.BindVariable{})
wantResult := &sqltypes.Result{
Expand All @@ -776,6 +776,7 @@ func TestSelectSystemVariables(t *testing.T) {
{Name: "@@read_after_write_timeout", Type: sqltypes.Float64, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG | querypb.MySqlFlag_NUM_FLAG)},
{Name: "@@session_track_gtids", Type: sqltypes.VarChar, Charset: uint32(collations.Default()), Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG)},
{Name: "@@ddl_strategy", Type: sqltypes.VarChar, Charset: uint32(collations.Default()), Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG)},
{Name: "@@migration_context", Type: sqltypes.VarChar, Charset: uint32(collations.Default()), Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG)},
{Name: "@@socket", Type: sqltypes.VarChar, Charset: uint32(collations.Default()), Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG)},
{Name: "@@query_timeout", Type: sqltypes.Int64, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG | querypb.MySqlFlag_NUM_FLAG)},
},
Expand All @@ -794,6 +795,7 @@ func TestSelectSystemVariables(t *testing.T) {
sqltypes.NewVarChar("own_gtid"),
sqltypes.NewVarChar(""),
sqltypes.NewVarChar(""),
sqltypes.NewVarChar(""),
sqltypes.NewInt64(0),
}},
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ func (vh *vtgateHandler) session(c *mysql.Conn) *vtgatepb.Session {
},
Autocommit: true,
DDLStrategy: defaultDDLStrategy,
MigrationContext: "",
SessionUUID: u.String(),
EnableSystemSettings: sysVarSetEnabled,
}
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vtgate/safe_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,20 @@ func (session *SafeSession) GetDDLStrategy() string {
return session.DDLStrategy
}

// SetMigrationContext set the migration_context setting.
func (session *SafeSession) SetMigrationContext(migrationContext string) {
session.mu.Lock()
defer session.mu.Unlock()
session.MigrationContext = migrationContext
}

// GetMigrationContext returns the migration_context value.
func (session *SafeSession) GetMigrationContext() string {
session.mu.Lock()
defer session.mu.Unlock()
return session.MigrationContext
}

// GetSessionUUID returns the SessionUUID value.
func (session *SafeSession) GetSessionUUID() string {
session.mu.Lock()
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,16 @@ func (vc *vcursorImpl) GetDDLStrategy() string {
return vc.safeSession.GetDDLStrategy()
}

// SetMigrationContext implements the SessionActions interface
func (vc *vcursorImpl) SetMigrationContext(migrationContext string) {
vc.safeSession.SetMigrationContext(migrationContext)
}

// GetMigrationContext implements the SessionActions interface
func (vc *vcursorImpl) GetMigrationContext() string {
return vc.safeSession.GetMigrationContext()
}

// GetSessionUUID implements the SessionActions interface
func (vc *vcursorImpl) GetSessionUUID() string {
return vc.safeSession.GetSessionUUID()
Expand Down
3 changes: 3 additions & 0 deletions proto/vtgate.proto
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ message Session {
int64 query_timeout = 25;

map<string, PrepareData> prepare_statement = 26;

// MigrationContext
string migration_context = 27;
}

// PrepareData keeps the prepared statement and other information related for execution of it.
Expand Down