Skip to content

Commit

Permalink
vdiff create wait and vdiff resume
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Sep 18, 2023
1 parent 28850ac commit a59c854
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 12 deletions.
104 changes: 96 additions & 8 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ import (
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
topoprotopb "vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
)

var (
Expand Down Expand Up @@ -69,6 +71,10 @@ var (
Arg string
}{}

vDiffResumeOptions = struct {
UUID uuid.UUID
}{}

vDiffShowOptions = struct {
Arg string
Verbose bool
Expand Down Expand Up @@ -153,6 +159,25 @@ vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --targe
RunE: commandVDiffDelete,
}

// vDiffResume makes a VDiffResume gRPC call to a vtctld.
vDiffResume = &cobra.Command{
Use: "resume",
Short: "Resume a VDiff.",
Example: `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace resume a037a9e2-5628-11ee-8c99-0242ac120002`,
DisableFlagsInUseLine: true,
Aliases: []string{"Resume"},
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
uuid, err := uuid.Parse(args[0])
if err != nil {
return fmt.Errorf("invalid UUID provided: %v", err)
}
vDiffResumeOptions.UUID = uuid
return nil
},
RunE: commandVDiffResume,
}

// vDiffShow makes a VDiffShow gRPC call to a vtctld.
vDiffShow = &cobra.Command{
Use: "show",
Expand Down Expand Up @@ -211,32 +236,93 @@ func commandVDiffCreate(cmd *cobra.Command, args []string) error {
return err
}

if vDiffCreateOptions.Wait {
tkr := time.NewTicker(vDiffCreateOptions.WaitUpdateInterval)
defer tkr.Stop()
var state vdiff.VDiffState
ctx := common.GetCommandCtx()
vtctldClient := common.GetClient()
uuidStr := vDiffCreateOptions.UUID.String()
for {
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-tkr.C:
resp, err := vtctldClient.VDiffShow(ctx, &vtctldatapb.VDiffShowRequest{
Workflow: common.BaseOptions.Workflow,
TargetKeyspace: common.BaseOptions.TargetKeyspace,
Arg: uuidStr,
})
if err != nil {
return err
}
if state, err = displayVDiff2ShowSingleSummary(format, common.BaseOptions.TargetKeyspace, common.BaseOptions.Workflow, uuidStr, resp, false); err != nil {
return err
}
if state == vdiff.CompletedState {
return nil
}
}
}
} else {
var data []byte
if format == "json" {
data, err = cli.MarshalJSON(resp)
if err != nil {
return err
}
} else {
data = []byte(fmt.Sprintf("VDiff %s scheduled on target shards, use show to view progress", resp.Uuid))
}
fmt.Printf("%s\n", data)
}

return nil
}

func commandVDiffDelete(cmd *cobra.Command, args []string) error {
format, err := common.GetOutputFormat(cmd)
if err != nil {
return err
}
cli.FinishedParsing(cmd)

resp, err := common.GetClient().VDiffDelete(common.GetCommandCtx(), &vtctldatapb.VDiffDeleteRequest{
Workflow: common.BaseOptions.Workflow,
TargetKeyspace: common.BaseOptions.TargetKeyspace,
Arg: vDiffDeleteOptions.Arg,
})

if err != nil {
return err
}

var data []byte
if format == "json" {
data, err = cli.MarshalJSON(resp)
if err != nil {
return err
}
} else {
data = []byte(fmt.Sprintf("VDiff %s scheduled on target shards, use show to view progress", resp.Uuid))
data = []byte(resp.Status)
}

fmt.Printf("%s\n", data)

return nil
}

