Skip to content

Commit

Permalink
Add on-ddl
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Mar 15, 2023
1 parent f5229b4 commit c204502
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 29 deletions.
9 changes: 5 additions & 4 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ var commands = []commandGroup{
{
name: "Workflow",
method: commandWorkflow,
params: "[--dry-run] [--cells] [--tablet-types] <ks.workflow> <action>",
params: "[--dry-run] [--cells=<cells>] [--tablet-types=<types>] [--on-ddl=<value>] <ks.workflow> <action>",
help: "Start/Stop/Update/Delete/Show/ListAll/Tags Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start",
},
},
Expand Down Expand Up @@ -3610,8 +3610,9 @@ func commandHelp(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag.Fla

func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag.FlagSet, args []string) error {
dryRun := subFlags.Bool("dry-run", false, "Does a dry run of Workflow and only reports the final query and list of tablets on which the operation will be applied")
cells := subFlags.String("cells", "", "New Cell(s) or CellAlias(es) (comma-separated) to replicate from. (Update only)")
tabletTypes := subFlags.String("tablet-types", "", "New source tablet types to replicate from (e.g. PRIMARY, REPLICA, RDONLY). (Update only)")
subFlags.String("cells", "", "New Cell(s) or CellAlias(es) (comma-separated) to replicate from. (Update only)")
subFlags.String("tablet-types", "", "New source tablet types to replicate from (e.g. PRIMARY, REPLICA, RDONLY). (Update only)")
subFlags.String("on-ddl", "", "New instruction on what to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE. (Update only)")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand Down Expand Up @@ -3647,7 +3648,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag
return err
}
} else {
results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun, *cells, *tabletTypes)
results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun, subFlags.Lookup("cells"), subFlags.Lookup("tablet-types"), subFlags.Lookup("on-ddl"))
if err != nil {
return err
}
Expand Down
48 changes: 37 additions & 11 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"github.com/spf13/pflag"

workflow2 "vitess.io/vitess/go/vt/vtctl/workflow"

"google.golang.org/protobuf/encoding/prototext"
Expand Down Expand Up @@ -294,7 +296,7 @@ func (wr *Wrangler) convertQueryResultToSQLTypesResult(results map[*topo.TabletI
}

// WorkflowAction can start/stop/update/delete or list streams in _vt.vreplication on all primaries in the target keyspace of the workflow.
func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool, cells, tabletTypes string) (map[*topo.TabletInfo]*sqltypes.Result, error) {
func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool, cells, tabletTypes, onDDL *pflag.Flag) (map[*topo.TabletInfo]*sqltypes.Result, error) {
switch action {
case "show":
replStatus, err := wr.ShowWorkflow(ctx, workflow, keyspace)
Expand All @@ -311,17 +313,18 @@ func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, acti
wr.printWorkflowList(keyspace, workflows)
return nil, err
case "update":
// This is the only place we use the cells and tabletTypes
// variables.
if _, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun, cells, tabletTypes); err != nil {
// This is the only place we use the cells, tabletTypes, and
// onDDL variables.
if _, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun, cells, tabletTypes, onDDL); err != nil {
return nil, vterrors.Wrapf(err, "failed to update the %s.%s workflow", keyspace, workflow)
}
// The workflow is stopped when updated, so now we restart
// the workflow for the changes to take effect and we
// return result of the restart.
action = "start"
}
results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun, "", "")
unusedFlag := &pflag.Flag{}
results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun, unusedFlag, unusedFlag, unusedFlag)
return wr.convertQueryResultToSQLTypesResult(results), err
}

Expand All @@ -341,17 +344,40 @@ func (wr *Wrangler) getWorkflowActionQuery(action string) (string, error) {
return query, nil
}

