Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Improve MoveTables Create Error Handling #13737

Merged
merged 22 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5a908cf
MoveTables: don't create routing rules until after streams
mattlord Aug 7, 2023
e478ab2
Add vtctlclient unit test (that fails on main)
mattlord Aug 7, 2023
d200fd7
Improve comment
mattlord Aug 7, 2023
637ba76
Pedantic cleanup
mattlord Aug 8, 2023
a1dd8e1
Minor changes after self review
mattlord Aug 8, 2023
411f2fe
Stop using t.Fatal in new tests
mattlord Aug 8, 2023
d0d0f28
Merge remote-tracking branch 'origin/main' into movetables_routing
mattlord Aug 9, 2023
b47fe10
Add vtctldclient unit test
mattlord Aug 10, 2023
d7360b8
Automatically clean up on MoveTables Create error
mattlord Aug 14, 2023
83f5938
Add missing else in workflow server
mattlord Aug 14, 2023
3c381c4
Restore original logic
mattlord Aug 14, 2023
ed1beae
Add named return values for workflow case
mattlord Aug 14, 2023
fe4a51e
Merge remote-tracking branch 'origin/main' into movetables_routing
mattlord Aug 15, 2023
d98416f
Improve vtctldclient unit test
mattlord Aug 15, 2023
5741fcf
Minor tweaks
mattlord Aug 15, 2023
982bc2f
Minor formatting improvements
mattlord Aug 15, 2023
f0713ee
Must stop nitting...
mattlord Aug 15, 2023
e3fc307
Merge remote-tracking branch 'origin/main' into movetables_routing
mattlord Aug 16, 2023
1da742b
Merge remote-tracking branch 'origin/main' into movetables_routing
mattlord Aug 20, 2023
24d8332
Don't fail e2e workflow if schema tracker is slow
mattlord Aug 20, 2023
80f8a03
Turns out that WorkflowDelete isn't the right fit
mattlord Aug 20, 2023
dc2742f
Address review comment
mattlord Aug 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func AssertMatchesWithTimeout(t *testing.T, conn *mysql.Conn, query, expected st

// WaitForAuthoritative waits for a table to become authoritative
func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*interface{}, error)) error {
timeout := time.After(10 * time.Second)
timeout := time.After(60 * time.Second)
Copy link
Contributor Author

@mattlord mattlord Aug 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is addressing an unrelated cause of test flakiness that was seen.

for {
select {
case <-timeout:
Expand Down
5 changes: 0 additions & 5 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ func (mz *materializer) prepareMaterializerStreams(req *vtctldatapb.MoveTablesCr
if err != nil {
return err
}
if mz.isPartial {
if err := createDefaultShardRoutingRules(mz.ctx, mz.ms, mz.ts); err != nil {
return err
}
}
if err := mz.deploySchema(); err != nil {
return err
}
Expand Down
102 changes: 68 additions & 34 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/sqlerror"

Expand Down Expand Up @@ -947,7 +948,7 @@ func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletI
// MoveTablesCreate is part of the vtctlservicepb.VtctldServer interface.
// It passes the embedded TabletRequest object to the given keyspace's
// target primary tablets that will be executing the workflow.
func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest) (*vtctldatapb.WorkflowStatusResponse, error) {
func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest) (res *vtctldatapb.WorkflowStatusResponse, err error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesCreate")
defer span.Finish()

Expand All @@ -964,7 +965,6 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
tables = req.IncludeTables
externalTopo *topo.Server
sourceTopo *topo.Server = s.ts
err error
)

// When the source is an external cluster mounted using the Mount command.
Expand All @@ -978,6 +978,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}

var vschema *vschemapb.Keyspace
var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create
vschema, err = s.ts.GetVSchema(ctx, targetKeyspace)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1020,43 +1021,14 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
log.Infof("Found tables to move: %s", strings.Join(tables, ","))

if !vschema.Sharded {
// Save the original in case we need to restore it for a late failure
// in the defer().
origVSchema = proto.Clone(vschema).(*vschemapb.Keyspace)
if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil {
return nil, err
}
}
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := topotools.GetRoutingRules(ctx, s.ts)
if err != nil {
return nil, err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil {
return nil, err
}

if vschema != nil {
// We added to the vschema.
if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return nil, err
}
}
}
if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return nil, err
}
ms := &vtctldatapb.MaterializeSettings{
Workflow: req.Workflow,
MaterializationIntent: vtctldatapb.MaterializationIntent_MOVETABLES,
Expand Down Expand Up @@ -1101,6 +1073,68 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}

