Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PRS in Vtorc for electing new primary #9409

Merged
merged 16 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
81b7240
feat: use PRS to electNewPrimary in Vtorc
GuptaManan100 Dec 17, 2021
28b760f
feat: fix error checks in preFlightChecks
GuptaManan100 Dec 17, 2021
5ab7458
feat: allow ChooseNewPrimary to work with no avoidPrimaryAlias specified
GuptaManan100 Dec 17, 2021
cb34bd1
feat: check for empty replication status in findingForPositionTablet
GuptaManan100 Dec 17, 2021
06eecad
Merge remote-tracking branch 'upstream/main' into elect-new-primary-prs
GuptaManan100 Dec 17, 2021
f3be9f0
refactor: rename action to match the API call and use it in logging
GuptaManan100 Dec 17, 2021
c3ca85f
feat: convert left join to normal join, since both the tables informa…
GuptaManan100 Dec 17, 2021
83e3658
Merge remote-tracking branch 'upstream/main' into elect-new-primary-prs
GuptaManan100 Dec 17, 2021
c927871
test: fix preFlightsChecks expectations
GuptaManan100 Dec 17, 2021
1e8d7cc
test: add unit test to reparentShardLocked to check initialization wi…
GuptaManan100 Dec 17, 2021
2980793
test: add unit test for findPositionForTablet when the tablet has no …
GuptaManan100 Dec 17, 2021
a133cf8
feat: fix Atleast function to handle both nils properly, since the un…
GuptaManan100 Dec 17, 2021
6aa3086
test: added test for ChooseNewPrimary where avoidPrimary alias is nil
GuptaManan100 Dec 17, 2021
895b690
docs: fix a comment
GuptaManan100 Dec 17, 2021
dcf6905
docs: added comments explaining the functioning of the AtLeast function
GuptaManan100 Jan 5, 2022
f81e0dc
Merge remote-tracking branch 'upstream/main' into elect-new-primary-prs
GuptaManan100 Jan 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go/mysql/replication_position.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ func (rp Position) Equal(other Position) bool {

// AtLeast returns true if this position is equal to or after another.
func (rp Position) AtLeast(other Position) bool {
if other.GTIDSet == nil {
return true
}
if rp.GTIDSet == nil {
return other.GTIDSet == nil
return false
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}
return rp.GTIDSet.Contains(other.GTIDSet)
}
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/replication_position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestPositionZeroAtLeast(t *testing.T) {

func TestPositionAtLeastZero(t *testing.T) {
input1 := Position{GTIDSet: MariadbGTIDSet{3: MariadbGTID{Domain: 3, Server: 5555, Sequence: 1234}}}
input2 := Position{GTIDSet: MariadbGTIDSet(nil)}
input2 := Position{GTIDSet: nil}
want := true

if got := input1.AtLeast(input2); got != want {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/orchestrator/inst/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ const (
AllIntermediatePrimaryReplicasNotReplicating AnalysisCode = "AllIntermediatePrimaryReplicasNotReplicating"
FirstTierReplicaFailingToConnectToPrimary AnalysisCode = "FirstTierReplicaFailingToConnectToPrimary"
BinlogServerFailingToConnectToPrimary AnalysisCode = "BinlogServerFailingToConnectToPrimary"
PlannedReparentShard AnalysisCode = "PlannedReparentShard"
GraceFulPrimaryTakeover AnalysisCode = "GracefulPrimaryTakeover"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion go/vt/orchestrator/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
) AS count_distinct_logging_major_versions
FROM
vitess_tablet
LEFT JOIN database_instance primary_instance ON (
JOIN database_instance primary_instance ON (
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
vitess_tablet.hostname = primary_instance.hostname
AND vitess_tablet.port = primary_instance.port
)
Expand Down
115 changes: 37 additions & 78 deletions go/vt/orchestrator/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,7 @@ func GracefulPrimaryTakeover(clusterName string, designatedKey *inst.InstanceKey
}
clusterPrimary := clusterPrimaries[0]

analysisEntry, err := forceAnalysisEntry(clusterName, inst.PlannedReparentShard, inst.GracefulPrimaryTakeoverCommandHint, &clusterPrimary.Key)
analysisEntry, err := forceAnalysisEntry(clusterName, inst.GraceFulPrimaryTakeover, inst.GracefulPrimaryTakeoverCommandHint, &clusterPrimary.Key)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1730,14 +1730,14 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re
if promotedReplica != nil {
message := fmt.Sprintf("promoted replica: %+v", promotedReplica.Key)
AuditTopologyRecovery(topologyRecovery, message)
inst.AuditOperation("graceful-primary-takeover", &analysisEntry.AnalyzedInstanceKey, message)
inst.AuditOperation(string(analysisEntry.Analysis), &analysisEntry.AnalyzedInstanceKey, message)
}
// And this is the end; whether successful or not, we're done.
resolveRecovery(topologyRecovery, promotedReplica)
// Now, see whether we are successful or not. From this point there's no going back.
if promotedReplica != nil {
// Success!
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("GracefulPrimaryTakeover: successfully promoted %+v", promotedReplica.Key))
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("%+v: successfully promoted %+v", analysisEntry.Analysis, promotedReplica.Key))

kvPairs := inst.GetClusterPrimaryKVPairs(analysisEntry.ClusterDetails.ClusterAlias, &promotedReplica.Key)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Writing KV %+v", kvPairs))
Expand All @@ -1753,7 +1753,7 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re
func() error {
before := analysisEntry.AnalyzedInstanceKey.StringCode()
after := promotedReplica.Key.StringCode()
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- GracefulPrimaryTakeover: updating cluster_alias: %v -> %v", before, after))
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- %+v: updating cluster_alias: %v -> %v", analysisEntry.Analysis, before, after))
//~~~inst.ReplaceClusterName(before, after)
if alias := analysisEntry.ClusterDetails.ClusterAlias; alias != "" {
inst.SetClusterAlias(promotedReplica.Key.StringCode(), alias)
Expand All @@ -1768,93 +1768,52 @@ func postPrsCompletion(topologyRecovery *TopologyRecovery, analysisEntry inst.Re
}

// electNewPrimary elects a new primary while none were present before.
// TODO(sougou): this should be mreged with recoverDeadPrimary
func electNewPrimary(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false, true)
if topologyRecovery == nil {
topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, false /*failIfFailedInstanceInActiveRecovery*/, true /*failIfClusterInActiveRecovery*/)
if topologyRecovery == nil || err != nil {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another electNewPrimary.", analysisEntry.AnalyzedInstanceKey))
return false, nil, err
}
log.Infof("Analysis: %v, will elect a new primary: %v", analysisEntry.Analysis, analysisEntry.SuggestedClusterAlias)

_, unlock, err := LockShard(context.Background(), analysisEntry.AnalyzedInstanceKey)
analyzedTablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceKey)
if err != nil {
log.Infof("CheckAndRecover: Analysis: %+v, InstanceKey: %+v, candidateInstanceKey: %+v, "+
"skipProcesses: %v: NOT detecting/recovering host, could not obtain shard lock (%v)",
analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, candidateInstanceKey, skipProcesses, err)
return false, topologyRecovery, err
}
defer unlock(&err)

// TODO(sougou): check if another Orc succeeded before fixing anything.
AuditTopologyRecovery(topologyRecovery, "starting PlannedReparentShard for electing new primary.")

replicas, err := inst.ReadClusterAliasInstances(analysisEntry.SuggestedClusterAlias)
if err != nil {
return false, topologyRecovery, err
}
// TODO(sougou): this is not reliable, because of the timeout.
replicas = inst.StopReplicasNicely(replicas, time.Duration(config.Config.InstanceBulkOperationsWaitTimeoutSeconds)*time.Second)
if len(replicas) == 0 {
return false, topologyRecovery, fmt.Errorf("no instances in cluster %v", analysisEntry.SuggestedClusterAlias)
}

// Find an initial candidate
var candidate *inst.Instance
for _, replica := range replicas {
// TODO(sougou): this needs to do more. see inst.chooseCandidateReplica
if !inst.IsBannedFromBeingCandidateReplica(replica) {
candidate = replica
break
}
}
if candidate == nil {
err := fmt.Errorf("no candidate qualifies to be a primary")
AuditTopologyRecovery(topologyRecovery, err.Error())
return true, topologyRecovery, err
}

// Compare the current candidate with the rest to see if other instances can be
// moved under. If not, see if the other intance can become a candidate instead.
for _, replica := range replicas {
if replica == candidate {
continue
}
if err := inst.CheckMoveViaGTID(replica, candidate); err != nil {
if err := inst.CheckMoveViaGTID(candidate, replica); err != nil {
return false, topologyRecovery, fmt.Errorf("instances are not compatible: %+v %+v: %v", candidate, replica, err)
} else {
// Make sure the new candidate meets the requirements.
if !inst.IsBannedFromBeingCandidateReplica(replica) {
candidate = replica
}
}
ev, err := reparentutil.NewPlannedReparenter(ts, tmclient.NewTabletManagerClient(), logutil.NewCallbackLogger(func(event *logutilpb.Event) {
level := event.GetLevel()
value := event.GetValue()
// we only log the warnings and errors explicitly, everything gets logged as an information message anyways in auditing topology recovery
switch level {
case logutilpb.Level_WARNING:
log.Warningf("PRS - %s", value)
case logutilpb.Level_ERROR:
log.Errorf("PRS - %s", value)
}
}
AuditTopologyRecovery(topologyRecovery, value)
})).ReparentShard(context.Background(),
analyzedTablet.Keyspace,
analyzedTablet.Shard,
reparentutil.PlannedReparentOptions{
WaitReplicasTimeout: time.Duration(config.Config.WaitReplicasTimeoutSeconds) * time.Second,
},
)

if _, err := inst.ChangeTabletType(candidate.Key, topodatapb.TabletType_PRIMARY); err != nil {
return true, topologyRecovery, err
}
// TODO(sougou): parallelize
for _, replica := range replicas {
if replica.Key == candidate.Key {
continue
}
if _, err := inst.MoveBelowGTID(&replica.Key, &candidate.Key); err != nil {
return false, topologyRecovery, err
}
}
count := inst.SemiSyncAckers(candidate.Key)
err = inst.SetSemiSyncPrimary(&candidate.Key, count > 0)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- electNewPrimary: applying semi-sync %v: success=%t", count > 0, (err == nil)))
if err != nil {
return false, topologyRecovery, err
}
_, err = inst.SetReadOnly(&candidate.Key, false)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- electNewPrimary: set read-only false: success=%t", (err == nil)))
if err != nil {
return false, topologyRecovery, err
// here we need to forcefully refresh all the tablets otherwise old information is used and failover scenarios are spawned off which are not required
// For example, if we do not refresh the tablets forcefully and the new primary is found in the cache then its source key is not updated and this spawns off
// PrimaryHasPrimary analysis which runs ERS
RefreshTablets(true /* forceRefresh */)
var promotedReplica *inst.Instance
if ev.NewPrimary != nil {
promotedReplica, _, _ = inst.ReadInstance(&inst.InstanceKey{
Hostname: ev.NewPrimary.MysqlHostname,
Port: int(ev.NewPrimary.MysqlPort),
})
}
return true, topologyRecovery, nil
postPrsCompletion(topologyRecovery, analysisEntry, promotedReplica)
return true, topologyRecovery, err
}

// fixClusterAndPrimary performs a traditional vitess PlannedReparentShard.
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vtctl/reparentutil/planned_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,18 @@ func (pr *PlannedReparenter) preflightChecks(
tabletMap map[string]*topo.TabletInfo,
opts *PlannedReparentOptions, // we take a pointer here to set NewPrimaryAlias
) (isNoop bool, err error) {
if topoproto.TabletAliasEqual(opts.NewPrimaryAlias, opts.AvoidPrimaryAlias) {
// We don't want to fail when both NewPrimaryAlias and AvoidPrimaryAlias are nil.
// But when they are both nil, we assign AvoidPrimaryAlias to be ShardInfo.PrimaryAlias.
// In the case, where we are using PRS to initialize the cluster without specifying the NewPrimaryAlias
// all the three will be nil.
if opts.NewPrimaryAlias != nil && topoproto.TabletAliasEqual(opts.NewPrimaryAlias, opts.AvoidPrimaryAlias) {
return true, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "primary-elect tablet %v is the same as the tablet to avoid", topoproto.TabletAliasString(opts.NewPrimaryAlias))
}

if opts.NewPrimaryAlias == nil {
if !topoproto.TabletAliasEqual(opts.AvoidPrimaryAlias, ev.ShardInfo.PrimaryAlias) {
// We don't want to fail when both ShardInfo.PrimaryAlias and AvoidPrimaryAlias are nil.
// This happens when we are using PRS to initialize the cluster without specifying the NewPrimaryAlias
if ev.ShardInfo.PrimaryAlias != nil && !topoproto.TabletAliasEqual(opts.AvoidPrimaryAlias, ev.ShardInfo.PrimaryAlias) {
event.DispatchUpdate(ev, "current primary is different than tablet to avoid, nothing to do")
return true, nil
}
Expand Down
106 changes: 100 additions & 6 deletions go/vt/vtctl/reparentutil/planned_reparenter_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package reparentutil

import (
"context"
"fmt"
"strings"
"testing"
"time"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/test/utils"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -646,9 +649,8 @@ func TestPlannedReparenter_preflightChecks(t *testing.T) {
},
{
// this doesn't cause an actual error from ChooseNewPrimary, because
// the only way to do that is to set AvoidPrimaryAlias == nil, and
// that gets checked in preflightChecks before calling
// ChooseNewPrimary for other reasons. however we do check that we
// there is no way to do that other than something going horribly wrong
// in go runtime, however we do check that we
// get a non-nil result from ChooseNewPrimary in preflightChecks and
// bail out if we don't, so we're forcing that case here.
name: "cannot choose new primary-elect",
Expand Down Expand Up @@ -730,30 +732,48 @@ func TestPlannedReparenter_preflightChecks(t *testing.T) {
shouldErr: false,
},
{
name: "shard has no current primary and new primary not provided",
name: "shard has no current primary and new primary not provided - initialisation test",
ev: &events.Reparent{
ShardInfo: *topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{
PrimaryAlias: nil,
}, nil),
},
tmc: &testutil.TabletManagerClient{
ReplicationStatusResults: map[string]struct {
Position *replicationdatapb.Status
Error error
}{
"zone1-0000000100": { // most advanced position
Error: mysql.ErrNotReplica,
},
},
},
tabletMap: map[string]*topo.TabletInfo{
"zone1-0000000100": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Type: topodatapb.TabletType_REPLICA,
},
},
},
opts: &PlannedReparentOptions{},
expectedIsNoop: true,
expectedIsNoop: false,
expectedEvent: &events.Reparent{
ShardInfo: *topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{
PrimaryAlias: nil,
}, nil),
NewPrimary: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Type: topodatapb.TabletType_REPLICA,
},
},
shouldErr: true,
shouldErr: false,
},
}

Expand Down Expand Up @@ -2787,6 +2807,80 @@ func TestPlannedReparenter_reparentShardLocked(t *testing.T) {
},
},
},
{
name: "shard initialization with no new primary provided",
ts: memorytopo.NewServer("zone1"),
tmc: &testutil.TabletManagerClient{
PopulateReparentJournalResults: map[string]error{
"zone1-0000000200": nil,
},
InitPrimaryResults: map[string]struct {
Result string
Error error
}{
"zone1-0000000200": {
Result: "reparent journal position",
Error: nil,
},
},
ReplicationStatusResults: map[string]struct {
Position *replicationdatapb.Status
Error error
}{
"zone1-0000000200": {
Error: mysql.ErrNotReplica,
},
"zone1-0000000100": {
Error: fmt.Errorf("not providing replication status, so that 200 wins"),
},
},
SetReplicationSourceResults: map[string]error{
"zone1-0000000100": nil, // called during reparentTablets to make this tablet a replica of newPrimary
},
},
tablets: []*topodatapb.Tablet{
// Shard has no current primary in the beginning.
{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Type: topodatapb.TabletType_REPLICA,
Keyspace: "testkeyspace",
Shard: "-",
},
{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 200,
},
Type: topodatapb.TabletType_REPLICA,
Keyspace: "testkeyspace",
Shard: "-",
},
},

ev: &events.Reparent{},
keyspace: "testkeyspace",
shard: "-",
opts: PlannedReparentOptions{},
shouldErr: false,
expectedEvent: &events.Reparent{
ShardInfo: *topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{
KeyRange: &topodatapb.KeyRange{},
IsPrimaryServing: true,
}, nil),
NewPrimary: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 200,
},
Type: topodatapb.TabletType_REPLICA,
Keyspace: "testkeyspace",
Shard: "-",
},
},
},
{
name: "preflight checks determine PRS is no-op",
ts: memorytopo.NewServer("zone1"),
Expand Down
Loading