Skip to content

Commit

Permalink
Add option for coordinated failover to Sentinel
Browse files Browse the repository at this point in the history
Valkey supports the "FAILOVER" command to switch primary and replica
roles in a coordinated fashion. Add a "COORDINATED" option to "SENTINEL
FAILOVER". When given, use "FAILOVER" in the Sentinel forced failover
procedure.
  • Loading branch information
gmbnomis committed Nov 7, 2024
1 parent 1c18c80 commit 146850a
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 10 deletions.
7 changes: 5 additions & 2 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -5548,7 +5548,9 @@ struct COMMAND_ARG SENTINEL_DEBUG_Args[] = {

#ifndef SKIP_CMD_HISTORY_TABLE
/* SENTINEL FAILOVER history */
#define SENTINEL_FAILOVER_History NULL
commandHistory SENTINEL_FAILOVER_History[] = {
{"8.0.0","`COORDINATED` option."},
};
#endif

#ifndef SKIP_CMD_TIPS_TABLE
Expand All @@ -5564,6 +5566,7 @@ struct COMMAND_ARG SENTINEL_DEBUG_Args[] = {
/* SENTINEL FAILOVER argument table */
struct COMMAND_ARG SENTINEL_FAILOVER_Args[] = {
{MAKE_ARG("primary-name",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("coordinated",ARG_TYPE_PURE_TOKEN,-1,"COORDINATED",NULL,"8.0.0",CMD_ARG_OPTIONAL,0,NULL)},
};

/********** SENTINEL FLUSHCONFIG ********************/
Expand Down Expand Up @@ -6026,7 +6029,7 @@ struct COMMAND_STRUCT SENTINEL_Subcommands[] = {
{MAKE_CMD("ckquorum","Checks for a Sentinel quorum.",NULL,"2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_CKQUORUM_History,0,SENTINEL_CKQUORUM_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_CKQUORUM_Keyspecs,0,NULL,1),.args=SENTINEL_CKQUORUM_Args},
{MAKE_CMD("config","Configures Sentinel.","O(N) when N is the number of configuration parameters provided","6.2.0",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_CONFIG_History,1,SENTINEL_CONFIG_Tips,0,sentinelCommand,-4,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_CONFIG_Keyspecs,0,NULL,1),.args=SENTINEL_CONFIG_Args},
{MAKE_CMD("debug","Lists or updates the current configurable parameters of Sentinel.","O(N) where N is the number of configurable parameters","7.0.0",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_DEBUG_History,0,SENTINEL_DEBUG_Tips,0,sentinelCommand,-2,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_DEBUG_Keyspecs,0,NULL,1),.args=SENTINEL_DEBUG_Args},
{MAKE_CMD("failover","Forces a Sentinel failover.",NULL,"2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_FAILOVER_History,0,SENTINEL_FAILOVER_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_FAILOVER_Keyspecs,0,NULL,1),.args=SENTINEL_FAILOVER_Args},
{MAKE_CMD("failover","Forces a Sentinel failover.",NULL,"2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_FAILOVER_History,1,SENTINEL_FAILOVER_Tips,0,sentinelCommand,-3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_FAILOVER_Keyspecs,0,NULL,2),.args=SENTINEL_FAILOVER_Args},
{MAKE_CMD("flushconfig","Rewrites the Sentinel configuration file.","O(1)","2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_FLUSHCONFIG_History,0,SENTINEL_FLUSHCONFIG_Tips,0,sentinelCommand,2,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_FLUSHCONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("get-master-addr-by-name","Returns the port and address of a primary instance.","O(1)","2.8.4",CMD_DOC_DEPRECATED,"`SENTINEL GET-PRIMARY-ADDR-BY-NAME`","8.0.0","sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_GET_MASTER_ADDR_BY_NAME_History,0,SENTINEL_GET_MASTER_ADDR_BY_NAME_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_GET_MASTER_ADDR_BY_NAME_Keyspecs,0,NULL,1),.args=SENTINEL_GET_MASTER_ADDR_BY_NAME_Args},
{MAKE_CMD("get-primary-addr-by-name","Returns the port and address of a primary instance.","O(1)","8.0.0",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_GET_PRIMARY_ADDR_BY_NAME_History,0,SENTINEL_GET_PRIMARY_ADDR_BY_NAME_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_GET_PRIMARY_ADDR_BY_NAME_Keyspecs,0,NULL,1),.args=SENTINEL_GET_PRIMARY_ADDR_BY_NAME_Args},
Expand Down
15 changes: 14 additions & 1 deletion src/commands/sentinel-failover.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
"summary": "Forces a Sentinel failover.",
"group": "sentinel",
"since": "2.8.4",
"arity": 3,
"arity": -3,
"container": "SENTINEL",
"function": "sentinelCommand",
"history": [
[
"8.0.0",
"`COORDINATED` option."
]
],
"command_flags": [
"ADMIN",
"SENTINEL",
Expand All @@ -19,6 +25,13 @@
{
"name": "primary-name",
"type": "string"
},
{
"token": "COORDINATED",
"name": "coordinated",
"type": "pure-token",
"optional": true,
"since": "8.0.0"
}
]
}
Expand Down
171 changes: 164 additions & 7 deletions src/sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ typedef struct sentinelAddr {
#define SRI_FORCE_FAILOVER (1 << 11) /* Force failover with primary up. */
#define SRI_SCRIPT_KILL_SENT (1 << 12) /* SCRIPT KILL already sent on -BUSY */
#define SRI_PRIMARY_REBOOT (1 << 13) /* Primary was detected as rebooting */
#define SRI_COORD_FAILOVER (1 << 14) /* Coordinated failover with primary up. */
/* Note: when adding new flags, please check the flags section in addReplySentinelValkeyInstance. */

/* Note: times are in milliseconds. */
Expand Down Expand Up @@ -403,6 +404,7 @@ sentinelValkeyInstance *sentinelSelectReplica(sentinelValkeyInstance *primary);
void sentinelScheduleScriptExecution(char *path, ...);
void sentinelStartFailover(sentinelValkeyInstance *primary);
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
int sentinelKillClients(sentinelValkeyInstance *ri);
int sentinelSendReplicaOf(sentinelValkeyInstance *ri, const sentinelAddr *addr);
char *sentinelVoteLeader(sentinelValkeyInstance *primary, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
int sentinelFlushConfig(void);
Expand Down Expand Up @@ -2586,6 +2588,10 @@ void sentinelRefreshInstanceInfo(sentinelValkeyInstance *ri, const char *info) {
sentinelFlushConfig();
sentinelEvent(LL_WARNING, "+promoted-slave", ri, "%@");
if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION) sentinelSimFailureCrash();
if (ri->primary->flags & SRI_COORD_FAILOVER) {
sentinelKillClients(ri->primary);
sentinelKillClients(ri);
}
sentinelEvent(LL_WARNING, "+failover-state-reconf-slaves", ri->primary, "%@");
sentinelCallClientReconfScript(ri->primary, SENTINEL_LEADER, "start", ri->primary->addr, ri->addr);
sentinelForceHelloUpdateForPrimary(ri->primary);
Expand Down Expand Up @@ -3010,7 +3016,10 @@ void sentinelSendPeriodicCommands(sentinelValkeyInstance *ri) {

/* PUBLISH hello messages to all the three kinds of instances. */
if ((now - ri->last_pub_time) > sentinel_publish_period) {
sentinelSendHello(ri);
/* Don't publish during coordinated failover while clients may be blocked */
if ((ri->flags & SRI_COORD_FAILOVER) == 0 || ri->failover_state > SENTINEL_FAILOVER_STATE_WAIT_PROMOTION) {
sentinelSendHello(ri);
}
}
}

Expand Down Expand Up @@ -3267,6 +3276,7 @@ void addReplySentinelValkeyInstance(client *c, sentinelValkeyInstance *ri) {
if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags, "reconf_inprog,");
if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags, "reconf_done,");
if (ri->flags & SRI_FORCE_FAILOVER) flags = sdscat(flags, "force_failover,");
if (ri->flags & SRI_COORD_FAILOVER) flags = sdscat(flags,"coordinated_failover,");
if (ri->flags & SRI_SCRIPT_KILL_SENT) flags = sdscat(flags, "script_kill_sent,");
if (ri->flags & SRI_PRIMARY_REBOOT) flags = sdscat(flags, "master_reboot,");

Expand Down Expand Up @@ -3703,7 +3713,7 @@ void sentinelCommand(client *c) {
" Or update current configurable parameters values (one or more).",
"GET-PRIMARY-ADDR-BY-NAME <primary-name>",
" Return the ip and port number of the primary with that name.",
"FAILOVER <primary-name>",
"FAILOVER <primary-name> [COORDINATED]",
" Manually failover a primary node without asking for agreement from other",
" Sentinels",
"FLUSHCONFIG",
Expand Down Expand Up @@ -3839,9 +3849,19 @@ void sentinelCommand(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr, "failover")) {
/* SENTINEL FAILOVER <primary-name> [COORDINATED] */
sentinelValkeyInstance *ri;
int coordinated = 0;

if (c->argc != 3) goto numargserr;
if ((ri = sentinelGetPrimaryByNameOrReplyError(c, c->argv[2])) == NULL) return;
if (c->argc < 3 || c->argc > 4) goto numargserr;
if ((ri = sentinelGetPrimaryByNameOrReplyError(c, c->argv[2])) == NULL)
return;
if (c->argc == 4) {
if (!strcasecmp(c->argv[3]->ptr, "coordinated")) {
coordinated = SRI_COORD_FAILOVER;
} else {
addReplyError(c, "Unknown failover option specified");
return;
}
}
if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
addReplyError(c, "-INPROG Failover already in progress");
return;
Expand All @@ -3852,7 +3872,7 @@ void sentinelCommand(client *c) {
}
serverLog(LL_NOTICE, "Executing user requested FAILOVER of '%s'", ri->name);
sentinelStartFailover(ri);
ri->flags |= SRI_FORCE_FAILOVER;
ri->flags |= SRI_FORCE_FAILOVER | coordinated;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "pending-scripts")) {
/* SENTINEL PENDING-SCRIPTS */
Expand Down Expand Up @@ -4635,6 +4655,109 @@ char *sentinelGetLeader(sentinelValkeyInstance *primary, uint64_t epoch) {
return winner;
}

/* Ask the instance 'ri' to perform a coordinated FAILOVER towards the
 * instance at 'addr', giving up after 'timeout' milliseconds.
 *
 * The following commands are queued on the instance's command link, wrapped
 * in a MULTI/EXEC transaction:
 *
 * 1) CLIENT PAUSE <timeout> WRITE, so that clients stay blocked once the
 *    failover succeeded: they do not expect to be suddenly connected to a
 *    replica (they are disconnected in the next state of the failover).
 * 2) FAILOVER TO <host> <port> TIMEOUT <timeout>.
 *
 * Returns C_OK if all the commands were accepted for (later) delivery,
 * otherwise C_ERR. The replies are discarded: the effects are expected to be
 * observed in a later INFO output instead.
 *
 * Note that when C_ERR is returned, the commands queued before the failing
 * one stay queued, and 'pending_commands' accounts only for those. */
int sentinelFailoverTo(sentinelValkeyInstance *ri, const sentinelAddr *addr, mstime_t timeout) {
    char portstr[32];
    const char *host;
    int retval;
    /* redisAsyncCommand() is variadic, so the argument type must match the
     * conversion specifier exactly. mstime_t is not 'int', hence the timeout
     * is passed as 'long long' and formatted with "%lld" rather than "%d". */
    long long timeout_ms = (long long)timeout;

    host = announceSentinelAddr(addr);
    ll2string(portstr, sizeof(portstr), addr->port);

    /* Open the transaction. */
    retval = redisAsyncCommand(ri->link->cc,
                               sentinelDiscardReplyCallback, ri, "%s",
                               sentinelInstanceMapCommand(ri, "MULTI"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    /* Block client writes for as long as the failover may take. */
    retval = redisAsyncCommand(ri->link->cc,
                               sentinelDiscardReplyCallback, ri, "%s PAUSE %lld WRITE",
                               sentinelInstanceMapCommand(ri, "CLIENT"),
                               timeout_ms);
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    /* Hand over the primary role to the selected instance. */
    retval = redisAsyncCommand(ri->link->cc,
                               sentinelDiscardReplyCallback, ri, "%s TO %s %s TIMEOUT %lld",
                               sentinelInstanceMapCommand(ri, "FAILOVER"),
                               host, portstr, timeout_ms);
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    /* Run the transaction. */
    retval = redisAsyncCommand(ri->link->cc,
                               sentinelDiscardReplyCallback, ri, "%s",
                               sentinelInstanceMapCommand(ri, "EXEC"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    return C_OK;
}

/* Finish a role switch on the instance 'ri' by queueing, as a single
 * MULTI/EXEC transaction on its command link:
 *
 * 1) CONFIG REWRITE, to store the new configuration on disk when possible
 *    (that is, if the server was started with a configuration file).
 * 2) CLIENT KILL TYPE normal and CLIENT KILL TYPE pubsub, to disconnect the
 *    existing clients so that they go through the ask-master-on-reconnection
 *    protocol.
 * 3) CLIENT UNPAUSE, to unblock client writes (which include PUBLISH).
 *
 * Returns C_OK if every command was accepted for (later) delivery, C_ERR as
 * soon as one of them is refused. Replies are discarded rather than checked:
 * the effects are expected to show up in a later INFO output. */
int sentinelKillClients(sentinelValkeyInstance *ri) {
    /* Client classes to disconnect, in the order the commands are sent. */
    static const char *const kill_types[] = {"normal", "pubsub", NULL};
    const char *const *kind;

    if (redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s",
                          sentinelInstanceMapCommand(ri, "MULTI")) == C_ERR) {
        return C_ERR;
    }
    ri->link->pending_commands++;

    if (redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s REWRITE",
                          sentinelInstanceMapCommand(ri, "CONFIG")) == C_ERR) {
        return C_ERR;
    }
    ri->link->pending_commands++;

    for (kind = kill_types; *kind != NULL; kind++) {
        if (redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
                              sentinelInstanceMapCommand(ri, "CLIENT"), *kind) == C_ERR) {
            return C_ERR;
        }
        ri->link->pending_commands++;
    }

    if (redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s UNPAUSE",
                          sentinelInstanceMapCommand(ri, "CLIENT")) == C_ERR) {
        return C_ERR;
    }
    ri->link->pending_commands++;

    if (redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s",
                          sentinelInstanceMapCommand(ri, "EXEC")) == C_ERR) {
        return C_ERR;
    }
    ri->link->pending_commands++;

    return C_OK;
}

/* Send REPLICAOF to the specified instance, always followed by a
* CONFIG REWRITE command in order to store the new configuration on disk
* when possible (that is, if the instance is recent enough to support
Expand Down Expand Up @@ -4907,6 +5030,35 @@ void sentinelFailoverSelectReplica(sentinelValkeyInstance *ri) {
}
}

/* Failover state handler that sends FAILOVER to the primary 'ri', naming the
 * already selected promoted replica as the target and using the primary's
 * down-after period as the command timeout.
 *
 * While the link to the primary is disconnected nothing can be sent: the
 * handler returns so the state is retried, and aborts the failover once
 * more than 'failover_timeout' has elapsed since the last state change.
 *
 * When the commands are queued successfully, the failover moves to the
 * WAIT_PROMOTION state. The reply is not inspected. */
void sentinelFailoverSendFailover(sentinelValkeyInstance *ri) {
    mstime_t elapsed = mstime() - ri->failover_state_change_time;

    if (!ri->link->disconnected) {
        /* A generic reply-discarding callback is registered by the helper,
         * so only the queueing result is checked here. */
        if (sentinelFailoverTo(ri, ri->promoted_replica->addr, ri->down_after_period) != C_OK) {
            return;
        }
        sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion", ri->promoted_replica, "%@");
        ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
        ri->failover_state_change_time = mstime();
        return;
    }

    /* Primary unreachable: keep retrying in this state until the failover
     * timeout expires, then give up. */
    if (elapsed > ri->failover_timeout) {
        sentinelEvent(LL_WARNING, "-failover-abort-master-timeout", ri, "%@");
        sentinelAbortFailover(ri);
    }
}


void sentinelFailoverSendReplicaOfNoOne(sentinelValkeyInstance *ri) {
int retval;

Expand Down Expand Up @@ -5078,7 +5230,12 @@ void sentinelFailoverStateMachine(sentinelValkeyInstance *ri) {
switch (ri->failover_state) {
case SENTINEL_FAILOVER_STATE_WAIT_START: sentinelFailoverWaitStart(ri); break;
case SENTINEL_FAILOVER_STATE_SELECT_REPLICA: sentinelFailoverSelectReplica(ri); break;
case SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE: sentinelFailoverSendReplicaOfNoOne(ri); break;
case SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE:
if (!(ri->flags & SRI_COORD_FAILOVER))
sentinelFailoverSendReplicaOfNoOne(ri);
else
sentinelFailoverSendFailover(ri);
break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: sentinelFailoverWaitPromotion(ri); break;
case SENTINEL_FAILOVER_STATE_RECONF_REPLICAS: sentinelFailoverReconfNextReplica(ri); break;
}
Expand All @@ -5093,7 +5250,7 @@ void sentinelAbortFailover(sentinelValkeyInstance *ri) {
serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);

ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS | SRI_FORCE_FAILOVER);
ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS | SRI_FORCE_FAILOVER | SRI_COORD_FAILOVER);
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
ri->failover_state_change_time = mstime();
if (ri->promoted_replica) {
Expand Down
Loading

0 comments on commit 146850a

Please sign in to comment.