Skip to content

Commit

Permalink
Fixup resume handling
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Sep 18, 2023
1 parent a59c854 commit 541e1fd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
33 changes: 24 additions & 9 deletions go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"

"github.com/google/uuid"
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand Down Expand Up @@ -186,14 +187,15 @@ func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlog
vde.thisTablet.Alias, err)
}
}
if options, err = vde.fixupOptions(options); err != nil {
return err
}
optionsJSON, err := json.Marshal(options)
if err != nil {
return err
}
if action == CreateAction {
// Use options created from the command.
if options, err = vde.fixupOptions(options); err != nil {
return err
}
optionsJSON, err := json.Marshal(options)
if err != nil {
return err
}
query, err := sqlparser.ParseAndBind(sqlNewVDiff,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
Expand All @@ -216,7 +218,6 @@ func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlog
resp.Id = int64(qr.InsertID)
} else {
query, err := sqlparser.ParseAndBind(sqlResumeVDiff,
sqltypes.StringBindVariable(string(optionsJSON)),
sqltypes.StringBindVariable(req.VdiffUuid),
)
if err != nil {
Expand All @@ -240,9 +241,23 @@ func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlog
if err != nil {
return err
}
vdiffRecord := qr.Named().Row()
if vdiffRecord == nil {
return fmt.Errorf("unable to resume vdiff for UUID %s as it was not found on tablet %v (%w)",
req.VdiffUuid, vde.thisTablet.Alias, err)
}
if action == ResumeAction {
// Use existing options for the vdiff.
options = vDiffOptionsZeroVal
err = protojson.Unmarshal(vdiffRecord.AsBytes("options", []byte{}), options)
if err != nil {
return err
}
}

vde.mu.Lock()
defer vde.mu.Unlock()
if err := vde.addController(qr.Named().Row(), options); err != nil {
if err := vde.addController(vdiffRecord, options); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package vdiff
const (
sqlAnalyzeTable = "analyze table `%s`.`%s`"
sqlNewVDiff = "insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values(%a, %a, %a, %a, %a, %a, %a)"
sqlResumeVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.options = %a, vd.started_at = NULL, vd.completed_at = NULL, vd.state = 'pending',
sqlResumeVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.started_at = NULL, vd.completed_at = NULL, vd.state = 'pending',
vdt.state = 'pending' where vd.vdiff_uuid = %a and vd.id = vdt.vdiff_id and vd.state in ('completed', 'stopped')
and vdt.state in ('completed', 'stopped')`
sqlRetryVDiff = `update _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) set vd.state = 'pending',
Expand Down

0 comments on commit 541e1fd

Please sign in to comment.