func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool, cells, tabletTypes string) (map[*topo.TabletInfo]*querypb.QueryResult, error) {
func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool, cells, tabletTypes, onDDL *pflag.Flag) (map[*topo.TabletInfo]*querypb.QueryResult, error) {
query, err := wr.getWorkflowActionQuery(action)
if err != nil {
return nil, err
}
if action == "update" {
query += ", cell=%a, tablet_types=%a"
query, err = sqlparser.ParseAndBind(query,
sqltypes.StringBindVariable(strings.TrimSpace(cells)),
sqltypes.StringBindVariable(strings.TrimSpace(tabletTypes)),
)
changes := false
bindVars := []*querypb.BindVariable{}
if cells.Changed {
changes = true
query += ", cell=%a"
bindVars = append(bindVars, sqltypes.StringBindVariable(strings.TrimSpace(cells.Value.String())))
}
if tabletTypes.Changed {
changes = true
query += ", tablet_types=%a"
bindVars = append(bindVars, sqltypes.StringBindVariable(strings.TrimSpace(tabletTypes.Value.String())))
}
if onDDL.Changed {
changes = true
onDDLVal := strings.ToUpper(onDDL.Value.String())
if _, ok := binlogdatapb.OnDDLAction_value[onDDLVal]; !ok {
return nil, fmt.Errorf("invalid value provided for on-ddl: %v", onDDLVal)
}
// The source column is a prototext value. If there's currently no
// on_ddl value (which would mean it's the default of 0/IGNORE) then
// we append it. Otherwise, we replace the existing value.
query += fmt.Sprintf(", source=if(regexp_instr(source, 'on_ddl:') = 0, concat(source, ' on_ddl:%s'), regexp_replace(source, 'on_ddl:.*', 'on_ddl:%s'))",
onDDLVal, onDDLVal)
}
if !changes {
return nil, fmt.Errorf("no updates were provided; use --cells, --tablet-types, or --on-ddl to specify new values")
}
query, err = sqlparser.ParseAndBind(query, bindVars...)
if err != nil {
return nil, err
}
Expand Down
60 changes: 46 additions & 14 deletions go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,30 @@ import (
"testing"
"time"

"github.com/spf13/pflag"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/logutil"
)

var unusedFlag = &pflag.Flag{}

// stringValue implements the pflag.Value interface.
type stringValue struct {
value string
}

func (sv stringValue) Set(val string) error {
return nil
}
func (sv stringValue) String() string {
return sv.value
}
func (sv stringValue) Type() string {
return "string"
}

func TestVExec(t *testing.T) {
ctx := context.Background()
workflow := "wrWorkflow"
Expand Down Expand Up @@ -185,16 +203,16 @@ func TestWorkflowListStreams(t *testing.T) {
logger := logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)

_, err := wr.WorkflowAction(ctx, workflow, keyspace, "listall", false, "", "")
_, err := wr.WorkflowAction(ctx, workflow, keyspace, "listall", false, unusedFlag, unusedFlag, unusedFlag)
require.NoError(t, err)

_, err = wr.WorkflowAction(ctx, workflow, "badks", "show", false, "", "")
_, err = wr.WorkflowAction(ctx, workflow, "badks", "show", false, unusedFlag, unusedFlag, unusedFlag)
require.Errorf(t, err, "node doesn't exist: keyspaces/badks/shards")

_, err = wr.WorkflowAction(ctx, "badwf", keyspace, "show", false, "", "")
_, err = wr.WorkflowAction(ctx, "badwf", keyspace, "show", false, unusedFlag, unusedFlag, unusedFlag)
require.Errorf(t, err, "no streams found for workflow badwf in keyspace target")
logger.Clear()
_, err = wr.WorkflowAction(ctx, workflow, keyspace, "show", false, "", "")
_, err = wr.WorkflowAction(ctx, workflow, keyspace, "show", false, unusedFlag, unusedFlag, unusedFlag)
require.NoError(t, err)
want := `{
"Workflow": "wrWorkflow",
Expand Down Expand Up @@ -313,7 +331,7 @@ func TestWorkflowListStreams(t *testing.T) {
got = re.ReplaceAllLiteralString(got, `"MaxVReplicationTransactionLag": 0`)
require.Equal(t, want, got)

results, err := wr.execWorkflowAction(ctx, workflow, keyspace, "stop", false, "", "")
results, err := wr.execWorkflowAction(ctx, workflow, keyspace, "stop", false, unusedFlag, unusedFlag, unusedFlag)
require.Nil(t, err)

// convert map to list and sort it for comparison
Expand All @@ -327,7 +345,7 @@ func TestWorkflowListStreams(t *testing.T) {
require.ElementsMatch(t, wantResults, gotResults)

logger.Clear()
results, err = wr.execWorkflowAction(ctx, workflow, keyspace, "stop", true, "", "")
results, err = wr.execWorkflowAction(ctx, workflow, keyspace, "stop", true, unusedFlag, unusedFlag, unusedFlag)
require.Nil(t, err)
require.Equal(t, "map[]", fmt.Sprintf("%v", results))
dryRunResult := `Query: update _vt.vreplication set state = 'Stopped' where db_name = 'vt_target' and workflow = 'wrWorkflow'
Expand Down Expand Up @@ -463,17 +481,31 @@ func TestWorkflowUpdate(t *testing.T) {
defer env.close()
logger := logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
cells := "zone1,zone2"
tabletTypes := "rdonly,spare"

cells := &pflag.Flag{
Name: "cells",
Value: stringValue{"zone1,zone2"},
Changed: true,
}
tabletTypes := &pflag.Flag{
Name: "tablet-types",
Value: stringValue{"rdonly,spare"},
Changed: true,
}
onDDL := &pflag.Flag{
Name: "on-ddl",
Value: stringValue{"EXEC"},
Changed: true,
}
// First we stop the workflow and update the config on both
// target primaries.
env.tmc.setVRResults(env.tmc.tablets[200].tablet,
fmt.Sprintf("update _vt.vreplication set state = 'Stopped', cell = '%s', tablet_types = '%s' where db_name = 'vt_%s' and workflow = '%s'", cells, tabletTypes, keyspace, workflow),
fmt.Sprintf("update _vt.vreplication set state = 'Stopped', cell = '%s', tablet_types = '%s', source = if(regexp_instr(source, 'on_ddl:') = 0, concat(source, ' on_ddl:%s'), regexp_replace(source, 'on_ddl:.*', 'on_ddl:%s')) where db_name = 'vt_%s' and workflow = '%s'",
cells.Value.String(), tabletTypes.Value.String(), onDDL.Value.String(), onDDL.Value.String(), keyspace, workflow),
&sqltypes.Result{},
)
env.tmc.setVRResults(env.tmc.tablets[210].tablet,
fmt.Sprintf("update _vt.vreplication set state = 'Stopped', cell = '%s', tablet_types = '%s' where db_name = 'vt_%s' and workflow = '%s'", cells, tabletTypes, keyspace, workflow),
fmt.Sprintf("update _vt.vreplication set state = 'Stopped', cell = '%s', tablet_types = '%s', source = if(regexp_instr(source, 'on_ddl:') = 0, concat(source, ' on_ddl:%s'), regexp_replace(source, 'on_ddl:.*', 'on_ddl:%s')) where db_name = 'vt_%s' and workflow = '%s'",
cells.Value.String(), tabletTypes.Value.String(), onDDL.Value.String(), onDDL.Value.String(), keyspace, workflow),
&sqltypes.Result{},
)
// Then we restart the workflow for the config changes to take
Expand All @@ -487,13 +519,13 @@ func TestWorkflowUpdate(t *testing.T) {
&sqltypes.Result{},
)

_, err := wr.WorkflowAction(ctx, workflow, keyspace, "update", false, cells, tabletTypes)
_, err := wr.WorkflowAction(ctx, workflow, keyspace, "update", false, cells, tabletTypes, onDDL)
require.NoError(t, err)

results, err := wr.WorkflowAction(ctx, workflow, keyspace, "update", true, cells, tabletTypes)
results, err := wr.WorkflowAction(ctx, workflow, keyspace, "update", true, cells, tabletTypes, onDDL)
require.NoError(t, err)
require.Equal(t, "map[]", fmt.Sprintf("%v", results))
dryRunResult := `Query: update _vt.vreplication set state = 'Stopped', cell = 'zone1,zone2', tablet_types = 'rdonly,spare' where db_name = 'vt_target' and workflow = 'wrWorkflow'
dryRunResult := `Query: update _vt.vreplication set state = 'Stopped', cell = 'zone1,zone2', tablet_types = 'rdonly,spare', source = if(regexp_instr(source, 'on_ddl:') = 0, concat(source, ' on_ddl:EXEC'), regexp_replace(source, 'on_ddl:.*', 'on_ddl:EXEC')) where db_name = 'vt_target' and workflow = 'wrWorkflow'
will be run on the following streams in keyspace target for workflow wrWorkflow:
Expand Down

0 comments on commit c204502

Please sign in to comment.