func commandVDiffDelete(cmd *cobra.Command, args []string) error {
func commandVDiffResume(cmd *cobra.Command, args []string) error {
format, err := common.GetOutputFormat(cmd)
if err != nil {
return err
}
cli.FinishedParsing(cmd)

resp, err := common.GetClient().VDiffDelete(common.GetCommandCtx(), &vtctldatapb.VDiffDeleteRequest{
resp, err := common.GetClient().VDiffResume(common.GetCommandCtx(), &vtctldatapb.VDiffResumeRequest{
Workflow: common.BaseOptions.Workflow,
TargetKeyspace: common.BaseOptions.TargetKeyspace,
Arg: vDiffDeleteOptions.Arg,
Uuid: vDiffResumeOptions.UUID.String(),
})

if err != nil {
Expand Down Expand Up @@ -284,8 +370,7 @@ type vdiffSummary struct {
Progress *vdiff.ProgressReport `json:"Progress,omitempty"`
}

const (
summaryTextTemplate = `
const summaryTextTemplate = `
VDiff Summary for {{.Keyspace}}.{{.Workflow}} ({{.UUID}})
State: {{.State}}
{{if .Errors}}
Expand All @@ -310,7 +395,6 @@ Table {{$table.TableName}}:
Use "--format=json" for more detailed output.
`
)

type VDiffListing struct {
UUID, Workflow, Keyspace, Shard, State string
Expand Down Expand Up @@ -724,10 +808,14 @@ func registerVDiffCommands(root *cobra.Command) {
vDiffCreate.Flags().BoolVar(&vDiffCreateOptions.OnlyPKs, "only-pks", false, "When reporting missing rows, only show primary keys in the report.")
vDiffCreate.Flags().StringSliceVar(&vDiffCreateOptions.Tables, "tables", nil, "Only run vdiff for these tables in the workflow")
vDiffCreate.Flags().Uint32Var(&vDiffCreateOptions.MaxExtraRowsToCompare, "max-extra-rows-to-compare", 1000, "If there are collation differences between the source and target, you can have rows that are identical but simply returned in a different order from MySQL. We will do a second pass to compare the rows for any actual differences in this case and this flag allows you to control the resources used for this operation.")
vDiffCreate.Flags().BoolVar(&vDiffCreateOptions.Wait, "wait", false, "When creating or resuming a vdiff, wait for it to finish before exiting")
vDiffCreate.Flags().DurationVar(&vDiffCreateOptions.WaitUpdateInterval, "wait-update-interval", time.Duration(1*time.Minute), "When waiting on a vdiff to finish, check and display the current status this often")
vDiff.AddCommand(vDiffCreate)

vDiff.AddCommand(vDiffDelete)

vDiff.AddCommand(vDiffResume)

vDiffShow.Flags().BoolVar(&vDiffShowOptions.Verbose, "verbose", false, "Show verbose output in summaries")
vDiff.AddCommand(vDiffShow)
}
Expand Down
15 changes: 15 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4721,6 +4721,21 @@ func (s *VtctldServer) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDe
return resp, err
}

// VDiffResume is part of the vtctlservicepb.VtctldServer interface.
func (s *VtctldServer) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (resp *vtctldatapb.VDiffResumeResponse, err error) {
span, ctx := trace.NewSpan(ctx, "VtctldServer.VDiffResume")
defer span.Finish()

defer panicHandler(&err)

span.Annotate("keyspace", req.TargetKeyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("uuid", req.Uuid)

resp, err = s.ws.VDiffResume(ctx, req)
return resp, err
}

// VDiffShow is part of the vtctlservicepb.VtctldServer interface.
func (s *VtctldServer) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (resp *vtctldatapb.VDiffShowResponse, err error) {
span, ctx := trace.NewSpan(ctx, "VtctldServer.VDiffShow")
Expand Down
41 changes: 38 additions & 3 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe
return err
})
if err != nil {
log.Errorf("Error executing action %s: %v", vdiff.CreateAction, err)
log.Errorf("Error executing vdiff create action: %v", err)
return nil, err
}

Expand Down Expand Up @@ -1397,7 +1397,7 @@ func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRe
return err
})
if err != nil {
log.Errorf("Error executing action %s: %v", vdiff.CreateAction, err)
log.Errorf("Error executing vdiff delete action: %v", err)
return nil, err
}
var status string
Expand All @@ -1412,6 +1412,41 @@ func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRe
}, nil
}

// VDiffResume is part of the vtctlservicepb.VtctldServer interface.
func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffResume")
defer span.Finish()

span.Annotate("keyspace", req.TargetKeyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("uuid", req.Uuid)

tabletreq := &tabletmanagerdatapb.VDiffRequest{
Keyspace: req.TargetKeyspace,
Workflow: req.Workflow,
Action: string(vdiff.ResumeAction),
VdiffUuid: req.Uuid,
}

ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow)
if err != nil {
return nil, err
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
_, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq)
return err
})
if err != nil {
log.Errorf("Error executing vdiff resume action: %v", err)
return nil, err
}

return &vtctldatapb.VDiffResumeResponse{
Status: fmt.Sprintf("Resumed VDiff %s", req.Uuid),
}, nil
}

// VDiffShow is part of the vtctlservicepb.VtctldServer interface.
func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffShow")
Expand Down Expand Up @@ -1445,7 +1480,7 @@ func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowReques
return err
})
if output.err != nil {
log.Errorf("Error executing show action: %v", output.err)
log.Errorf("Error executing vdiff show action: %v", output.err)
return nil, output.err
}

Expand Down
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ const (
var (
Actions = []VDiffAction{CreateAction, ShowAction, StopAction, ResumeAction, DeleteAction}
ActionArgs = []string{AllActionArg, LastActionArg}

// The real zero value has nested nil pointers.
vDiffOptionsZeroVal = &tabletmanagerdatapb.VDiffOptions{
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{},
CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{},
ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{},
}
)

func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) {
Expand Down Expand Up @@ -115,7 +122,7 @@ func (vde *Engine) getVDiffSummary(vdiffID int64, dbClient binlogplayer.DBClient
func (vde *Engine) fixupOptions(options *tabletmanagerdatapb.VDiffOptions) (*tabletmanagerdatapb.VDiffOptions, error) {
// Assign defaults to sourceCell and targetCell if not specified.
if options == nil {
options = &tabletmanagerdatapb.VDiffOptions{}
options = vDiffOptionsZeroVal
}
sourceCell := options.PickerOptions.SourceCell
targetCell := options.PickerOptions.TargetCell
Expand Down

0 comments on commit a59c854

Please sign in to comment.