From 79bcf3113d2394ed0495c9f94e20d9b7fa6b962c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 7 Nov 2024 12:05:13 -0500 Subject: [PATCH 1/4] Qualify the backing sequence table name if we've been given a global-keyspace Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 3f4115579eb..ae9cdbe9831 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -396,7 +396,11 @@ func (mz *materializer) deploySchema() error { table := targetVSchema.Tables[ts.TargetTable] // Don't override or redo anything that already exists. if table != nil && table.AutoIncrement == nil { - seqTableName := fmt.Sprintf(autoSequenceTableFormat, ts.TargetTable) + seqTableName, _ := sqlescape.EnsureEscaped(fmt.Sprintf(autoSequenceTableFormat, ts.TargetTable)) + if mz.ms.WorkflowOptions.GlobalKeyspace != "" { + seqTableKeyspace, _ := sqlescape.EnsureEscaped(mz.ms.WorkflowOptions.GlobalKeyspace) + seqTableName = fmt.Sprintf("%s.%s", seqTableKeyspace, seqTableName) + } // Create a Vitess AutoIncrement definition -- which uses a sequence -- to // replace the MySQL auto_increment definition that we removed. table.AutoIncrement = &vschemapb.AutoIncrement{ From 7e97530c7345b2853585bdee4254e9a6511facab Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 7 Nov 2024 12:13:23 -0500 Subject: [PATCH 2/4] Update unit test Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index fa3f25ca917..b46cee03cd8 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -863,7 +863,7 @@ func TestShardedAutoIncHandling(t *testing.T) { }, AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition added Column: "id", - Sequence: fmt.Sprintf(autoSequenceTableFormat, tableName), + Sequence: fmt.Sprintf("`%s`.`%s`", ms.SourceKeyspace, fmt.Sprintf(autoSequenceTableFormat, tableName)), }, }, }, @@ -931,7 +931,7 @@ func TestShardedAutoIncHandling(t *testing.T) { if tc.wantTargetVSchema != nil { targetVSchema, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace) require.NoError(t, err) - require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema)) + require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema), "got: %v, want: %v", targetVSchema, tc.wantTargetVSchema) } } }) From 4ceed1c43b145fd40a29301daf802532e28966d8 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 7 Nov 2024 13:12:14 -0500 Subject: [PATCH 3/4] Improvements Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer.go | 24 ++++++++++++++----- go/vt/vtctl/workflow/materializer_env_test.go | 2 +- go/vt/vtctl/workflow/materializer_test.go | 8 +++---- go/vt/vtctl/workflow/utils.go | 12 +++++++--- 4 files changed, 32 insertions(+), 14 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index ae9cdbe9831..15f42c951b8 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -386,9 +386,9 @@ func (mz *materializer) deploySchema() error { } if removeAutoInc { - var replaceFunc func(columnName string) + var replaceFunc func(columnName string) error if mz.ms.GetWorkflowOptions().ShardedAutoIncrementHandling == vtctldatapb.ShardedAutoIncrementHandling_REPLACE { - replaceFunc = func(columnName string) { + replaceFunc = func(columnName string) error { mu.Lock() defer mu.Unlock() // At this point we've already confirmed that the table exists in the target @@ -396,10 +396,21 @@ func (mz *materializer) deploySchema() error { table := targetVSchema.Tables[ts.TargetTable] // Don't override or redo anything that already exists. if table != nil && table.AutoIncrement == nil { - seqTableName, _ := sqlescape.EnsureEscaped(fmt.Sprintf(autoSequenceTableFormat, ts.TargetTable)) - if mz.ms.WorkflowOptions.GlobalKeyspace != "" { - seqTableKeyspace, _ := sqlescape.EnsureEscaped(mz.ms.WorkflowOptions.GlobalKeyspace) - seqTableName = fmt.Sprintf("%s.%s", seqTableKeyspace, seqTableName) + tableName, err := sqlescape.UnescapeID(ts.TargetTable) + if err != nil { + return err + } + seqTableName := fmt.Sprintf(autoSequenceTableFormat, tableName) + seqTableName, err = sqlescape.EnsureEscaped(seqTableName) + if err != nil { + return err + } + if mz.ms.GetWorkflowOptions().GlobalKeyspace != "" { + seqKeyspace, err := sqlescape.EnsureEscaped(mz.ms.WorkflowOptions.GlobalKeyspace) + if err != nil { + return err + } + seqTableName = fmt.Sprintf("%s.%s", seqKeyspace, seqTableName) } // Create a Vitess AutoIncrement definition -- which uses a sequence -- to // replace the MySQL auto_increment definition that we removed. @@ -409,6 +420,7 @@ func (mz *materializer) deploySchema() error { } updatedVSchema = true } + return nil } } ddl, err = stripAutoIncrement(ddl, mz.env.Parser(), replaceFunc) diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index 20374f7ef46..fb5064137cd 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -121,7 +121,7 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M tableName := ts.TargetTable table, err := venv.Parser().TableFromStatement(ts.SourceExpression) if err == nil { - tableName = table.Name.String() + tableName = sqlparser.String(table.Name) } var ( cols []string diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index b46cee03cd8..746c5fe2bae 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -613,12 +613,12 @@ func TestMoveTablesDDLFlag(t *testing.T) { // 2. REMOVE the tables' MySQL auto_increment clauses // 3. REPLACE the table's MySQL auto_increment clauses with Vitess sequences func TestShardedAutoIncHandling(t *testing.T) { - tableName := "t1" + tableName := "`t-1`" tableDDL := fmt.Sprintf("create table %s (id int not null auto_increment primary key, c1 varchar(10))", tableName) - validateEmptyTableQuery := fmt.Sprintf("select 1 from `%s` limit 1", tableName) + validateEmptyTableQuery := fmt.Sprintf("select 1 from %s limit 1", tableName) ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow", - SourceKeyspace: "sourceks", + SourceKeyspace: "source-ks", TargetKeyspace: "targetks", TableSettings: []*vtctldatapb.TableMaterializeSettings{{ TargetTable: tableName, @@ -863,7 +863,7 @@ func TestShardedAutoIncHandling(t *testing.T) { }, AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition added Column: "id", - Sequence: fmt.Sprintf("`%s`.`%s`", ms.SourceKeyspace, fmt.Sprintf(autoSequenceTableFormat, tableName)), + Sequence: fmt.Sprintf("`%s`.`%s`", ms.SourceKeyspace, fmt.Sprintf(autoSequenceTableFormat, strings.ReplaceAll(tableName, "`", ""))), }, }, }, diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 50f35667eaf..3c92c0948f4 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -222,24 +222,30 @@ func stripTableForeignKeys(ddl string, parser *sqlparser.Parser) (string, error) // table definition. If an optional replace function is specified then that // callback will be used to e.g. replace the MySQL clause with a Vitess // VSchema AutoIncrement definition. -func stripAutoIncrement(ddl string, parser *sqlparser.Parser, replace func(columnName string)) (string, error) { +func stripAutoIncrement(ddl string, parser *sqlparser.Parser, replace func(columnName string) error) (string, error) { newDDL, err := parser.ParseStrictDDL(ddl) if err != nil { return "", err } - _ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { + err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { switch node := node.(type) { case *sqlparser.ColumnDefinition: if node.Type.Options.Autoincrement { node.Type.Options.Autoincrement = false if replace != nil { - replace(sqlparser.String(node.Name)) + if err := replace(sqlparser.String(node.Name)); err != nil { + return false, vterrors.Wrapf(err, "failed to replace auto_increment column %s in %q", sqlparser.String(node.Name), ddl) + } + } } } return true, nil }, newDDL) + if err != nil { + return "", err + } return sqlparser.String(newDDL), nil } From 80745b4bf7a804a0d279ebbcaf3d42daafaa6574 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 7 Nov 2024 14:04:44 -0500 Subject: [PATCH 4/4] Minor tweak on self review Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer.go | 3 +-- go/vt/vtctl/workflow/utils.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 15f42c951b8..c65a00bf614 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -400,8 +400,7 @@ func (mz *materializer) deploySchema() error { if err != nil { return err } - seqTableName := fmt.Sprintf(autoSequenceTableFormat, tableName) - seqTableName, err = sqlescape.EnsureEscaped(seqTableName) + seqTableName, err := sqlescape.EnsureEscaped(fmt.Sprintf(autoSequenceTableFormat, tableName)) if err != nil { return err } diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 3c92c0948f4..65fa49fde86 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -235,7 +235,7 @@ func stripAutoIncrement(ddl string, parser *sqlparser.Parser, replace func(colum node.Type.Options.Autoincrement = false if replace != nil { if err := replace(sqlparser.String(node.Name)); err != nil { - return false, vterrors.Wrapf(err, "failed to replace auto_increment column %s in %q", sqlparser.String(node.Name), ddl) + return false, vterrors.Wrapf(err, "failed to replace auto_increment column %q in %q", sqlparser.String(node.Name), ddl) } }