// If we get an error after this point, where the vreplication streams/records
// have been created, then we clean up the workflow's artifacts.
defer func() {
if err != nil {
ts, cerr := s.buildTrafficSwitcher(ctx, ms.TargetKeyspace, ms.Workflow)
if cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
if origVSchema == nil { // There's no previous version to restore
return
}
if cerr := s.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil {
err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr)
}
}
}()

// Now that the streams have been successfully created, let's put the associated
// routing rules in place.
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing
// rules fails to save, we may generate duplicate table errors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no cases where if routing rules succeed to be saved, but then the vschema fails that things are inconsistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question! It led me to realize that we can instead utilize the WorkflowDelete() VtCtldServer function in the defer() as this performs additional cleanup, including the vschema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out that wasn't quite right, but I did add code to deal with the vschema in the defer: 80f8a03

if mz.isPartial {
if err := createDefaultShardRoutingRules(mz.ctx, mz.ms, mz.ts); err != nil {
return nil, err
}
}

rules, err := topotools.GetRoutingRules(ctx, s.ts)
if err != nil {
return nil, err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil {
return nil, err
}

if vschema != nil {
// We added to the vschema.
if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return nil, err
}
}
}
if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return nil, err
}

if ms.SourceTimeZone != "" {
if err := mz.checkTZConversion(ctx, ms.SourceTimeZone); err != nil {
return nil, err
Expand Down
186 changes: 185 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
"runtime/debug"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -30,6 +31,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vtgate/vindexes"

Expand Down Expand Up @@ -64,6 +66,8 @@ const (
stopForCutover = "update _vt.vreplication set state='Stopped', message='stopped for cutover' where id=1"
getMaxValForSequence = "select max(`id`) as maxval from `vt_%s`.`%s`"
initSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)"
deleteWorkflow = "delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'"
updatePickedSourceTablet = `update _vt.vreplication set message='Picked source tablet: cell:\"%s\" uid:%d' where id=1`
)

var (
Expand Down Expand Up @@ -313,7 +317,7 @@ func TestMoveTables(t *testing.T) {
),
fmt.Sprintf("1|%s", bls),
), nil)
ftc.vrdbClient.ExpectRequest(`update _vt.vreplication set message='Picked source tablet: cell:\"zone1\" uid:200' where id=1`, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult(
Expand Down Expand Up @@ -568,3 +572,183 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
})
}
}

// TestFailedMoveTablesCreateCleanup tests that the workflow
// and its artifacts are cleaned up when the workflow creation
// fails -- specifically after the point where we have created
// the workflow streams.
func TestFailedMoveTablesCreateCleanup(t *testing.T) {
ctx := context.Background()
sourceKs := "sourceks"
sourceTabletUID := 200
shard := "0"
targetTabletUID := 300
targetKs := "targetks"
wf := "testwf"
table := defaultSchema.TableDefinitions[0].Name
invalidTimeZone := "NOPE"
bls := fmt.Sprintf("keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"%s\" filter:\"select * from %s\"}}",
sourceKs, shard, table, table)
tenv := newTestEnv(t, sourceKs, []string{shard})
defer tenv.close()
ws := workflow.NewServer(tenv.ts, tenv.tmc)

sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, shard)
defer tenv.deleteTablet(sourceTablet.tablet)
targetTablet := tenv.addTablet(t, targetTabletUID, targetKs, shard)
defer tenv.deleteTablet(targetTablet.tablet)

