Skip to content

Commit

Permalink
Turns out that WorkflowDelete isn't the right fit
Browse files Browse the repository at this point in the history
That assumes that all artifacts related to a successfully
created workflow exist. In our case they do not (e.g. the
target tables).

So we add code to rollback any target vschema changes if
we made any.

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Aug 20, 2023
1 parent 24d8332 commit 80f8a03
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 0 deletions.
11 changes: 11 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/sqlerror"

Expand Down Expand Up @@ -977,6 +978,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}

var vschema *vschemapb.Keyspace
var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create
vschema, err = s.ts.GetVSchema(ctx, targetKeyspace)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1019,6 +1021,9 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
log.Infof("Found tables to move: %s", strings.Join(tables, ","))

if !vschema.Sharded {
// Save the original in case we need to restore it for a late failure
// in the defer().
origVSchema = proto.Clone(vschema).(*vschemapb.Keyspace)
if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1079,6 +1084,12 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
if origVSchema == nil { // There's no previous version to restore
return
}
if cerr := s.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil {
err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr)
}
}
}()

Expand Down
9 changes: 9 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,10 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) {
&sqltypes.Result{RowsAffected: 1},
)

// Save the current target vschema.
vs, err := tenv.ts.GetVSchema(ctx, targetKs)
require.NoError(t, err, "failed to get target vschema")

_, err = ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{
Workflow: wf,
SourceKeyspace: sourceKs,
Expand All @@ -741,4 +745,9 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) {
rules, err := topotools.GetRoutingRules(ctx, tenv.ts)
require.NoError(t, err, "failed to get routing rules")
require.Equal(t, 0, len(rules), "expected no routing rules to be present")

// Check that our vschema changes were also rolled back.
vs2, err := tenv.ts.GetVSchema(ctx, targetKs)
require.NoError(t, err, "failed to get target vschema")
require.Equal(t, vs, vs2, "expected vschema to be unchanged")
}
10 changes: 10 additions & 0 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
}

var vschema *vschemapb.Keyspace
var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create
vschema, err = wr.ts.GetVSchema(ctx, targetKeyspace)
if err != nil {
return err
Expand Down Expand Up @@ -206,6 +207,9 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
log.Infof("Found tables to move: %s", strings.Join(tables, ","))

if !vschema.Sharded {
// Save the original in case we need to restore it for a late failure
// in the defer().
origVSchema = proto.Clone(vschema).(*vschemapb.Keyspace)
if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil {
return err
}
Expand Down Expand Up @@ -267,6 +271,12 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
if cerr := wr.dropArtifacts(ctx, false, &switcher{ts: ts, wr: wr}); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
if origVSchema == nil { // There's no previous version to restore
return
}
if cerr := wr.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil {
err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr)
}
}
}()

Expand Down

0 comments on commit 80f8a03

Please sign in to comment.