Skip to content

Commit

Permalink
OnlineDDL: fix scenarios where migration hangs instead of directly fa…
Browse files Browse the repository at this point in the history
…iling (#14290)

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
shlomi-noach and mattlord authored Oct 17, 2023
1 parent 8575b17 commit 46f6ae5
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 67 deletions.
19 changes: 19 additions & 0 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ func testScheduler(t *testing.T) {
createViewDependsOnExtraColumn = `
CREATE VIEW t1_test_view AS SELECT id, extra_column FROM t1_test
`
alterNonexistent = `
ALTER TABLE nonexistent FORCE
`
)

testReadTimestamp := func(t *testing.T, uuid string, timestampColumn string) (timestamp string) {
Expand Down Expand Up @@ -960,6 +963,22 @@ func testScheduler(t *testing.T) {
})
})
}
// Failure scenarios
t.Run("fail nonexistent", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, createParams(alterNonexistent, "vitess", "vtgate", "", "", false))

status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row["message"].ToString()
require.Contains(t, message, "errno 1146")
}
})

// 'mysql' strategy
t.Run("mysql strategy", func(t *testing.T) {
t.Run("declarative", func(t *testing.T) {
Expand Down
110 changes: 60 additions & 50 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2310,6 +2310,64 @@ func (e *Executor) reviewImmediateOperations(ctx context.Context, capableOf mysq
return false, nil
}

// reviewQueuedMigration investigates a single migration found in `queued` state.
// It analyzes whether the migration can & should be fulfilled immediately (e.g. via INSTANT DDL or just because it's a CREATE or DROP),
// or backfils necessary information if it's a REVERT.
// If all goes well, it sets `reviewed_timestamp` which then allows the state machine to schedule the migration.
func (e *Executor) reviewQueuedMigration(ctx context.Context, uuid string, capableOf mysql.CapableOf) error {
onlineDDL, row, err := e.readMigration(ctx, uuid)
if err != nil {
return err
}
// handle REVERT migrations: populate table name and update ddl action and is_view:
ddlAction := row["ddl_action"].ToString()
isRevert := false
if ddlAction == schema.RevertActionStr {
isRevert = true
rowModified, err := e.reviewEmptyTableRevertMigrations(ctx, onlineDDL)
if err != nil {
return err
}
if rowModified {
// re-read migration and entire row
onlineDDL, row, err = e.readMigration(ctx, uuid)
if err != nil {
return err
}
ddlAction = row["ddl_action"].ToString()
}
}
isView := row.AsBool("is_view", false)
isImmediate, err := e.reviewImmediateOperations(ctx, capableOf, onlineDDL, ddlAction, isRevert, isView)
if err != nil {
return err
}
if isImmediate {
if err := e.updateMigrationSetImmediateOperation(ctx, onlineDDL.UUID); err != nil {
return err
}
}
// Find conditions where the migration cannot take place:
switch onlineDDL.Strategy {
case schema.DDLStrategyMySQL:
strategySetting := onlineDDL.StrategySetting()
if strategySetting.IsPostponeCompletion() {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--postpone-completion not supported in 'mysql' strategy")
}
if strategySetting.IsAllowZeroInDateFlag() {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--allow-zero-in-date not supported in 'mysql' strategy")
}
}

// The review is complete. We've backfilled details on the migration row. We mark
// the migration as having been reviewed. The function scheduleNextMigration() will then
// have access to this row.
if err := e.updateMigrationTimestamp(ctx, "reviewed_timestamp", uuid); err != nil {
return err
}
return nil
}

// reviewQueuedMigrations iterates through queued migrations and sees if any information needs to be updated.
// The function analyzes the queued migration and fills in some blanks:
// - If this is a REVERT migration, what table is affected? What's the operation?
Expand All @@ -2332,57 +2390,9 @@ func (e *Executor) reviewQueuedMigrations(ctx context.Context) error {

for _, uuidRow := range r.Named().Rows {
uuid := uuidRow["migration_uuid"].ToString()
onlineDDL, row, err := e.readMigration(ctx, uuid)
if err != nil {
return err
if err := e.reviewQueuedMigration(ctx, uuid, capableOf); err != nil {
e.failMigration(ctx, &schema.OnlineDDL{UUID: uuid}, err)
}
// handle REVERT migrations: populate table name and update ddl action and is_view:
ddlAction := row["ddl_action"].ToString()
isRevert := false
if ddlAction == schema.RevertActionStr {
isRevert = true
rowModified, err := e.reviewEmptyTableRevertMigrations(ctx, onlineDDL)
if err != nil {
return err
}
if rowModified {
// re-read migration and entire row
onlineDDL, row, err = e.readMigration(ctx, uuid)
if err != nil {
return err
}
ddlAction = row["ddl_action"].ToString()
}
}
isView := row.AsBool("is_view", false)
isImmediate, err := e.reviewImmediateOperations(ctx, capableOf, onlineDDL, ddlAction, isRevert, isView)
if err != nil {
return err
}
if isImmediate {
if err := e.updateMigrationSetImmediateOperation(ctx, onlineDDL.UUID); err != nil {
return err
}
}
// Find conditions where the migration cannot take place:
switch onlineDDL.Strategy {
case schema.DDLStrategyMySQL:
strategySetting := onlineDDL.StrategySetting()
if strategySetting.IsPostponeCompletion() {
e.failMigration(ctx, onlineDDL, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--postpone-completion not supported in 'mysql' strategy"))
}
if strategySetting.IsAllowZeroInDateFlag() {
e.failMigration(ctx, onlineDDL, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--allow-zero-in-date not supported in 'mysql' strategy"))
}
}

// The review is complete. We've backfilled details on the migration row. We mark
// the migration as having been reviewed. The function scheduleNextMigration() will then
// have access to this row.
if err := e.updateMigrationTimestamp(ctx, "reviewed_timestamp", uuid); err != nil {
return err
}

}
return nil
}
Expand Down
12 changes: 7 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,14 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
err = vr.Replicate(ctx)
ct.lastWorkflowError.Record(err)

// If this is a mysql error that we know needs manual intervention OR
// we cannot identify this as non-recoverable, but it has persisted
// beyond the retry limit (maxTimeToRetryError).
// In addition, we cannot restart a workflow started with AtomicCopy which has _any_ error.
// If this is a MySQL error that we know needs manual intervention or
// it's a FAILED_PRECONDITION vterror, OR we cannot identify this as
// non-recoverable BUT it has persisted beyond the retry limit
// (maxTimeToRetryError). In addition, we cannot restart a workflow
// started with AtomicCopy which has _any_ error.
if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) ||
isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() {
isUnrecoverableError(err) ||
!ct.lastWorkflowError.ShouldRetry() {

log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err)
if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ import (
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/key"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// This file contains just the builders for ReplicatorPlan and TablePlan.
Expand Down Expand Up @@ -629,7 +632,7 @@ func (tpb *tablePlanBuilder) analyzeExtraSourcePkCols(colInfos []*ColumnInfo, so
if !col.IsGenerated {
// We shouldn't get here in any normal scenario. If a column is part of colInfos,
// then it must also exist in tpb.colExprs.
return fmt.Errorf("column %s not found in table expressions", col.Name)
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "column %s not found in table expressions", col.Name)
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand Down Expand Up @@ -123,6 +126,9 @@ func isUnrecoverableError(err error) bool {
if err == nil {
return false
}
if vterrors.Code(err) == vtrpcpb.Code_FAILED_PRECONDITION {
return true
}
sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError)
if !isSQLErr {
return false
Expand Down
15 changes: 6 additions & 9 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,20 @@ import (
"strconv"
"strings"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/vtgate/evalengine"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vtgate/vindexes"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// Plan represents the plan for a table.
Expand Down Expand Up @@ -865,5 +862,5 @@ func findColumn(ti *Table, name sqlparser.IdentifierCI) (int, error) {
return i, nil
}
}
return 0, fmt.Errorf("column %s not found in table %s", sqlparser.String(name), ti.Name)
return 0, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "column %s not found in table %s", sqlparser.String(name), ti.Name)
}

0 comments on commit 46f6ae5

Please sign in to comment.