mattlord marked this conversation as resolved.
Show resolved Hide resolved
tenv.mysqld.Schema = defaultSchema
tenv.mysqld.Schema.DatabaseSchema = tenv.dbName
tenv.mysqld.FetchSuperQueryMap = make(map[string]*sqltypes.Result)
tenv.mysqld.FetchSuperQueryMap[`select character_set_name, collation_name, column_name, data_type, column_type, extra from information_schema.columns where .*`] = sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"character_set_name|collation_name|column_name|data_type|column_type|extra",
"varchar|varchar|varchar|varchar|varchar|varchar",
),
"NULL|NULL|id|bigint|bigint|",
"NULL|NULL|c2|bigint|bigint|",
)

// Let's be sure that the routing rules are empty to start.
err := topotools.SaveRoutingRules(ctx, tenv.ts, nil)
require.NoError(t, err, "failed to save routing rules")

tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{})
tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{})
tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(getWorkflow, targetKs, wf),
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id",
"int64",
),
"1",
),
)
targetTablet.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(
fmt.Sprintf("%s %s",
insertVReplicationPrefix,
fmt.Sprintf(`values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"%s\" filter:\"select * from %s\"}} source_time_zone:\"%s\" target_time_zone:\"UTC\"', '', 0, 0, '%s', 'primary', now(), 0, 'Stopped', '%s', 1, 0, 0)`,
wf, sourceKs, shard, table, table, invalidTimeZone, strings.Join(tenv.cells, ","), tenv.dbName),
),
&sqltypes.Result{
RowsAffected: 1,
InsertID: 1,
},
nil,
)
targetTablet.vrdbClient.ExpectRequest(getAutoIncrementStep, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(getVReplicationRecord,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source",
"int64|varchar",
),
fmt.Sprintf("1|%s", bls),
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID),
&sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(getWorkflowState,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys",
"varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64",
),
fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf),
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"count(distinct table_name)",
"int64",
),
"1",
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getWorkflowState,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys",
"varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64",
),
fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf),
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"count(distinct table_name)",
"int64",
),
"1",
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getBinlogRowImage,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"@@binlog_row_image",
"varchar",
),
"FULL",
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(insertStreamsCreatedLog, bls), &sqltypes.Result{}, nil)

tenv.tmc.setVReplicationExecResults(targetTablet.tablet,
fmt.Sprintf("select convert_tz('2006-01-02 15:04:05', '%s', 'UTC')", invalidTimeZone),
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
fmt.Sprintf("convert_tz('2006-01-02 15:04:05', '%s', 'UTC')", invalidTimeZone),
"datetime",
),
"NULL",
),
)

// We expect the workflow creation to fail due to the invalid time
// zone and thus the workflow iteslf to be cleaned up.
tenv.tmc.setVReplicationExecResults(sourceTablet.tablet,
fmt.Sprintf(deleteWorkflow, sourceKs, workflow.ReverseWorkflowName(wf)),
&sqltypes.Result{RowsAffected: 1},
)
tenv.tmc.setVReplicationExecResults(targetTablet.tablet,
fmt.Sprintf(deleteWorkflow, targetKs, wf),
&sqltypes.Result{RowsAffected: 1},
)

// Save the current target vschema.
vs, err := tenv.ts.GetVSchema(ctx, targetKs)
require.NoError(t, err, "failed to get target vschema")

_, err = ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{
Workflow: wf,
SourceKeyspace: sourceKs,
TargetKeyspace: targetKs,
Cells: tenv.cells,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY},
IncludeTables: []string{table},
SourceTimeZone: invalidTimeZone,
})
require.ErrorContains(t, err, fmt.Sprintf("unable to perform time_zone conversions from %s to UTC", invalidTimeZone))

// Check that there are no orphaned routing rules.
rules, err := topotools.GetRoutingRules(ctx, tenv.ts)
require.NoError(t, err, "failed to get routing rules")
require.Equal(t, 0, len(rules), "expected no routing rules to be present")

// Check that our vschema changes were also rolled back.
vs2, err := tenv.ts.GetVSchema(ctx, targetKs)
require.NoError(t, err, "failed to get target vschema")
require.Equal(t, vs, vs2, "expected vschema to be unchanged")
}
Loading