diff --git a/src/networking.c b/src/networking.c index 7054ffc126..340d30a3dc 100644 --- a/src/networking.c +++ b/src/networking.c @@ -43,6 +43,7 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ +__thread sds thread_shared_qb = NULL; /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -147,7 +148,7 @@ client *createClient(connection *conn) { c->ref_repl_buf_node = NULL; c->ref_block_pos = 0; c->qb_pos = 0; - c->querybuf = sdsempty(); + c->querybuf = NULL; c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; @@ -1608,7 +1609,11 @@ void freeClient(client *c) { } /* Free the query buffer */ - sdsfree(c->querybuf); + if (c->querybuf && c->querybuf == thread_shared_qb) { + sdsclear(c->querybuf); + } else { + sdsfree(c->querybuf); + } c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. */ @@ -2093,6 +2098,48 @@ void resetClient(client *c) { } } +/* Initializes the shared query buffer to a new sds with the default capacity */ +void initSharedQueryBuf(void) { + thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(thread_shared_qb); +} + +/* Resets the shared query buffer used by the given client. + * If any data remained in the buffer, the client will take ownership of the buffer + * and a new empty buffer will be allocated for the shared buffer. */ +void resetSharedQueryBuf(client *c) { + serverAssert(c->querybuf == thread_shared_qb); + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + + if (remaining > 0) { + /* Let the client take ownership of the shared buffer. */ + initSharedQueryBuf(); + return; + } + + c->querybuf = NULL; + sdsclear(thread_shared_qb); + c->qb_pos = 0; +} + +/* Trims the client query buffer to the current position. 
*/ +void trimClientQueryBuffer(client *c) { + if (c->querybuf == thread_shared_qb) { + resetSharedQueryBuf(c); + } + + if (c->querybuf == NULL) { + return; + } + + serverAssert(c->qb_pos <= sdslen(c->querybuf)); + + if (c->qb_pos > 0) { + sdsrange(c->querybuf, c->qb_pos, -1); + c->qb_pos = 0; + } +} + /* This function is used when we want to re-enter the event loop but there * is the risk that the client we are dealing with will be freed in some * way. This happens for instance in: @@ -2348,6 +2395,10 @@ int processMultibulkBuffer(client *c) { * ll+2, trimming querybuf is just a waste of time, because * at this time the querybuf contains not only our bulk. */ if (sdslen(c->querybuf) - c->qb_pos <= (size_t)ll + 2) { + if (c->querybuf == thread_shared_qb) { + /* Let the client take the ownership of the shared buffer. */ + initSharedQueryBuf(); + } sdsrange(c->querybuf, c->qb_pos, -1); c->qb_pos = 0; /* Hint the sds library about the amount of bytes this string is @@ -2508,7 +2559,7 @@ int processPendingCommandAndInputBuffer(client *c) { * return C_ERR in case the client was freed during the processing */ int processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ - while (c->qb_pos < sdslen(c->querybuf)) { + while (c->querybuf && c->qb_pos < sdslen(c->querybuf)) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; @@ -2559,6 +2610,13 @@ int processInputBuffer(client *c) { break; } + if (c->querybuf == thread_shared_qb) { + /* Before processing the command, reset the shared query buffer to its default state. + * This avoids unintentionally modifying the shared qb during processCommand as we may use + * the shared qb for other clients during processEventsWhileBlocked */ + resetSharedQueryBuf(c); + } + /* We are finally ready to execute the command. 
*/ if (processCommandAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid exiting this @@ -2587,10 +2645,8 @@ int processInputBuffer(client *c) { c->qb_pos -= c->repl_applied; c->repl_applied = 0; } - } else if (c->qb_pos) { - /* Trim to pos */ - sdsrange(c->querybuf, c->qb_pos, -1); - c->qb_pos = 0; + } else { + trimClientQueryBuffer(c); } /* Update client memory usage after processing the query buffer, this is @@ -2614,14 +2670,16 @@ void readQueryFromClient(connection *conn) { atomic_fetch_add_explicit(&server.stat_total_reads_processed,1,memory_order_relaxed); readlen = PROTO_IOBUF_LEN; + qblen = c->querybuf ? sdslen(c->querybuf) : 0; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * robj representing the argument. */ + if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { - ssize_t remaining = (size_t)(c->bulklen + 2) - (sdslen(c->querybuf) - c->qb_pos); + ssize_t remaining = (size_t)(c->bulklen + 2) - (qblen - c->qb_pos); big_arg = 1; /* Note that the 'remaining' variable may be zero in some edge case, @@ -2633,7 +2691,12 @@ void readQueryFromClient(connection *conn) { if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; } - qblen = sdslen(c->querybuf); + if (c->querybuf == NULL) { + serverAssert(sdslen(thread_shared_qb) == 0); + c->querybuf = big_arg ? sdsempty() : thread_shared_qb; + qblen = sdslen(c->querybuf); + } + if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy. 
(big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg @@ -2654,7 +2717,7 @@ void readQueryFromClient(connection *conn) { nread = connRead(c->conn, c->querybuf + qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { - return; + goto done; } else { serverLog(LL_VERBOSE, "Reading from client: %s", connGetLastError(c->conn)); freeClientAsync(c); @@ -2707,6 +2770,10 @@ void readQueryFromClient(connection *conn) { if (processInputBuffer(c) == C_ERR) c = NULL; done: + if (c && c->querybuf == thread_shared_qb) { + sdsclear(thread_shared_qb); + c->querybuf = NULL; + } beforeNextClient(c); } @@ -2824,8 +2891,8 @@ sds catClientInfoString(sds s, client *client) { " ssub=%i", (int) dictSize(client->pubsubshard_channels), " multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, " watch=%i", (int) listLength(client->watched_keys), - " qbuf=%U", (unsigned long long) sdslen(client->querybuf), - " qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf), + " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, + " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, " argv-mem=%U", (unsigned long long) client->argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, @@ -3780,8 +3847,9 @@ size_t getClientOutputBufferMemoryUsage(client *c) { * the client output buffer memory usage portion of the total. */ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { size_t mem = getClientOutputBufferMemoryUsage(c); + if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem; - mem += sdsZmallocSize(c->querybuf); + mem += c->querybuf ? 
sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory @@ -4168,6 +4236,7 @@ void *IOThreadMain(void *myid) { valkey_set_thread_title(thdname); serverSetCpuAffinity(server.server_cpulist); makeThreadKillable(); + initSharedQueryBuf(); while (1) { /* Wait for start */ diff --git a/src/replication.c b/src/replication.c index 069d60a678..909ac79561 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1722,6 +1722,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) { * connection. */ server.master->flags |= CLIENT_MASTER; + /* Allocate a private query buffer for the master client instead of using the shared query buffer. + * This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */ + server.master->querybuf = sdsempty(); server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; server.master->read_reploff = server.master->reploff; diff --git a/src/server.c b/src/server.c index edf215eac2..1f6ac2a1bd 100644 --- a/src/server.c +++ b/src/server.c @@ -714,6 +714,8 @@ long long getInstantaneousMetric(int metric) { * * The function always returns 0 as it never terminates the client. */ int clientsCronResizeQueryBuffer(client *c) { + /* If the client query buffer is NULL, it is using the shared query buffer and there is nothing to do. */ + if (c->querybuf == NULL) return 0; size_t querybuf_size = sdsalloc(c->querybuf); time_t idletime = server.unixtime - c->lastinteraction; @@ -723,7 +725,18 @@ int clientsCronResizeQueryBuffer(client *c) { /* There are two conditions to resize the query buffer: */ if (idletime > 2) { /* 1) Query is idle for a long time. 
*/ - c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + if (!(c->flags & CLIENT_MASTER) && !remaining) { + /* If the client is not a master and no data is pending, + * The client can safely use the shared query buffer in the next read - free the client's querybuf. */ + sdsfree(c->querybuf); + /* By setting the querybuf to NULL, the client will use the shared query buffer in the next read. + * We don't move the client to the shared query buffer immediately, because if we allocated a private + * query buffer for the client, it's likely that the client will use it again soon. */ + c->querybuf = NULL; + } else { + c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + } } else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size / 2 > c->querybuf_peak) { /* 2) Query buffer is too big for latest peak and is larger than * resize threshold. Trim excess space but only up to a limit, @@ -739,7 +752,7 @@ int clientsCronResizeQueryBuffer(client *c) { /* Reset the peak again to capture the peak memory usage in the next * cycle. */ - c->querybuf_peak = sdslen(c->querybuf); + c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0; /* We reset to either the current used, or currently processed bulk size, * which ever is bigger. */ if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2; @@ -807,7 +820,9 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; int clientsCronTrackExpansiveClients(client *c, int time_idx) { - size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum + (c->argv ? zmalloc_size(c->argv) : 0); + size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; + size_t argv_size = c->argv ? 
zmalloc_size(c->argv) : 0; + size_t in_usage = qb_size + c->argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -2713,6 +2728,7 @@ void initServer(void) { } slowlogInit(); latencyMonitorInit(); + initSharedQueryBuf(); /* Initialize ACL default password if it exists */ ACLUpdateDefaultUserPassword(server.requirepass); @@ -6311,7 +6327,7 @@ void dismissMemory(void *ptr, size_t size_hint) { void dismissClientMemory(client *c) { /* Dismiss client query buffer and static reply buffer. */ dismissMemory(c->buf, c->buf_usable_size); - dismissSds(c->querybuf); + if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ if (c->argc && c->argv_len_sum / c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { diff --git a/src/server.h b/src/server.h index 7011be3033..82b427c1f1 100644 --- a/src/server.h +++ b/src/server.h @@ -2718,6 +2718,7 @@ void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); void initThreadedIO(void); +void initSharedQueryBuf(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 92bacaa071..9566d59784 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 
oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*} proc get_field_in_client_info {info field} { set info [string trim $info] diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index 0394b72c00..66942a5bd1 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -24,8 +24,24 @@ start_server {tags {"querybuf slow"}} { # The test will run at least 2s to check if client query # buffer will be resized when client idle 2s. test "query buffer resized correctly" { - set rd [valkey_client] + + set rd [valkey_deferring_client] + $rd client setname test_client + $rd read + + # Make sure query buff has size of 0 bytes at start as the client uses the shared qb. + assert {[client_query_buffer test_client] == 0} + + # Send partial command to client to make sure it doesn't use the shared qb. 
+ $rd write "*3\r\n\$3\r\nset\r\n\$2\r\na" + $rd flush + after 100 + # Send the rest of the command + $rd write "a\r\n\$1\r\nb\r\n" + $rd flush + assert_equal {OK} [$rd read] + set orig_test_client_qbuf [client_query_buffer test_client] # Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k # but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k @@ -78,6 +94,11 @@ start_server {tags {"querybuf slow"}} { $rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n" $rd flush + after 200 + # Send the start of the arg and make sure the client is not using the shared qb for it, but rather a private buf of > 1000000 size. + $rd write "a" + $rd flush + after 20 if {[client_query_buffer test_client] < 1000000} { fail "query buffer should not be resized when client idle time smaller than 2s"