From 074cf1c4cde3bae42c01e95c5f0acc534293c16c Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Sat, 16 Nov 2024 22:16:49 +0800 Subject: [PATCH] Avoid touching client in io thread --- src/iothread.c | 8 ++++++ src/networking.c | 67 +++++++++++++++++++++++++++++++++--------------- src/server.h | 5 ++++ 3 files changed, 60 insertions(+), 20 deletions(-) diff --git a/src/iothread.c b/src/iothread.c index e4eeb6265ae..3b94a21be93 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -224,6 +224,14 @@ void handleClientsFromIOThreads(struct aeEventLoop *el, int fd, void *ptr, int m /* Let main thread to run it. */ c->running_tid = IOTHREAD_MAIN_THREAD_ID; + if (c->read_flags) { + handleClientReadError(c); + if (c->flags & CLIENT_CLOSE_ASAP) continue; + c->running_tid = c->tid; + listAddNodeHead(pending_clients_for_io_threads[t->id], c); + continue; + } + /* The client is asked to close. */ if (isClientClosing(c)) { freeClient(c); diff --git a/src/networking.c b/src/networking.c index 273c4fb6ebb..6cfbaa48961 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2686,6 +2686,28 @@ void handleClientReadError(client *c) { addReplyError(c, "Protocol error: unauthenticated multibulk length"); setProtocolError("unauth mbulk count", c); break; + case CLIENT_READ_CONN_DISCONNECTED: + serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); + freeClientAsync(c); + break; + case CLIENT_READ_CONN_CLOSED: + if (server.verbosity <= LL_VERBOSE) { + sds info = catClientInfoString(sdsempty(), c); + serverLog(LL_VERBOSE, "Client closed connection %s", info); + sdsfree(info); + } + freeClientAsync(c); + break; + case CLIENT_READ_REACHED_MAX_QUERYBUF: { + sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); + bytes = sdscatrepr(bytes,c->querybuf,64); + serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); + sdsfree(ci); + sdsfree(bytes); + freeClientAsync(c); + atomicIncr(server.stat_client_qbuf_limit_disconnections, 1); + break; + } default: serverPanic("Unknown client read error"); break; @@ -2792,9 +2814,6 @@ int processInputBuffer(client *c) { c->qb_pos = 0; } - if (c->running_tid == IOTHREAD_MAIN_THREAD_ID && c->read_flags) - handleClientReadError(c); - /* Update client memory usage after processing the query buffer, this is * important in case the query buffer is big and wasn't drained during * the above loop (because of partially sent big commands). */ @@ -2808,6 +2827,7 @@ void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); int nread, big_arg = 0; size_t qblen, readlen; + c->read_flags = 0; /* Update total number of reads on server */ atomicIncr(server.stat_total_reads_processed, 1); @@ -2886,17 +2906,19 @@ void readQueryFromClient(connection *conn) { if (connGetState(conn) == CONN_STATE_CONNECTED) { goto done; } else { - serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); - freeClientAsync(c); + c->read_flags = CLIENT_READ_CONN_DISCONNECTED; + // serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); + // freeClientAsync(c); goto done; } } else if (nread == 0) { - if (server.verbosity <= LL_VERBOSE) { - sds info = catClientInfoString(sdsempty(), c); - serverLog(LL_VERBOSE, "Client closed connection %s", info); - sdsfree(info); - } - freeClientAsync(c); + c->read_flags = CLIENT_READ_CONN_CLOSED; + // if (server.verbosity <= LL_VERBOSE) { + // sds info = catClientInfoString(sdsempty(), c); + // serverLog(LL_VERBOSE, "Client closed connection %s", info); + // sdsfree(info); + // } + // freeClientAsync(c); goto done; } @@ -2918,15 +2940,17 @@ void readQueryFromClient(connection *conn) { * * For unauthenticated clients, the query buffer cannot exceed 1MB at most. */ (c->mstate.argv_len_sums + sdslen(c->querybuf) > server.client_max_querybuf_len || - (c->mstate.argv_len_sums + sdslen(c->querybuf) > 1024*1024 && authRequired(c)))) { - sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); - - bytes = sdscatrepr(bytes,c->querybuf,64); - serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); - sdsfree(ci); - sdsfree(bytes); - freeClientAsync(c); - atomicIncr(server.stat_client_qbuf_limit_disconnections, 1); + (c->mstate.argv_len_sums + sdslen(c->querybuf) > 1024*1024 && authRequired(c)))) + { + c->read_flags = CLIENT_READ_REACHED_MAX_QUERYBUF; + // sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); + + // bytes = sdscatrepr(bytes,c->querybuf,64); + // serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); + // sdsfree(ci); + // sdsfree(bytes); + // freeClientAsync(c); + // atomicIncr(server.stat_client_qbuf_limit_disconnections, 1); goto done; } @@ -2936,6 +2960,9 @@ void readQueryFromClient(connection *conn) { c = NULL; done: + if (c && c->running_tid == IOTHREAD_MAIN_THREAD_ID && c->read_flags) + handleClientReadError(c); + if (c && (c->flags & CLIENT_REUSABLE_QUERYBUFFER)) { serverAssert(c->qb_pos == 0); /* Ensure the client's query buffer is trimmed in processInputBuffer */ resetReusableQueryBuf(c); diff --git a/src/server.h b/src/server.h index 7131d23962c..c72936fc827 100644 --- a/src/server.h +++ b/src/server.h @@ -405,6 +405,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_READ_UNAUTH_BUCK_LENGTH 8 #define CLIENT_READ_INVALID_MULTIBUCK_LENGTH 9 #define CLIENT_READ_UNAUTH_MBUCK_COUNT 10 +#define CLIENT_READ_CONN_DISCONNECTED 11 +#define CLIENT_READ_CONN_CLOSED 12 +#define CLIENT_READ_REACHED_MAX_QUERYBUF 13 /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -2759,6 +2762,8 @@ void sendPendingClientsToIOThreads(void); void putInPendingClienstForMainThread(client *c); void putInPendingClienstForIOThreads(client *c); void updateIOThreadClientOutputBufferMemoryUsage(client *c); +size_t getIOThreadClientMemoryUsage(client *c, size_t *output_buffer_mem_usage); +void handleClientReadError(client *c); /* logreqres.c - logging of requests and responses */ void reqresReset(client *c, int free_buf);