Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Qualify and SQL escape tables in created AutoIncrement VSchema definitions #17174

Merged
merged 4 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,17 +386,31 @@ func (mz *materializer) deploySchema() error {
}

if removeAutoInc {
var replaceFunc func(columnName string)
var replaceFunc func(columnName string) error
if mz.ms.GetWorkflowOptions().ShardedAutoIncrementHandling == vtctldatapb.ShardedAutoIncrementHandling_REPLACE {
replaceFunc = func(columnName string) {
replaceFunc = func(columnName string) error {
mu.Lock()
defer mu.Unlock()
// At this point we've already confirmed that the table exists in the target
// vschema.
table := targetVSchema.Tables[ts.TargetTable]
// Don't override or redo anything that already exists.
if table != nil && table.AutoIncrement == nil {
seqTableName := fmt.Sprintf(autoSequenceTableFormat, ts.TargetTable)
tableName, err := sqlescape.UnescapeID(ts.TargetTable)
if err != nil {
return err
}
seqTableName, err := sqlescape.EnsureEscaped(fmt.Sprintf(autoSequenceTableFormat, tableName))
if err != nil {
return err
}
if mz.ms.GetWorkflowOptions().GlobalKeyspace != "" {
seqKeyspace, err := sqlescape.EnsureEscaped(mz.ms.WorkflowOptions.GlobalKeyspace)
if err != nil {
return err
}
seqTableName = fmt.Sprintf("%s.%s", seqKeyspace, seqTableName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥳

}
// Create a Vitess AutoIncrement definition -- which uses a sequence -- to
// replace the MySQL auto_increment definition that we removed.
table.AutoIncrement = &vschemapb.AutoIncrement{
Expand All @@ -405,6 +419,7 @@ func (mz *materializer) deploySchema() error {
}
updatedVSchema = true
}
return nil
}
}
ddl, err = stripAutoIncrement(ddl, mz.env.Parser(), replaceFunc)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M
tableName := ts.TargetTable
table, err := venv.Parser().TableFromStatement(ts.SourceExpression)
if err == nil {
tableName = table.Name.String()
tableName = sqlparser.String(table.Name)
}
var (
cols []string
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,12 +613,12 @@ func TestMoveTablesDDLFlag(t *testing.T) {
// 2. REMOVE the tables' MySQL auto_increment clauses
// 3. REPLACE the table's MySQL auto_increment clauses with Vitess sequences
func TestShardedAutoIncHandling(t *testing.T) {
tableName := "t1"
tableName := "`t-1`"
tableDDL := fmt.Sprintf("create table %s (id int not null auto_increment primary key, c1 varchar(10))", tableName)
validateEmptyTableQuery := fmt.Sprintf("select 1 from `%s` limit 1", tableName)
validateEmptyTableQuery := fmt.Sprintf("select 1 from %s limit 1", tableName)
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
SourceKeyspace: "source-ks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: tableName,
Expand Down Expand Up @@ -863,7 +863,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition added
Column: "id",
Sequence: fmt.Sprintf(autoSequenceTableFormat, tableName),
Sequence: fmt.Sprintf("`%s`.`%s`", ms.SourceKeyspace, fmt.Sprintf(autoSequenceTableFormat, strings.ReplaceAll(tableName, "`", ""))),
},
},
},
Expand Down Expand Up @@ -931,7 +931,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
if tc.wantTargetVSchema != nil {
targetVSchema, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace)
require.NoError(t, err)
require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema))
require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema), "got: %v, want: %v", targetVSchema, tc.wantTargetVSchema)
}
}
})
Expand Down
12 changes: 9 additions & 3 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,24 +222,30 @@ func stripTableForeignKeys(ddl string, parser *sqlparser.Parser) (string, error)
// table definition. If an optional replace function is specified then that
// callback will be used to e.g. replace the MySQL clause with a Vitess
// VSchema AutoIncrement definition.
func stripAutoIncrement(ddl string, parser *sqlparser.Parser, replace func(columnName string)) (string, error) {
func stripAutoIncrement(ddl string, parser *sqlparser.Parser, replace func(columnName string) error) (string, error) {
newDDL, err := parser.ParseStrictDDL(ddl)
if err != nil {
return "", err
}

_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case *sqlparser.ColumnDefinition:
if node.Type.Options.Autoincrement {
node.Type.Options.Autoincrement = false
if replace != nil {
replace(sqlparser.String(node.Name))
if err := replace(sqlparser.String(node.Name)); err != nil {
return false, vterrors.Wrapf(err, "failed to replace auto_increment column %q in %q", sqlparser.String(node.Name), ddl)
}

}
}
}
return true, nil
}, newDDL)
if err != nil {
return "", err
}

return sqlparser.String(newDDL), nil
}
Expand Down
Loading