Skip to content

Commit

Permalink
Trigger manual failover on SIGTERM / shutdown to cluster primary
Browse files Browse the repository at this point in the history
When a primary disappears, its slots are not served until an automatic
failover happens. It takes about n seconds (node timeout plus some seconds).
It's too much time for us to not accept writes.

If the host machine is about to shutdown for any reason, the processes
typically get a sigterm and have some time to shutdown gracefully. In
Kubernetes, this is 30 seconds by default.

When a primary receives a SIGTERM or a SHUTDOWN, let it trigger a failover
to one of the replicas as part of the graceful shutdown. This can reduce
some unavailability time. For example the replica needs to sense the
primary failure within the node-timeout before initating an election,
and now it can initiate an election quickly and win and gossip it.

This closes #939.

Signed-off-by: Binbin <binloveplay1314@qq.com>
  • Loading branch information
enjoy-binbin committed Sep 30, 2024
1 parent bb57dfe commit 6ab8888
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 5 deletions.
26 changes: 21 additions & 5 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3506,10 +3506,14 @@ int clusterProcessPacket(clusterLink *link) {
* a non zero number of slots, and its currentEpoch is greater or
* equal to epoch where this node started the election. */
if (clusterNodeIsVotingPrimary(sender) && sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) {
server.cluster->failover_auth_count++;
/* Maybe we reached a quorum here, set a flag to make sure
* we check ASAP. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
/* todo: see if this needed. */
/* My primary has already voted for me, so don't count it anymore. */
if (!(sender == myself->replicaof && server.cluster->mf_is_primary_failover)) {
server.cluster->failover_auth_count++;
/* Maybe we reached a quorum here, set a flag to make sure
* we check ASAP. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
}
} else if (type == CLUSTERMSG_TYPE_MFSTART) {
/* This message is acceptable only if I'm a primary and the sender
Expand Down Expand Up @@ -4592,6 +4596,11 @@ void clusterHandleReplicaFailover(void) {
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
/* todo: see if this is needed. */
/* This is a failover triggered by my primary, let's counts its vote. */
if (server.cluster->mf_is_primary_failover) {
server.cluster->failover_auth_count++;
}
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
serverLog(LL_NOTICE,
Expand Down Expand Up @@ -4816,6 +4825,7 @@ void resetManualFailover(void) {
}
server.cluster->mf_end = 0; /* No manual failover in progress. */
server.cluster->mf_can_start = 0;
server.cluster->mf_is_primary_failover = 0;
server.cluster->mf_replica = NULL;
server.cluster->mf_primary_offset = -1;
}
Expand Down Expand Up @@ -4844,6 +4854,7 @@ void clusterHandleManualFailover(void) {
/* Our replication offset matches the primary replication offset
* announced after clients were paused. We can start the failover. */
server.cluster->mf_can_start = 1;
server.cluster->mf_is_primary_failover = 0;
serverLog(LL_NOTICE, "All primary replication stream processed, "
"manual failover can start.");
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
Expand Down Expand Up @@ -6730,8 +6741,13 @@ int clusterCommandSpecial(client *c) {
/* If this is a forced failover, we don't need to talk with our
* primary to agree about the offset. We just failover taking over
* it without coordination. */
serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client);
if (c == server.primary) {
serverLog(LL_NOTICE, "Forced failover primary request accepted (primary request from '%s').", client);
} else {
serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client);
}
server.cluster->mf_can_start = 1;
server.cluster->mf_is_primary_failover = 1;
/* We can start a manual failover as soon as possible, setting a flag
* here so that we don't need to waiting for the cron to kick in. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ struct clusterState {
or -1 if still not received. */
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting primary vote. */
int mf_is_primary_failover; /* The manual failover was triggered by my primary. */
/* The following fields are used by primaries to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3135,6 +3135,7 @@ standardConfig static_configs[] = {
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("auto-failover-on-shutdown", NULL, MODIFIABLE_CONFIG, server.auto_failover_on_shutdown, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
24 changes: 24 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -4306,6 +4306,7 @@ int finishShutdown(void) {
int force = server.shutdown_flags & SHUTDOWN_FORCE;

/* Log a warning for each replica that is lagging. */
client *best_replica = NULL;
listIter replicas_iter;
listNode *replicas_list_node;
int num_replicas = 0, num_lagging_replicas = 0;
Expand All @@ -4320,6 +4321,14 @@ int finishShutdown(void) {
replicationGetReplicaName(replica), server.primary_repl_offset - replica->repl_ack_off, lag,
replstateToString(replica->repl_state));
}
/* Find the best replica, that is, the replica with the largest offset. */
if (replica->repl_state == REPLICA_STATE_ONLINE) {
if (best_replica == NULL) {
best_replica = replica;
} else if (replica->repl_ack_off > best_replica->repl_ack_off) {
best_replica = replica;
}
}
}
if (num_replicas > 0) {
serverLog(LL_NOTICE, "%d of %d replicas are in sync when shutting down.", num_replicas - num_lagging_replicas,
Expand Down Expand Up @@ -4419,6 +4428,21 @@ int finishShutdown(void) {
* send them pending writes. */
flushReplicasOutputBuffers();

if (server.auto_failover_on_shutdown && server.cluster_enabled && best_replica) {
/* Sending a CLUSTER FAILOVER FORCE to the best replica. */
const char *buf = "*3\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$5\r\nFORCE\r\n";
if (connWrite(best_replica->conn, buf, strlen(buf)) == (int)strlen(buf)) {
serverLog(LL_NOTICE, "Sending CLUSTER FAILOVER FORCE to replica %s succeeded.",
replicationGetReplicaName(best_replica));
} else {
serverLog(LL_WARNING, "Failed to send CLUSTER FAILOVER FORCE to replica: %s", strerror(errno));
}
}

if (server.auto_failover_on_shutdown && server.cluster_enabled && !best_replica) {
serverLog(LL_WARNING, "Unable to find a replica to perform an auto failover on shutdown.");
}

/* Close the listening sockets. Apparently this allows faster restarts. */
closeListeningSockets(1);

Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2169,6 +2169,7 @@ struct valkeyServer {
unsigned long cluster_blacklist_ttl; /* Duration in seconds that a node is denied re-entry into
* the cluster after it is forgotten with CLUSTER FORGET. */
int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
int auto_failover_on_shutdown; /* Trigger manual failover on shutdown to primary. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */
Expand Down
18 changes: 18 additions & 0 deletions tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,24 @@ proc wait_replica_online r {
}
}

proc get_replica_acked_offset {primary replica_ip replica_port} {
set infostr [$primary info replication]
if {[regexp -lineanchor "^slave\\d:ip=$replica_ip,port=$replica_port,.*,offset=(\\d+).*\r\n" $infostr _ value]} {
return $value
}
}

proc wait_replica_acked_ofs {primary replica_ip replica_port} {
$primary config set repl-ping-replica-period 3600
wait_for_condition 50 100 {
[status $primary master_repl_offset] eq [get_replica_acked_offset $primary $replica_ip $replica_port]
} else {
puts "INFO REPLICATION: [$primary info replication]"
fail "replica acked offset didn't match in time"
}
$primary config set repl-ping-replica-period 10
}

proc wait_for_ofs_sync {r1 r2} {
wait_for_condition 50 100 {
[status $r1 master_repl_offset] eq [status $r2 master_repl_offset]
Expand Down
68 changes: 68 additions & 0 deletions tests/unit/cluster/auto-failover-on-shutdown.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
proc shutdown_on_how {srv_id how} {
if {$how == "shutdown"} {
catch {R $srv_id shutdown nosave}
} elseif {$how == "sigterm"} {
exec kill -SIGTERM [s -$srv_id process_id]
}
}

proc test_main {how} {
test "auto-failover-on-shutdown will always pick a best replica and send CLUSTER FAILOVER - $how" {
set primary [srv 0 client]
set replica1 [srv -3 client]
set replica1_pid [s -3 process_id]
set replica2 [srv -6 client]
set replica2_ip [srv -6 host]
set replica2_port [srv -6 port]

# Pause a replica so it has no chance to catch up with the offset.
pause_process $replica1_pid

# Primary write some data to increse the offset.
for {set i 0} {$i < 10} {incr i} {
$primary incr key_991803
}

# Wait the replica2 catch up with the offset
wait_replica_acked_ofs $primary $replica2_ip $replica2_port

# Shutdown the primary.
shutdown_on_how 0 $how

# Wait for the replica2 to become a primary.
wait_for_condition 1000 50 {
[s -6 role] eq {master}
} else {
puts "s -6 role: [s -6 role]"
fail "Failover does not happened"
}

# Make sure that the expected logs are printed.
verify_log_message 0 "*Sending CLUSTER FAILOVER FORCE to replica*" 0
verify_log_message -6 "*Forced failover primary request accepted*" 0

resume_process $replica1_pid
}

test "Unable to find a replica to perform an auto failover - $how" {
set primary [srv -6 client]
set replica1 [srv -3 client]
set replica1_pid [s -3 process_id]

pause_process $replica1_pid

$primary client kill type replica
shutdown_on_how 6 $how
wait_for_log_messages -6 {"*Unable to find a replica to perform an auto failover on shutdown*"} 0 1000 10

resume_process $replica1_pid
}
}

start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000 shutdown-timeout 0 auto-failover-on-shutdown yes}} {
test_main "shutdown"
}

start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 5000 shutdown-timeout 0 auto-failover-on-shutdown yes}} {
test_main "sigterm"
}
4 changes: 4 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,10 @@ aof-timestamp-enabled no
# shutdown-on-sigint default
# shutdown-on-sigterm default

# TODO
#
# auto-failover-on-shutdown no

################ NON-DETERMINISTIC LONG BLOCKING COMMANDS #####################

# Maximum time in milliseconds for EVAL scripts, functions and in some cases
Expand Down

0 comments on commit 6ab8888

Please sign in to comment.