Skip to content

Commit

Permalink
Avoid touching client in io thread
Browse files Browse the repository at this point in the history
  • Loading branch information
sundb committed Nov 16, 2024
1 parent f3ac6f9 commit 074cf1c
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 20 deletions.
8 changes: 8 additions & 0 deletions src/iothread.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ void handleClientsFromIOThreads(struct aeEventLoop *el, int fd, void *ptr, int m
/* Let main thread to run it. */
c->running_tid = IOTHREAD_MAIN_THREAD_ID;

if (c->read_flags) {
handleClientReadError(c);
if (c->flags & CLIENT_CLOSE_ASAP) continue;
c->running_tid = c->tid;
listAddNodeHead(pending_clients_for_io_threads[t->id], c);
continue;
}

/* The client is asked to close. */
if (isClientClosing(c)) {
freeClient(c);
Expand Down
67 changes: 47 additions & 20 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2686,6 +2686,28 @@ void handleClientReadError(client *c) {
addReplyError(c, "Protocol error: unauthenticated multibulk length");
setProtocolError("unauth mbulk count", c);
break;
case CLIENT_READ_CONN_DISCONNECTED:
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
break;
case CLIENT_READ_CONN_CLOSED:
if (server.verbosity <= LL_VERBOSE) {
sds info = catClientInfoString(sdsempty(), c);
serverLog(LL_VERBOSE, "Client closed connection %s", info);
sdsfree(info);
}
freeClientAsync(c);
break;
case CLIENT_READ_REACHED_MAX_QUERYBUF: {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
atomicIncr(server.stat_client_qbuf_limit_disconnections, 1);
break;
}
default:
serverPanic("Unknown client read error");
break;
Expand Down Expand Up @@ -2792,9 +2814,6 @@ int processInputBuffer(client *c) {
c->qb_pos = 0;
}

if (c->running_tid == IOTHREAD_MAIN_THREAD_ID && c->read_flags)
handleClientReadError(c);

/* Update client memory usage after processing the query buffer, this is
* important in case the query buffer is big and wasn't drained during
* the above loop (because of partially sent big commands). */
Expand All @@ -2808,6 +2827,7 @@ void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;
c->read_flags = 0;

/* Update total number of reads on server */
atomicIncr(server.stat_total_reads_processed, 1);
Expand Down Expand Up @@ -2886,17 +2906,19 @@ void readQueryFromClient(connection *conn) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
goto done;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
c->read_flags = CLIENT_READ_CONN_DISCONNECTED;
// serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
// freeClientAsync(c);
goto done;
}
} else if (nread == 0) {
if (server.verbosity <= LL_VERBOSE) {
sds info = catClientInfoString(sdsempty(), c);
serverLog(LL_VERBOSE, "Client closed connection %s", info);
sdsfree(info);
}
freeClientAsync(c);
c->read_flags = CLIENT_READ_CONN_CLOSED;
// if (server.verbosity <= LL_VERBOSE) {
// sds info = catClientInfoString(sdsempty(), c);
// serverLog(LL_VERBOSE, "Client closed connection %s", info);
// sdsfree(info);
// }
// freeClientAsync(c);
goto done;
}

Expand All @@ -2918,15 +2940,17 @@ void readQueryFromClient(connection *conn) {
*
* For unauthenticated clients, the query buffer cannot exceed 1MB at most. */
(c->mstate.argv_len_sums + sdslen(c->querybuf) > server.client_max_querybuf_len ||
(c->mstate.argv_len_sums + sdslen(c->querybuf) > 1024*1024 && authRequired(c)))) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
atomicIncr(server.stat_client_qbuf_limit_disconnections, 1);
(c->mstate.argv_len_sums + sdslen(c->querybuf) > 1024*1024 && authRequired(c))))
{
c->read_flags = CLIENT_READ_REACHED_MAX_QUERYBUF;
// sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

// bytes = sdscatrepr(bytes,c->querybuf,64);
// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
// sdsfree(ci);
// sdsfree(bytes);
// freeClientAsync(c);
// atomicIncr(server.stat_client_qbuf_limit_disconnections, 1);
goto done;
}

Expand All @@ -2936,6 +2960,9 @@ void readQueryFromClient(connection *conn) {
c = NULL;

done:
if (c && c->running_tid == IOTHREAD_MAIN_THREAD_ID && c->read_flags)
handleClientReadError(c);

if (c && (c->flags & CLIENT_REUSABLE_QUERYBUFFER)) {
serverAssert(c->qb_pos == 0); /* Ensure the client's query buffer is trimmed in processInputBuffer */
resetReusableQueryBuf(c);
Expand Down
5 changes: 5 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_READ_UNAUTH_BUCK_LENGTH 8
#define CLIENT_READ_INVALID_MULTIBUCK_LENGTH 9
#define CLIENT_READ_UNAUTH_MBUCK_COUNT 10
#define CLIENT_READ_CONN_DISCONNECTED 11
#define CLIENT_READ_CONN_CLOSED 12
#define CLIENT_READ_REACHED_MAX_QUERYBUF 13

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down Expand Up @@ -2759,6 +2762,8 @@ void sendPendingClientsToIOThreads(void);
void putInPendingClienstForMainThread(client *c);
void putInPendingClienstForIOThreads(client *c);
void updateIOThreadClientOutputBufferMemoryUsage(client *c);
size_t getIOThreadClientMemoryUsage(client *c, size_t *output_buffer_mem_usage);
void handleClientReadError(client *c);

/* logreqres.c - logging of requests and responses */
void reqresReset(client *c, int free_buf);
Expand Down

0 comments on commit 074cf1c

Please sign in to comment.