Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Workflow Update Client Command #12622

Merged
merged 47 commits into from
Mar 27, 2023
Merged
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
5ecb846
Add update action to Workflow command
mattlord Mar 14, 2023
f5229b4
Merge remote-tracking branch 'origin/main' into workflow_update
mattlord Mar 15, 2023
c204502
Add on-ddl
mattlord Mar 15, 2023
785fff1
Made on-ddl handling 5.7 compatible
mattlord Mar 15, 2023
771a96c
Various fixes and improvements
mattlord Mar 16, 2023
df2b5d7
Correct/unify Workflow usage output
mattlord Mar 16, 2023
571e605
Test fixes
mattlord Mar 16, 2023
f21de0d
Merge remote-tracking branch 'origin/main' into workflow_update
mattlord Mar 17, 2023
28a26cd
Minor changes after self review
mattlord Mar 17, 2023
f900c1c
Move Workflow Update to RPC
mattlord Mar 21, 2023
68217b6
Modify unit test
mattlord Mar 21, 2023
203103d
Thread request through and add method to Fake TMC
mattlord Mar 21, 2023
205eeaa
Use RPC request/response type throughout
mattlord Mar 21, 2023
48bf2f8
We don't care about action within callback
mattlord Mar 21, 2023
e833885
Updates after self review
mattlord Mar 21, 2023
f284a05
Various changes and re-arranging
mattlord Mar 22, 2023
7a80ee7
Add tabletmanager unit test
mattlord Mar 22, 2023
f4ba5ab
Build out test and fix bugs found
mattlord Mar 22, 2023
b1f5573
Minor changes after self review
mattlord Mar 22, 2023
c2a7e88
Cleanup in reverse creation order
mattlord Mar 22, 2023
62194d6
Remove now incorrect unit test addition
mattlord Mar 22, 2023
63ffc23
No final results in show/listall/--dry-run update
mattlord Mar 22, 2023
8e1f4a2
Minor unit test improvements
mattlord Mar 22, 2023
0cddbbd
Nitty nitter gonna nit
mattlord Mar 22, 2023
7a99b5f
Address review comments
mattlord Mar 22, 2023
4ee9931
Use more meaningful const var name
mattlord Mar 22, 2023
97dcacf
Add vtctldclient command for Workflow update
mattlord Mar 23, 2023
ba52ffe
Minor changes after quick self review
mattlord Mar 23, 2023
6f15ad5
Minor fixes after local testing
mattlord Mar 23, 2023
3f8ec81
Update vtctldclient --help output
mattlord Mar 23, 2023
0949a03
Address review comments
mattlord Mar 23, 2023
f854db0
Address remaining review comments
mattlord Mar 23, 2023
1ccc601
Being annoyingly pedantic ... halp
mattlord Mar 23, 2023
8de128e
Missed a spot
mattlord Mar 23, 2023
14b2869
Minor improvements after final self review
mattlord Mar 23, 2023
1cbb4c5
One last tiny optimization
mattlord Mar 24, 2023
12eb98c
Since I have to run CI again...
mattlord Mar 24, 2023
5e37c3a
Re-add missing vtctl[client] check for any provided changes
mattlord Mar 24, 2023
b1520ac
Add full/proper support for Reshard worfklows.
mattlord Mar 25, 2023
c8d7238
Add the most basic e2e test
mattlord Mar 25, 2023
165703b
Use require.NotEmpty in new e2e checks
mattlord Mar 25, 2023
bdb5f90
Test updating workflow in same relative point
mattlord Mar 25, 2023
190c70b
Use same ks.wf names
mattlord Mar 25, 2023
4e2d580
Adjust comments and Changed check for shard merges.
mattlord Mar 26, 2023
06a9802
Minor comment/help output changes on final self review.
mattlord Mar 27, 2023
73ff718
Trim whitespace on provided cells and tablet types
mattlord Mar 27, 2023
7a83b56
Use topoproto.ParseTabletType for input validation
mattlord Mar 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Various changes and re-arranging
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Mar 22, 2023
commit f284a05ad12de035b2512c0dba057b9c2492993e
2 changes: 1 addition & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
@@ -3656,7 +3656,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag
if err != nil {
return err
}
if action == "show" || action == "listall" {
if action == "show" {
return nil
}
}
41 changes: 8 additions & 33 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
@@ -50,11 +50,13 @@ func (tm *TabletManager) VReplicationWaitForPos(ctx context.Context, id int32, p
return tm.VREngine.WaitForPos(ctx, id, pos)
}

