Skip to content

Commit

Permalink
refator read/write handler transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
ShooterIT committed Nov 17, 2024
1 parent 7643fd0 commit 895303d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 33 deletions.
82 changes: 55 additions & 27 deletions src/iothread.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ void putInPendingClienstForMainThread(client *c) {
if (c->io_thread_client_list_node) {
listDelNode(io_threads[c->tid].clients, c->io_thread_client_list_node);
c->io_thread_client_list_node = NULL;
connSetReadHandler(c->conn, NULL);
connSetWriteHandler(c->conn, NULL);
c->read_enabled = 0;
c->write_enabled = 0;
listAddNodeTail(io_threads[c->tid].pending_clients_for_main_thread, c);
}
}

void putInPendingClienstForIOThreads(client *c) {
connSetReadHandler(c->conn, NULL);
connSetWriteHandler(c->conn, NULL);
c->read_enabled = 0;
c->write_enabled = 0;
listAddNodeTail(pending_clients_for_io_threads[c->tid], c);
}

Expand Down Expand Up @@ -161,14 +163,16 @@ int ioThreadCron(struct aeEventLoop *eventLoop, long long id, void *ptr) {
// serverLog(LL_DEBUG, "io thead %ld, event loop size: %d", t->id, aeGetSetSize(t->el));

/* Clients cron in io thread. */
int iterations = IO_THREAD_CRON_CLIENTS_ITERATIONS;
int iterations = max(IO_THREAD_CRON_CLIENTS_ITERATIONS, listLength(t->clients)/10);
iterations = min(iterations, (int)listLength(t->clients));
while (listLength(t->clients) && iterations--) {
listNode *head = listFirst(t->clients);
client *c = listNodeValue(head);
listRotateHeadToTail(t->clients);

serverAssert(c->tid == t->id);
serverAssert(c->conn->write_handler || c->conn->read_handler);
serverAssert(c->running_tid == t->id);
serverAssert(connHasReadHandler(c->conn));

/* The client is asked to close, let main thread to free finally. */
if (isClientClosing(c)) {
Expand Down Expand Up @@ -219,13 +223,21 @@ void handleClientsFromIOThreads(struct aeEventLoop *el, int fd, void *ptr, int m
listRewind(clients, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
serverAssert(!connHasReadHandler(c->conn) && !connHasWriteHandler(c->conn));
/* Make sure the client is readable or writable in io thread to
* avoid data race. */
serverAssert(!c->read_enabled && !c->write_enabled);

/* Let main thread to run it. */
c->running_tid = IOTHREAD_MAIN_THREAD_ID;

/* The client is asked to close. */
if (isClientClosing(c)) {
if (connHasReadHandler(c->conn) || connHasWriteHandler(c->conn)) {
pauseIOThread(c->tid);
connSetReadHandler(c->conn, NULL);
connSetWriteHandler(c->conn, NULL);
resumeIOThread(c->tid);
}
freeClient(c);
continue;
}
Expand All @@ -239,18 +251,21 @@ void handleClientsFromIOThreads(struct aeEventLoop *el, int fd, void *ptr, int m
* to the next. */
continue;
}
/* The main thread will free this client. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;

/* We may have pending replies if a thread readQueryFromClient() produced
* replies and did not put the client in pending write queue (it can't).
* And NOTE that io thread may not finish reply. */
* And NOTE that io thread may not finish writing reply to client. */
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
putClientInPendingWriteQueue(c);

/* If the client only can be processed in the main thread, otherwise,
* there will be data race. */
if (getClientType(c) != CLIENT_TYPE_NORMAL ||
/* The client only can be processed in the main thread, otherwise data
* race will happen, since we may touch client's data in main thread. */
if (c->flags & CLIENT_SLAVE ||
c->flags & CLIENT_PUBSUB ||
c->flags & CLIENT_CLOSE_ASAP ||
c->flags & CLIENT_MONITOR ||
c->flags & CLIENT_BLOCKED ||
c->flags & CLIENT_UNBLOCKED ||
c->flags & CLIENT_MULTI ||
c->flags & CLIENT_LUA_DEBUG ||
c->flags & CLIENT_LUA_DEBUG_SYNC ||
Expand All @@ -263,24 +278,35 @@ void handleClientsFromIOThreads(struct aeEventLoop *el, int fd, void *ptr, int m
c->lastcmd->proc == flushdbCommand ||
c->lastcmd->proc == sflushCommand)))
{
/* Main thread owns the client, rebind the event loop,
* and set the read/write handler */
/* Remove the client from io thread event loop. */
pauseIOThread(c->tid);
connSetReadHandler(c->conn, NULL);
connSetWriteHandler(c->conn, NULL);
resumeIOThread(c->tid);
/* Let main thread to run it, rebind event loop and read handler */
connRebindEventLoop(c->conn, server.el);
connSetReadHandler(c->conn, readQueryFromClient);
if (!(c->flags & CLIENT_PROTECTED))
connSetReadHandler(c->conn, readQueryFromClient);
c->read_enabled = 1;
c->write_enabled = 1;
c->tid = IOTHREAD_MAIN_THREAD_ID;
continue;
}

/* If the client is still valid, let io threads handle its writing. */
if (c->flags & CLIENT_PENDING_WRITE) {
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
if (c->flags & CLIENT_PENDING_WRITE || c->flags & (CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT)) {
if (c->flags & CLIENT_PENDING_WRITE) {
c->flags &= ~CLIENT_PENDING_WRITE;
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
}
c->running_tid = c->tid;
listAddNodeTail(pending_clients_for_io_threads[t->id], c);
listAddNodeTail(pending_clients_for_io_threads[c->tid], c);
continue;
}

/* TODO: remaining clients are handled by main thread, what's the client status?
* it should not reach here? */
serverPanic("Unknown client status");
}
listRelease(clients);

Expand Down Expand Up @@ -320,7 +346,7 @@ void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int
listRewind(clients, &li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
serverAssert(!connHasReadHandler(c->conn) && !connHasWriteHandler(c->conn));
serverAssert(!c->read_enabled && !c->write_enabled);
/* Main thread must handle clients with CLIENT_CLOSE_ASAP flag, since
* we only set 'closing' state when clients in io thread are freed ASAP. */
serverAssert(!(c->flags & CLIENT_CLOSE_ASAP));
Expand All @@ -334,16 +360,18 @@ void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int
/* IO threads start to manage this client */
listAddNodeTail(t->clients, c);
c->io_thread_client_list_node = listLast(t->clients);
c->read_enabled = 1;
c->write_enabled = 1;

/* Only bind once, we never remove read handler unless freeing client. */
if (!connHasReadHandler(c->conn)) {
/* We should install read handler first since writeToClient may free client,
* otherwise we will rebind read handler after freeing client. */
connRebindEventLoop(c->conn, t->el);
connSetReadHandler(c->conn, readQueryFromClient);
}

/* TODO: Need to improve, do it only if needed */
connRebindEventLoop(c->conn, t->el);

/* We should install read handler first since writeToClient may free client,
* otherwise we will rebind read handler after freeing client. */
connSetReadHandler(c->conn, readQueryFromClient);

if (c->flags & CLIENT_PENDING_WRITE) {
c->flags &= ~CLIENT_PENDING_WRITE;
if (clientHasPendingReplies(c)) {
writeToClient(c, 0);
if (!isClientClosing(c) && clientHasPendingReplies(c)) {
connSetWriteHandler(c->conn, sendReplyToClient);
Expand Down
20 changes: 15 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ client *createClient(connection *conn) {
listInitNode(&c->clients_pending_write_node, c);
c->mem_usage_bucket = NULL;
c->mem_usage_bucket_node = NULL;
c->read_enabled = 1;
c->write_enabled = 1;
if (conn) linkClient(c);
initClientMultiState(c);
return c;
Expand Down Expand Up @@ -1437,6 +1439,7 @@ void acceptCommonHandler(connection *conn, int flags, char *ip) {
/* Select io thread */
c->tid = c->id % (server.io_threads_num-1) + 1;
c->running_tid = c->tid;
serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID);
/* Let the specific io thread to handle */
putInPendingClienstForIOThreads(c);
}
Expand Down Expand Up @@ -1832,6 +1835,9 @@ void freeClientAsync(client *c) {
* TODO:is it safe? */
connShutdown(c->conn);
} else {
/* Remove read and write handler from io thread event loop. */
connSetReadHandler(c->conn, NULL);
connSetWriteHandler(c->conn, NULL);
putInPendingClienstForMainThread(c);
}
return;
Expand Down Expand Up @@ -2060,6 +2066,7 @@ int _writeToClient(client *c, ssize_t *nwritten) {
* set to 0. So when handler_installed is set to 0 the function must be
* thread safe. */
int writeToClient(client *c, int handler_installed) {
if (c->write_enabled == 0) return C_OK;
/* Update total number of writes on server */
atomicIncr(server.stat_total_writes_processed, 1);
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
Expand Down Expand Up @@ -2234,7 +2241,7 @@ void resetClient(client *c) {
* path, it is not really released, but only marked for later release. */
void protectClient(client *c) {
c->flags |= CLIENT_PROTECTED;
if (c->conn) {
if (c->conn && c->read_enabled && c->write_enabled) {
connSetReadHandler(c->conn,NULL);
connSetWriteHandler(c->conn,NULL);
}
Expand All @@ -2245,7 +2252,8 @@ void unprotectClient(client *c) {
if (c->flags & CLIENT_PROTECTED) {
c->flags &= ~CLIENT_PROTECTED;
if (c->conn) {
connSetReadHandler(c->conn,readQueryFromClient);
if (c->read_enabled && c->write_enabled)
connSetReadHandler(c->conn,readQueryFromClient);
if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
}
}
Expand Down Expand Up @@ -2751,6 +2759,7 @@ void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;
if (c->read_enabled == 0) return;

/* Update total number of reads on server */
atomicIncr(server.stat_total_reads_processed, 1);
Expand Down Expand Up @@ -2948,7 +2957,7 @@ sds catClientInfoString(sds s, client *client) {
!server.crashing)
{
paused = 1;
pauseIOThread(client->tid);
pauseIOThread(client->running_tid);
}

p = flags;
Expand Down Expand Up @@ -3024,9 +3033,10 @@ sds catClientInfoString(sds s, client *client) {
" redir=%I", (client->flags & CLIENT_TRACKING) ? (long long) client->client_tracking_redirection : -1,
" resp=%i", client->resp,
" lib-name=%s", client->lib_name ? (char*)client->lib_name->ptr : "",
" lib-ver=%s", client->lib_ver ? (char*)client->lib_ver->ptr : ""));
" lib-ver=%s", client->lib_ver ? (char*)client->lib_ver->ptr : "",
" io-thread=%i", client->running_tid));

if (paused) resumeIOThread(client->tid);
if (paused) resumeIOThread(client->running_tid);
return ret;
}

Expand Down
4 changes: 3 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define HASHTABLE_MAX_LOAD_FACTOR 1.618 /* Maximum hash table load factor. */

/* Main thread id when enabling io thread. */
#define IOTHREAD_MAIN_THREAD_ID -1
#define IOTHREAD_MAIN_THREAD_ID 0

/* Command flags. Please check the definition of struct redisCommand in this file
* for more information about the meaning of every flag. */
Expand Down Expand Up @@ -1297,6 +1297,8 @@ typedef struct client {
#endif
redisAtomic size_t output_buffer_len;
redisAtomic size_t output_buffer_mem;
int read_enabled; /* Client can read from socket. */
int write_enabled; /* Client can write to socket. */
} client;

typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) {
Expand Down

0 comments on commit 895303d

Please sign in to comment.