Skip to content

Commit

Permalink
Make repl_provisional_master contain only the necessary client's fields
Browse files Browse the repository at this point in the history
  • Loading branch information
naglera committed Jan 18, 2024
1 parent bab299e commit ac6d324
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 72 deletions.
161 changes: 90 additions & 71 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@
#include <sys/stat.h>

void replicationDiscardCachedMaster(void);
void replicationResurrectMaster(connection *conn, client** master);
void replicationResurrectCachedMaster(connection *conn);
void replicationResurrectProvisionalMaster(void);
void replicationSendAck(void);
int replicaPutOnline(client *slave);
void replicaStartCommandStream(client *slave);
int cancelReplicationHandshake(int reconnect);
void syncWithMaster(connection *conn);
void replicationSteadyStateInit(client *master);
void replicationSteadyStateInit(void);
int processEndOffsetResponse(char* response);
void completeRDBChannelSync(connection *conn);
int isReplicaMainChannel(client *c);
Expand Down Expand Up @@ -1838,10 +1839,10 @@ void replicationEmptyDbCallback(dict *d) {
/* Once we have a link with the master and the synchronization was
* performed, this function materializes the master client we store
* at server.master, starting from the specified file descriptor. */
client *replicaCreateMasterClient(connection *conn, int dbid) {
client* master_client = createClient(conn);
void replicationCreateMasterClientWithHandler(connection *conn, int dbid, ConnectionCallbackFunc handler) {
server.master = createClient(conn);
if (conn)
connSetReadHandler(master_client->conn, readQueryFromClient);
connSetReadHandler(server.master->conn, handler);

/**
* Important note:
Expand All @@ -1854,21 +1855,23 @@ client *replicaCreateMasterClient(connection *conn, int dbid) {
* to pass the execution to a background thread and unblock after the
* execution is done. This is the reason why we allow blocking the replication
* connection. */
master_client->flags |= CLIENT_MASTER;
server.master->flags |= CLIENT_MASTER;

master_client->authenticated = 1;
master_client->reploff = server.master_initial_offset;
master_client->read_reploff = master_client->reploff;
master_client->user = NULL; /* This client can do everything. */
memcpy(master_client->replid, server.master_replid,
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
server.master->user = NULL; /* This client can do everything. */
memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
* PSYNC capable, so we flag it accordingly. */
if (master_client->reploff == -1)
master_client->flags |= CLIENT_PRE_PSYNC;
if (dbid != -1) selectDb(master_client, dbid);
if (server.master->reploff == -1)
server.master->flags |= CLIENT_PRE_PSYNC;
if (dbid != -1) selectDb(server.master,dbid);
}

return master_client;
void replicationCreateMasterClient(connection *conn, int dbid) {
replicationCreateMasterClientWithHandler(conn, dbid, readQueryFromClient);
}

/* This function will try to re-enable the AOF file after the
Expand Down Expand Up @@ -2366,7 +2369,7 @@ void readSyncBulkPayload(connection *conn) {
* Instead of creating a new client we will use the one created for partial sync */
completeRDBChannelSync(conn);
} else {
server.master = replicaCreateMasterClient(server.repl_transfer_s, rsi.repl_stream_db);
replicationCreateMasterClient(server.repl_transfer_s, rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
}
server.repl_down_since = 0;
Expand All @@ -2390,7 +2393,7 @@ void readSyncBulkPayload(connection *conn) {
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");

if (server.supervised_mode == SUPERVISED_SYSTEMD) {
redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections in read-write mode.\n");
}
Expand Down Expand Up @@ -2527,19 +2530,24 @@ void freeReplDataBuf(void) {
server.pending_repl_data.len = 0;
}

void clearProvisionalMaster(void) {
server.repl_provisional_master.conn = NULL;
server.repl_provisional_master.reploff = 0;
server.repl_provisional_master.read_reploff = 0;
server.repl_provisional_master.dbid = 0;
}

void abortRdbConnectionSync(int should_retry) {
serverLog(LL_WARNING, "Aborting RDB connection sync");
if (server.repl_transfer_s) {
connClose(server.repl_transfer_s);
server.repl_transfer_s = NULL;
clearProvisionalMaster();
}
if (server.repl_rdb_transfer_s) {
connClose(server.repl_rdb_transfer_s);
server.repl_rdb_transfer_s = NULL;
}
if (server.repl_provisional_master) {
freeClient(server.repl_provisional_master);
server.repl_provisional_master = NULL;
}
if (server.repl_transfer_fd != -1) {
close(server.repl_transfer_fd);
bg_unlink(server.repl_transfer_tmpfile);
Expand All @@ -2563,7 +2571,7 @@ int reInitReplicaMainConnection(connection *conn) {
/* Init the main connection for psync */
conn->state = CONN_STATE_CONNECTED;
server.repl_state = REPL_RDB_CONN_SEND_PSYNC;
serverAssert(connSetReadHandler(server.repl_provisional_master->conn, syncWithMaster) != C_ERR);
serverAssert(connSetReadHandler(server.repl_provisional_master.conn, syncWithMaster) != C_ERR);
syncWithMaster(conn);
return C_OK;
}
Expand Down Expand Up @@ -2603,10 +2611,13 @@ int processEndOffsetResponse(char* response) {
return C_ERR;
}
server.master_initial_offset = reploffset;
server.repl_provisional_master = replicaCreateMasterClient(server.repl_transfer_s, dbid);
memcpy(server.repl_provisional_master->replid, master_replid, CONFIG_RUN_ID_SIZE);
server.repl_provisional_master->reploff = reploffset;
server.repl_provisional_master->read_reploff = reploffset;

/* Initiate repl_provisional_master to act as this replica temp master until RDB is loaded */
server.repl_provisional_master.conn = server.repl_transfer_s;
memcpy(server.repl_provisional_master.replid, master_replid, CONFIG_RUN_ID_SIZE);
server.repl_provisional_master.reploff = reploffset;
server.repl_provisional_master.read_reploff = reploffset;
server.repl_provisional_master.dbid = dbid;
return C_OK;
}

Expand Down Expand Up @@ -2758,7 +2769,6 @@ int isReplicaBufferLimitReached(void) {

/* Read handler for buffering incoming repl data during RDB download/loading. */
void bufferReplData(connection *conn) {
client *c = connGetPrivateData(conn);
size_t readlen = PROTO_IOBUF_LEN;
int read = 0;

Expand Down Expand Up @@ -2805,7 +2815,6 @@ void bufferReplData(connection *conn) {
return;
}
}
c->lastinteraction = server.unixtime;
}

/* Streams accumulated replication data into the database while freeing read nodes */
Expand All @@ -2814,7 +2823,6 @@ void streamReplDataBufToDb(client *c) {
blockingOperationStarts();
size_t offset = 0;
listNode *cur = NULL;

while ((cur = listFirst(server.pending_repl_data.blocks))) {
/* Read and process repl data block */
replDataBufBlock *o = listNodeValue(cur);
Expand All @@ -2824,10 +2832,8 @@ void streamReplDataBufToDb(client *c) {
server.pending_repl_data.len -= o->used;
replStreamProgressCallback(offset, o->used);
offset += o->used;

listDelNode(server.pending_repl_data.blocks, cur);
}

blockingOperationEnds();
}

Expand All @@ -2844,7 +2850,10 @@ void completeRDBChannelSync(connection *conn) {
if (server.repl_state == REPL_RDB_CONN_RECEIVE_PSYNC_REPLY && server.repl_rdb_transfer_s != NULL) {
/* RDB is still loading */
server.repl_state = REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE;
replicationSteadyStateInit(server.repl_provisional_master);
if (connSetReadHandler(server.repl_provisional_master.conn, bufferReplData)) {
serverLog(LL_WARNING,"Error while setting readable handler: %s", strerror(errno));
abortRdbConnectionSync(1);
}
replDataBufInit();
return;
}
Expand All @@ -2863,19 +2872,19 @@ void completeRDBChannelSync(connection *conn) {
if (server.repl_state == REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE) {
/* Wait for the accumulated buffer to be processed before reading any more replication updates */
connSetReadHandler(server.repl_transfer_s, NULL);
streamReplDataBufToDb(server.repl_provisional_master);
freeReplDataBuf();
serverLog(LL_NOTICE, "Successfully streamed replication data into memory");
goto sync_success;
}
serverPanic("Unrecognized replication state %d using rdb connection", server.repl_state);
}
serverPanic("Unknown connection %d", conn->fd);

sync_success:
removeFromServerCientList(server.repl_provisional_master);
replicationResurrectMaster(server.repl_transfer_s, &server.repl_provisional_master);
replicationSteadyStateInit(server.master);
replicationResurrectProvisionalMaster();
streamReplDataBufToDb(server.master);
freeReplDataBuf();
serverLog(LL_NOTICE, "Successfully streamed replication data into memory");
/* We can resume reading from the master connection once the local replication buffer has been loaded. */
replicationSteadyStateInit();
replicationSendAck(); /* Send ACK to notify primary that replica is synced */
}

Expand Down Expand Up @@ -2949,8 +2958,8 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {


if (isOngoingRdbChannelSync()) {
psync_replid = server.repl_provisional_master->replid;
snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_master->reploff+1);
psync_replid = server.repl_provisional_master.replid;
snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_master.reploff+1);
serverLog(LL_NOTICE, "Trying a partial resynchronization using main channel (request %s:%s).", psync_replid, psync_offset);
} else if (server.cached_master) {
psync_replid = server.cached_master->replid;
Expand Down Expand Up @@ -3055,8 +3064,8 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {
/* In case we are using rdb connection sync, the master is already initialized by now
* In this case we will just verify his id. Notice that in this case cached master is not
* relevant because it wasn't used */
serverAssert(strcmp(new, server.repl_provisional_master->replid) == 0);
server.repl_provisional_master->read_reploff = server.repl_provisional_master->reploff;
serverAssert(strcmp(new, server.repl_provisional_master.replid) == 0);
server.repl_provisional_master.read_reploff = server.repl_provisional_master.reploff;
/* Disconnect all the sub-slaves: they need to be notified. */
disconnectSlaves();
} else if (strcmp(new, server.cached_master->replid)) {
Expand All @@ -3083,8 +3092,7 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {
if (isOngoingRdbChannelSync()) {
completeRDBChannelSync(conn);
} else {
replicationResurrectMaster(conn, &server.cached_master);
replicationSteadyStateInit(server.master);
replicationResurrectCachedMaster(conn);
}

/* If this instance was restarted and we read the metadata to
Expand Down Expand Up @@ -3880,7 +3888,7 @@ void replicationSendAck(void) {
* replicationDiscardCachedMaster() that will make sure to kill the client
* as for some reason we don't want to use it in the future.
*
* replicationResurrectMaster() that is used after a successful PSYNC
* replicationResurrectCachedMaster() that is used after a successful PSYNC
* handshake in order to reactivate the cached master.
*/
void replicationCacheMaster(client *c) {
Expand Down Expand Up @@ -3942,14 +3950,14 @@ void replicationCacheMasterUsingMyself(void) {
"the new master with just a partial transfer.");

/* This will be used to populate the field server.master->reploff
* by replicaCreateMasterClient(). We'll later set the created
* by replicationCreateMasterClient(). We'll later set the created
* master as server.cached_master, so the replica will use such
* offset for PSYNC. */
server.master_initial_offset = server.master_repl_offset;

/* The master client we create can be set to any DBID, because
* the new master will start its replication stream with SELECT. */
server.master = replicaCreateMasterClient(NULL,-1);
replicationCreateMasterClient(NULL,-1);

/* Use our own ID / offset. */
memcpy(server.master->replid, server.replid, sizeof(server.replid));
Expand All @@ -3971,16 +3979,9 @@ void replicationDiscardCachedMaster(void) {
server.cached_master = NULL;
}

/* Turn the cached master into the current master, using the file descriptor
* passed as argument as the socket for the new master.
*
* This function is called when successfully setup a partial resynchronization
* so the stream of data that we'll receive will start from where this
* master left. */
void replicationResurrectMaster(connection *conn, client** master) {
server.master = *master;
*master = NULL;
server.master->conn = conn;
/* This method performs the necessary steps to establish a connection with the master server.
* It sets private data, updates flags, and fires an event to notify modules about the master link change. */
void establishMasterConnection(void) {
connSetPrivateData(server.master->conn, server.master);
server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
server.master->authenticated = 1;
Expand All @@ -3992,36 +3993,54 @@ void replicationResurrectMaster(connection *conn, client** master) {
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_UP,
NULL);
}

/* Turn the cached master into the current master, using the file descriptor
* passed as argument as the socket for the new master.
*
* This function is called when successfully setup a partial resynchronization
* so the stream of data that we'll receive will start from where this
* master left. */
void replicationResurrectCachedMaster(connection *conn) {
server.master = server.cached_master;
server.cached_master = NULL;
server.master->conn = conn;

establishMasterConnection();
/* Re-add to the list of clients. */
linkClient(server.master);
replicationSteadyStateInit();
}

/* Prepare replica to steady state.
* prerequisite: server.master is already initialized */
void replicationSteadyStateInit(client *master) {
int retval;
if (server.repl_state == REPL_RDB_CONN_TWO_CONNECTIONS_ACTIVE) {
retval = connSetReadHandler(master->conn, bufferReplData);
} else {
retval = connSetReadHandler(master->conn, readQueryFromClient);
}

if (retval) {
* prerequisite: server.master is already initialized and linked in client list. */
void replicationSteadyStateInit(void) {
if (connSetReadHandler(server.master->conn, readQueryFromClient)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
freeClientAsync(master); /* Close ASAP. */
freeClientAsync(server.master); /* Close ASAP. */
}

/* We may also need to install the write handler as well if there is
* pending data in the write buffers. */
if (clientHasPendingReplies(master)) {
if (connSetWriteHandler(master->conn, sendReplyToClient)) {
if (clientHasPendingReplies(server.master)) {
if (connSetWriteHandler(server.master->conn, sendReplyToClient)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
freeClientAsync(master); /* Close ASAP. */
freeClientAsync(server.master); /* Close ASAP. */
}
}
}


void replicationResurrectProvisionalMaster(void) {
/* Create a master client, but do not initialize the read handler yet, as this replica still has a local buffer to drain. */
replicationCreateMasterClientWithHandler(server.repl_transfer_s, server.repl_provisional_master.dbid, NULL);
memcpy(server.master->replid, server.repl_provisional_master.replid, CONFIG_RUN_ID_SIZE);
server.master->reploff = server.repl_provisional_master.reploff;
server.master->read_reploff = server.repl_provisional_master.read_reploff;

establishMasterConnection();
}

/* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */

/* This function counts the number of slaves with lag <= min-slaves-max-lag.
Expand Down
8 changes: 7 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1894,7 +1894,13 @@ struct redisServer {
int masterport; /* Port of master */
int repl_timeout; /* Timeout after N seconds of master idle */
client *master; /* Client that is master for this slave */
client *repl_provisional_master; /* Client used for psync during rdb load */
struct {
connection* conn;
char replid[CONFIG_RUN_ID_SIZE+1];
long long reploff;
long long read_reploff;
int dbid;
} repl_provisional_master;
client *cached_master; /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
Expand Down

0 comments on commit ac6d324

Please sign in to comment.