Do election in order based on failed primary rank to avoid voting conflicts #1018

Status: Open. Wants to merge 4 commits into base: unstable.
src/cluster_legacy.c (75 changes: 73 additions & 2 deletions)
@@ -1091,6 +1091,7 @@ void clusterInit(void) {
     server.cluster->failover_auth_time = 0;
     server.cluster->failover_auth_count = 0;
     server.cluster->failover_auth_rank = 0;
+    server.cluster->failover_failed_primary_rank = 0;
     server.cluster->failover_auth_epoch = 0;
     server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
     server.cluster->lastVoteEpoch = 0;
@@ -3135,6 +3136,20 @@ int clusterProcessPacket(clusterLink *link) {
         if (sender_claims_to_be_primary && sender_claimed_config_epoch > sender->configEpoch) {
             sender->configEpoch = sender_claimed_config_epoch;
             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
+
+            if (server.cluster->failover_auth_time && sender->configEpoch >= server.cluster->failover_auth_epoch) {
+                /* Another node has claimed an epoch greater than or equal to ours.
+                 * If we have an ongoing election, reset it because we cannot win
+                 * with an epoch smaller than or equal to the incoming claim. This
+                 * allows us to start a new election as soon as possible. */
+                server.cluster->failover_auth_time = 0;
+                serverLog(LL_WARNING,
+                          "Failover election in progress for epoch %llu, but received a claim from node %.40s (%s) "
+                          "with an equal or higher epoch %llu. Resetting the election since we cannot win.",
+                          (unsigned long long)server.cluster->failover_auth_epoch, sender->name, sender->human_nodename,
+                          (unsigned long long)sender->configEpoch);
+                clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
+            }
         }
         /* Update the replication offset info for this node. */
         sender->repl_offset = ntohu64(hdr->offset);
@@ -4412,6 +4427,45 @@ int clusterGetReplicaRank(void) {
     return rank;
 }
 
+/* This function returns the "rank" of this instance's primary within the
+ * list of all failed primaries. A failed primary is ignored if its fail time
+ * exceeds cluster-node-timeout * cluster-replica-validity-factor.
+ *
+ * If multiple primary nodes go down at the same time, there is a certain
+ * probability that their replicas will initiate elections at the same time
+ * and lead to insufficient votes.
+ *
+ * The failed primary rank is used to add a delay before starting an election,
+ * in order to avoid simultaneous elections of replicas. */
+int clusterGetFailedPrimaryRank(void) {
+    serverAssert(nodeIsReplica(myself));
+    serverAssert(myself->replicaof);
+
+    int rank = 0;
+    mstime_t now = mstime();
+    dictIterator *di;
+    dictEntry *de;
+
+    di = dictGetSafeIterator(server.cluster->nodes);
+    while ((de = dictNext(di)) != NULL) {
+        clusterNode *node = dictGetVal(de);
+
+        /* Skip nodes that do not need to participate in the rank. */
+        if (!nodeFailed(node) || !clusterNodeIsVotingPrimary(node) || node->num_replicas == 0) continue;
+
+        /* If cluster-replica-validity-factor is enabled, skip the invalid nodes. */
+        if (server.cluster_replica_validity_factor) {
+            if ((now - node->fail_time) > (server.cluster_node_timeout * server.cluster_replica_validity_factor))
+                continue;
+        }
+
+        if (memcmp(node->name, myself->replicaof->name, CLUSTER_NAMELEN) < 0) rank++;
Review comment (Member): does it make sense to sort by shard_id? replicaof is not as reliable/up-to-date as shard_id. There is the chain replication and there is still the replicaof cycle.

Reply (Member, Author): Seems to make sense, I'll think about it later.

+    }
+    dictReleaseIterator(di);
+
+    return rank;
+}
+
 /* This function is called by clusterHandleReplicaFailover() in order to
  * let the replica log why it is not able to failover. Sometimes there are
  * not the conditions, but since the failover function is called again and
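Following up on the shard_id suggestion in the review thread above, here is a minimal editorial sketch (not part of this PR) of what a shard_id-based ordering might look like. It assumes clusterNode exposes a fixed-size shard_id field comparable with memcmp and that a replica shares its primary's shard id; the field name, its length, and the helper name are illustrative assumptions, not the actual Valkey definitions.

/* Editorial sketch: same eligibility rules as clusterGetFailedPrimaryRank(),
 * but ordering failed primaries by shard id instead of node name. */
static int clusterGetFailedPrimaryRankByShardId(void) {
    serverAssert(nodeIsReplica(myself));
    serverAssert(myself->replicaof);

    int rank = 0;
    mstime_t now = mstime();
    dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *de;

    while ((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        /* Same filters as clusterGetFailedPrimaryRank(). */
        if (!nodeFailed(node) || !clusterNodeIsVotingPrimary(node) || node->num_replicas == 0) continue;
        if (server.cluster_replica_validity_factor &&
            (now - node->fail_time) > (server.cluster_node_timeout * server.cluster_replica_validity_factor))
            continue;

        /* Compare against our own shard id (assumed field) rather than
         * myself->replicaof->name, since a replica's shard id should match its
         * primary's even when the replicaof chain is stale or cyclic. */
        if (memcmp(node->shard_id, myself->shard_id, CLUSTER_NAMELEN) < 0) rank++;
    }
    dictReleaseIterator(di);

    return rank;
}

Only the comparison key would change; the delay logic built on top of the rank would stay the same.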
@@ -4582,6 +4636,11 @@ void clusterHandleReplicaFailover(void) {
          * Specifically 1 second * rank. This way replicas that have a probably
          * less updated replication offset are penalized. */
         server.cluster->failover_auth_time += server.cluster->failover_auth_rank * 1000;
+        /* We add another delay that is proportional to the failed primary rank.
+         * Specifically 0.5 second * rank. This way the replicas of different
+         * failed primaries start their elections in rank order, avoiding vote conflicts. */
+        server.cluster->failover_failed_primary_rank = clusterGetFailedPrimaryRank();
+        server.cluster->failover_auth_time += server.cluster->failover_failed_primary_rank * 500;
Review comment (Member): Curious - how did you arrive at 500? Given that CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST is broadcast and answered pretty much right away, unless the voter is busy, I would think the network round trip time between any two nodes should be significantly less than 50 ms for all deployments. I wonder if we could tighten it up a bit, to something like 250 or 200?

Reply (Member, Author): The 500 is just based on experience with the existing fixed delay (quoted below). I usually assume that one election round can be completed within 500 ms to 1 s. Yes, I think the number may be adjustable, but I haven't experimented with it.

        server.cluster->failover_auth_time = mstime() +
                                             500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
                                             random() % 500; /* Random delay between 0 and 500 milliseconds. */

         /* However if this is a manual failover, no delay is needed. */
         if (server.cluster->mf_end) {
             server.cluster->failover_auth_time = mstime();
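Putting the delays in this hunk together with the base delay quoted in the reply above, a replica's total election delay is 500 ms fixed, plus a random 0-500 ms, plus replica rank x 1000 ms, plus failed primary rank x 500 ms (and no delay at all for a manual failover). For example, a replica with replica rank 1 and failed primary rank 2 would wait 500 + (0..500) + 1000 + 1000 = 2500 to 3000 ms before requesting votes.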
@@ -4590,9 +4649,9 @@
         }
         serverLog(LL_NOTICE,
                   "Start of election delayed for %lld milliseconds "
-                  "(rank #%d, offset %lld).",
+                  "(rank #%d, primary rank #%d, offset %lld).",
                   server.cluster->failover_auth_time - mstime(), server.cluster->failover_auth_rank,
-                  replicationGetReplicaOffset());
+                  server.cluster->failover_failed_primary_rank, replicationGetReplicaOffset());
         /* Now that we have a scheduled election, broadcast our offset
          * to all the other replicas so that they'll update their offsets
          * if our offset is better. */
@@ -4604,6 +4663,9 @@
      * replicas for the same primary since we computed our election delay.
      * Update the delay if our rank changed.
      *
+     * It is also possible that we received a message telling us that a
+     * shard is up again. Update the delay if our failed_primary_rank changed.
+     *
      * Not performed if this is a manual failover. */
     if (server.cluster->failover_auth_sent == 0 && server.cluster->mf_end == 0) {
         int newrank = clusterGetReplicaRank();
@@ -4614,6 +4676,15 @@
             serverLog(LL_NOTICE, "Replica rank updated to #%d, added %lld milliseconds of delay.", newrank,
                       added_delay);
         }
+
+        int new_failed_primary_rank = clusterGetFailedPrimaryRank();
+        if (new_failed_primary_rank != server.cluster->failover_failed_primary_rank) {
+            long long added_delay = (new_failed_primary_rank - server.cluster->failover_failed_primary_rank) * 500;
+            server.cluster->failover_auth_time += added_delay;
+            server.cluster->failover_failed_primary_rank = new_failed_primary_rank;
+            serverLog(LL_NOTICE, "Failed primary rank updated to #%d, added %lld milliseconds of delay.",
+                      new_failed_primary_rank, added_delay);
+        }
     }
 
     /* Return ASAP if we can't still start the election. */
src/cluster_legacy.h (15 changes: 8 additions & 7 deletions)
@@ -377,13 +377,14 @@ struct clusterState {
     clusterNode *importing_slots_from[CLUSTER_SLOTS];
     clusterNode *slots[CLUSTER_SLOTS];
     /* The following fields are used to take the replica state on elections. */
-    mstime_t failover_auth_time; /* Time of previous or next election. */
-    int failover_auth_count;     /* Number of votes received so far. */
-    int failover_auth_sent;      /* True if we already asked for votes. */
-    int failover_auth_rank;      /* This replica rank for current auth request. */
-    uint64_t failover_auth_epoch; /* Epoch of the current election. */
-    int cant_failover_reason;    /* Why a replica is currently not able to
-                                    failover. See the CANT_FAILOVER_* macros. */
+    mstime_t failover_auth_time;      /* Time of previous or next election. */
+    int failover_auth_count;          /* Number of votes received so far. */
+    int failover_auth_sent;           /* True if we already asked for votes. */
+    int failover_auth_rank;           /* This replica rank for current auth request. */
+    int failover_failed_primary_rank; /* The rank of this instance's primary in the list of all failed primaries. */
+    uint64_t failover_auth_epoch;     /* Epoch of the current election. */
+    int cant_failover_reason;         /* Why a replica is currently not able to
+                                       * failover. See the CANT_FAILOVER_* macros. */
     /* Manual failover state in common. */
     mstime_t mf_end; /* Manual failover time limit (ms unixtime).
                         It is zero if there is no MF in progress. */
tests/unit/cluster/failover2.tcl (31 changes: 31 additions & 0 deletions)
@@ -64,3 +64,34 @@ start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval
     }
 
 } ;# start_cluster
+
Author's note (Member): This test may be time-consuming. It basically cannot pass before the patch, but can pass locally after the patch.

+start_cluster 32 15 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 15000}} {
+    test "Multiple primary nodes are down, rank them based on the failed primary" {
+        # Killing these primary nodes.
+        for {set j 0} {$j < 15} {incr j} {
+            pause_process [srv -$j pid]
+        }
+
+        # Make sure that a node starts failover.
+        wait_for_condition 1000 100 {
+            [s -40 role] == "master"
+        } else {
+            fail "No failover detected"
+        }
+
+        # Wait for the cluster state to become ok.
+        for {set j 0} {$j < [llength $::servers]} {incr j} {
+            if {[process_is_paused [srv -$j pid]]} continue
+            wait_for_condition 1000 100 {
+                [CI $j cluster_state] eq "ok"
+            } else {
+                fail "Cluster node $j cluster_state:[CI $j cluster_state]"
+            }
+        }
+
+        # Resuming these primary nodes, speed up the shutdown.
+        for {set j 0} {$j < 31} {incr j} {
+            resume_process [srv -$j pid]
+        }
+    }
+} ;# start_cluster