Skip to content

Commit

Permalink
feat: fix the problem by running PromoteReplica in parallel with SetR…
Browse files Browse the repository at this point in the history
…eplicationSource

Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Jun 30, 2023
1 parent 8473746 commit 177b069
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 402 deletions.
82 changes: 40 additions & 42 deletions go/vt/vtctl/reparentutil/planned_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (pr *PlannedReparenter) performGracefulPromotion(
primaryElect *topodatapb.Tablet,
tabletMap map[string]*topo.TabletInfo,
opts PlannedReparentOptions,
) (string, error) {
) error {
primaryElectAliasStr := topoproto.TabletAliasString(primaryElect.Alias)
ev.OldPrimary = proto.Clone(currentPrimary.Tablet).(*topodatapb.Tablet)

Expand All @@ -231,7 +231,7 @@ func (pr *PlannedReparenter) performGracefulPromotion(

snapshotPos, err := pr.tmc.PrimaryPosition(snapshotCtx, currentPrimary.Tablet)
if err != nil {
return "", vterrors.Wrapf(err, "cannot get replication position on current primary %v; current primary must be healthy to perform PlannedReparent", currentPrimary.AliasString())
return vterrors.Wrapf(err, "cannot get replication position on current primary %v; current primary must be healthy to perform PlannedReparent", currentPrimary.AliasString())
}

// Next, we wait for the primary-elect to catch up to that snapshot point.
Expand All @@ -246,12 +246,12 @@ func (pr *PlannedReparenter) performGracefulPromotion(
defer setSourceCancel()

if err := pr.tmc.SetReplicationSource(setSourceCtx, primaryElect, currentPrimary.Alias, 0, snapshotPos, true, IsReplicaSemiSync(opts.durability, currentPrimary.Tablet, primaryElect)); err != nil {
return "", vterrors.Wrapf(err, "replication on primary-elect %v did not catch up in time; replication must be healthy to perform PlannedReparent", primaryElectAliasStr)
return vterrors.Wrapf(err, "replication on primary-elect %v did not catch up in time; replication must be healthy to perform PlannedReparent", primaryElectAliasStr)
}

// Verify we still have the topology lock before doing the demotion.
if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil {
return "", vterrors.Wrap(err, "lost topology lock; aborting")
return vterrors.Wrap(err, "lost topology lock; aborting")
}

// Next up, demote the current primary and get its replication position.
Expand All @@ -265,7 +265,7 @@ func (pr *PlannedReparenter) performGracefulPromotion(

primaryStatus, err := pr.tmc.DemotePrimary(demoteCtx, currentPrimary.Tablet)
if err != nil {
return "", vterrors.Wrapf(err, "failed to DemotePrimary on current primary %v: %v", currentPrimary.AliasString(), err)
return vterrors.Wrapf(err, "failed to DemotePrimary on current primary %v: %v", currentPrimary.AliasString(), err)
}

// Wait for the primary-elect to catch up to the position we demoted the
Expand Down Expand Up @@ -298,26 +298,10 @@ func (pr *PlannedReparenter) performGracefulPromotion(
finalWaitErr = vterrors.Wrapf(finalWaitErr, "encountered error while performing UndoDemotePrimary(%v): %v", currentPrimary.AliasString(), undoErr)
}

return "", finalWaitErr
}

// Primary-elect is caught up to the current primary. We can do the
// promotion now.
promoteCtx, promoteCancel := context.WithTimeout(ctx, opts.WaitReplicasTimeout)
defer promoteCancel()

rp, err := pr.tmc.PromoteReplica(promoteCtx, primaryElect, SemiSyncAckers(opts.durability, primaryElect) > 0)
if err != nil {
return "", vterrors.Wrapf(err, "primary-elect tablet %v failed to be promoted to primary; please try again", primaryElectAliasStr)
}

if ctx.Err() == context.DeadlineExceeded {
// PromoteReplica succeeded, but we ran out of time. PRS needs to be
// re-run to complete fully.
return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "PLannedReparent timed out after successfully promoting primary-elect %v; please re-run to fix up the replicas", primaryElectAliasStr)
return finalWaitErr
}

return rp, nil
return nil
}

func (pr *PlannedReparenter) performInitialPromotion(
Expand Down Expand Up @@ -383,7 +367,7 @@ func (pr *PlannedReparenter) performPotentialPromotion(
primaryElect *topodatapb.Tablet,
tabletMap map[string]*topo.TabletInfo,
opts PlannedReparentOptions,
) (string, error) {
) error {
primaryElectAliasStr := topoproto.TabletAliasString(primaryElect.Alias)

pr.logger.Infof("no clear winner found for current primary term; checking if it's safe to recover by electing %v", primaryElectAliasStr)
Expand Down Expand Up @@ -457,7 +441,7 @@ func (pr *PlannedReparenter) performPotentialPromotion(
close(positions)

if rec.HasErrors() {
return "", vterrors.Wrap(rec.Error(), "failed to demote all tablets")
return vterrors.Wrap(rec.Error(), "failed to demote all tablets")
}

// Construct a mapping of alias to tablet position.
Expand All @@ -478,7 +462,7 @@ func (pr *PlannedReparenter) performPotentialPromotion(
// if the candidate primary is behind that tablet.
tp, ok := tabletPosMap[primaryElectAliasStr]
if !ok {
return "", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "primary-elect tablet %v not found in tablet map", primaryElectAliasStr)
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "primary-elect tablet %v not found in tablet map", primaryElectAliasStr)
}

primaryElectPos := tp.pos
Expand All @@ -487,7 +471,7 @@ func (pr *PlannedReparenter) performPotentialPromotion(
// The primary-elect pos has to be at least as advanced as every tablet
// in the shard.
if !primaryElectPos.AtLeast(tp.pos) {
return "", vterrors.Errorf(
return vterrors.Errorf(
vtrpc.Code_FAILED_PRECONDITION,
"tablet %v (position: %v) contains transactions not found in primary-elect %v (position: %v)",
tp.alias, tp.pos, primaryElectAliasStr, primaryElectPos,
Expand All @@ -497,19 +481,9 @@ func (pr *PlannedReparenter) performPotentialPromotion(

// Check that we still have the topology lock.
if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil {
return "", vterrors.Wrap(err, "lost topology lock; aborting")
}

// Promote the candidate primary to type:PRIMARY.
promoteCtx, promoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer promoteCancel()

rp, err := pr.tmc.PromoteReplica(promoteCtx, primaryElect, SemiSyncAckers(opts.durability, primaryElect) > 0)
if err != nil {
return "", vterrors.Wrapf(err, "failed to promote %v to primary", primaryElectAliasStr)
return vterrors.Wrap(err, "lost topology lock; aborting")
}

return rp, nil
return nil
}

func (pr *PlannedReparenter) reparentShardLocked(
Expand Down Expand Up @@ -553,6 +527,11 @@ func (pr *PlannedReparenter) reparentShardLocked(

currentPrimary := FindCurrentPrimary(tabletMap, pr.logger)
reparentJournalPos := ""
// promoteReplicaRequired is a boolean that is used to store whether we need to call
// `PromoteReplica` when we reparent the tablets. This is required to be done when we are doing
// a potential or a graceful promotion.
// InitialPromotion calls `InitPrimary` and for partial promotion, the tablet is already a primary.
promoteReplicaRequired := false
// needsRefresh is used to keep track of whether we need to refresh the state
// of the new primary tablet. The only case that we need to reload the state
// is when we are initializing the new primary. The reason is that the first
Expand Down Expand Up @@ -601,15 +580,19 @@ func (pr *PlannedReparenter) reparentShardLocked(
case currentPrimary == nil && ev.ShardInfo.PrimaryAlias != nil:
// Case (2): no clear current primary. Try to find a safe promotion
// candidate, and promote to it.
reparentJournalPos, err = pr.performPotentialPromotion(ctx, keyspace, shard, ev.NewPrimary, tabletMap, opts)
err = pr.performPotentialPromotion(ctx, keyspace, shard, ev.NewPrimary, tabletMap, opts)
// We need to call `PromoteReplica` when we reparent the tablets.
promoteReplicaRequired = true
case topoproto.TabletAliasEqual(currentPrimary.Alias, opts.NewPrimaryAlias):
// Case (3): desired new primary is the current primary. Attempt to fix
// up replicas to recover from a previous partial promotion.
reparentJournalPos, err = pr.performPartialPromotionRecovery(ctx, ev.NewPrimary)
default:
// Case (4): desired primary and current primary differ. Do a graceful
// demotion-then-promotion.
reparentJournalPos, err = pr.performGracefulPromotion(ctx, ev, keyspace, shard, currentPrimary, ev.NewPrimary, tabletMap, opts)
err = pr.performGracefulPromotion(ctx, ev, keyspace, shard, currentPrimary, ev.NewPrimary, tabletMap, opts)
// We need to call `PromoteReplica` when we reparent the tablets.
promoteReplicaRequired = true
}

if err != nil {
Expand All @@ -620,7 +603,7 @@ func (pr *PlannedReparenter) reparentShardLocked(
return vterrors.Wrap(err, "lost topology lock, aborting")
}

if err := pr.reparentTablets(ctx, ev, reparentJournalPos, tabletMap, opts); err != nil {
if err := pr.reparentTablets(ctx, ev, reparentJournalPos, promoteReplicaRequired, tabletMap, opts); err != nil {
return err
}

Expand All @@ -637,6 +620,7 @@ func (pr *PlannedReparenter) reparentTablets(
ctx context.Context,
ev *events.Reparent,
reparentJournalPosition string,
promoteReplicaRequired bool,
tabletMap map[string]*topo.TabletInfo,
opts PlannedReparentOptions,
) error {
Expand Down Expand Up @@ -688,6 +672,20 @@ func (pr *PlannedReparenter) reparentTablets(
}(alias, tabletInfo.Tablet)
}

// If `PromoteReplica` call is requried, we should call it and use the position that it returns.
if promoteReplicaRequired {
// Promote the candidate primary to type:PRIMARY.
primaryPosition, err := pr.tmc.PromoteReplica(replCtx, ev.NewPrimary, SemiSyncAckers(opts.durability, ev.NewPrimary) > 0)
if err != nil {
pr.logger.Warningf("primary %v failed to PromoteReplica; cancelling replica reparent attempts", primaryElectAliasStr)
replCancel()
replicasWg.Wait()

return vterrors.Wrapf(err, "failed PromoteReplica(primary=%v, ts=%v): %v", primaryElectAliasStr, reparentJournalTimestamp, err)
}
reparentJournalPosition = primaryPosition
}

// Add a reparent journal entry on the new primary. If semi-sync is enabled,
// this blocks until at least one replica is reparented (above) and
// successfully replicating from the new primary.
Expand Down
Loading

0 comments on commit 177b069

Please sign in to comment.