// UpdateVRWorkflow updates the sidecar databases's vreplication record
// for this tablet's vreplication workflow stream.
// UpdateVRWorkflow updates the sidecar databases's vreplication
// record for this tablet's vreplication workflow stream.
// Note: the VReplication engine creates a new controller for the
// workflow when the record is updated, so we also in effect restart
// the workflow via the update.
func (tm *TabletManager) UpdateVRWorkflow(ctx context.Context, req *tabletmanagerdatapb.UpdateVRWorkflowRequest) (*tabletmanagerdatapb.UpdateVRWorkflowResponse, error) {
restart := false
query := "select id, state, source, cell, tablet_types from %s.vreplication where workflow = %a"
query := "select id, source, cell, tablet_types from %s.vreplication where workflow = %a"
bindVars := map[string]*querypb.BindVariable{
"wf": sqltypes.StringBindVariable(req.Workflow),
}
@@ -73,14 +75,8 @@ func (tm *TabletManager) UpdateVRWorkflow(ctx context.Context, req *tabletmanage

row := res.Named().Row()
id := row.AsInt64("id", 0)
state := row.AsString("state", "")
cells := row.AsString("cells", "")
tabletTypes := row.AsString("tablet_types", "")
// If the stream was running then we will stop and restart it.
if state == "Running" {
state = "Stopped"
restart = true
}
bls := &binlogdatapb.BinlogSource{}
source := row.AsBytes("source", []byte{})
// For the string values, we use NULL to differentiate from
@@ -104,35 +100,14 @@ func (tm *TabletManager) UpdateVRWorkflow(ctx context.Context, req *tabletmanage
if err != nil {
return nil, err
}
query = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a"
query = "update %s.vreplication set source = %a, cell = %a, tablet_types = %a where id = %a"
shlomi-noach marked this conversation as resolved.
Show resolved Hide resolved
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
"sc": sqltypes.StringBindVariable(string(source)),
"cl": sqltypes.StringBindVariable(cells),
"tt": sqltypes.StringBindVariable(tabletTypes),
"id": sqltypes.Int64BindVariable(id),
}
parsed = sqlparser.BuildParsedQuery(query, sidecardb.GetIdentifier(), ":st", ":sc", ":cl", ":tt", ":id")
stmt, err = parsed.GenerateQuery(bindVars, nil)
if err != nil {
return nil, err
}
res, err = tm.VREngine.Exec(stmt)

if err != nil {
return nil, err
}
if !restart {
return &tabletmanagerdatapb.UpdateVRWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
}

state = "Running"
query = "update %s.vreplication set state = %a where id = %a"
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
"id": sqltypes.Int64BindVariable(id),
}
parsed = sqlparser.BuildParsedQuery(query, sidecardb.GetIdentifier(), ":st", ":id")
parsed = sqlparser.BuildParsedQuery(query, sidecardb.GetIdentifier(), ":sc", ":cl", ":tt", ":id")
mattlord marked this conversation as resolved.
Show resolved Hide resolved
stmt, err = parsed.GenerateQuery(bindVars, nil)
if err != nil {
return nil, err
36 changes: 36 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletmanager

import (
"testing"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo/memorytopo"
)

func TestUpdateVRWorkflow(t *testing.T) {
ts := memorytopo.NewServer("cell1")
tm := newTestTM(t, ts, 1, "ks", "0")
defer tm.Stop()

tm.tmState.mu.Lock()
tm.tmState.tablet.Type = topodatapb.TabletType_PRIMARY
tm.tmState.mu.Unlock()

// wut am I doing to do here...
}
31 changes: 27 additions & 4 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"math"
"sort"
"strings"
"sync"
"time"
@@ -131,7 +132,6 @@ func (wr *Wrangler) QueryResultForTabletResults(results map[*topo.TabletInfo]*sq

// VExecResult runs VExec and the naggregates the results into a single *sqltypes.Result
func (wr *Wrangler) VExecResult(ctx context.Context, workflow, keyspace, query string, dryRun bool) (qr *sqltypes.Result, err error) {

results, err := wr.VExec(ctx, workflow, keyspace, query, dryRun)
if err != nil {
return nil, err
@@ -356,10 +356,12 @@ func (wr *Wrangler) getWorkflowActionQuery(action string) (string, error) {
var query string
updateSQL := "update _vt.vreplication set state = %s"
switch action {
case "stop", "update":
case "stop":
query = fmt.Sprintf(updateSQL, encodeString("Stopped"))
case "start":
query = fmt.Sprintf(updateSQL, encodeString("Running"))
case "update":
// We don't use the SQL interface, so this is only for dry-run purposes.
shlomi-noach marked this conversation as resolved.
Show resolved Hide resolved
case "delete":
query = sqlVReplicationDelete
default:
@@ -376,35 +378,56 @@ func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace,
}
if action == "update" {
changes := false
var dryRunChanges strings.Builder
req := &tabletmanagerdatapb.UpdateVRWorkflowRequest{
Workflow: workflow,
}
if cells.Changed {
changes = true
req.Cells = strings.TrimSpace(cells.Value.String())
dryRunChanges.WriteString(fmt.Sprintf(" cells=%q\n", req.Cells))
} else { // Indicate that we do NOT want to set the value to an empty string.
req.Cells = sqltypes.NULL.String()
}
if tabletTypes.Changed {
changes = true
req.TabletTypes = strings.TrimSpace(tabletTypes.Value.String())
dryRunChanges.WriteString(fmt.Sprintf(" tablet_types=%q\n", req.TabletTypes))
} else { // Indicate that we do NOT want to set the value to an empty string.
req.TabletTypes = sqltypes.NULL.String()
}
if onDDL.Changed {
changes = true
onddl, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(onDDL.Value.String())]
if !ok {
return nil, fmt.Errorf("invalid value provided for on-ddl: %v", onDDL.Value.String())
return nil, fmt.Errorf("invalid value provided for on-ddl: %s", onDDL.Value.String())
}
req.OnDdl = binlogdatapb.OnDDLAction(onddl)
dryRunChanges.WriteString(fmt.Sprintf(" on_ddl=%q\n", onDDL.Value.String()))
} else { // Indicate that we do NOT want to update the value.
req.OnDdl = -1
}
if !changes {
return nil, fmt.Errorf(errWorkflowUpdateWithoutChanges)
}
if !dryRun {
if dryRun {
wr.Logger().Printf("The following workflow fields will be updated:\n%s", dryRunChanges.String())
wr.Logger().Printf("On the following tablets in the %s keyspace for workflow %s:\n",
keyspace, workflow)
vx := newVExec(ctx, workflow, keyspace, "", wr)
if err := vx.getPrimaries(); err != nil {
return nil, err
}
tablets := vx.primaries
sort.Slice(tablets, func(i, j int) bool {
return tablets[i].AliasString() < tablets[j].AliasString()
})
for _, tablet := range tablets {
wr.Logger().Printf(" %s\n", tablet.AliasString())
}
wr.Logger().Printf("\n")
return nil, nil
} else {
callback = func(ctx context.Context, tablet *topo.TabletInfo) (*querypb.QueryResult, error) {
res, err := wr.tmc.UpdateVRWorkflow(ctx, tablet.Tablet, req)
return res.Result, err
Loading