Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce shared query buffer for client reads #258

Merged
merged 11 commits into from
May 28, 2024
93 changes: 80 additions & 13 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
int postponeClientRead(client *c);
char *getClientSockname(client *c);
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
__thread sds thread_shared_qb = NULL;

/* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute
Expand Down Expand Up @@ -152,7 +153,7 @@ client *createClient(connection *conn) {
c->ref_repl_buf_node = NULL;
c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->querybuf = NULL;
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
Expand Down Expand Up @@ -1616,7 +1617,11 @@ void freeClient(client *c) {
}

/* Free the query buffer */
sdsfree(c->querybuf);
if (c->querybuf == thread_shared_qb) {
sdsclear(c->querybuf);
} else {
sdsfree(c->querybuf);
}
c->querybuf = NULL;

/* Deallocate structures used to block on blocking ops. */
Expand Down Expand Up @@ -2116,6 +2121,48 @@ void resetClient(client *c) {
}
}

/* Initializes the shared query buffer to a new sds with the default capacity */
void initSharedQueryBuf(void) {
thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(thread_shared_qb);
}

/* Resets the shared query buffer used by the given client.
* If any data remained in the buffer, the client will take ownership of the buffer
madolson marked this conversation as resolved.
Show resolved Hide resolved
* and a new empty buffer will be allocated for the shared buffer. */
void resetSharedQueryBuf(client *c) {
if (c->querybuf != thread_shared_qb) return;
uriyage marked this conversation as resolved.
Show resolved Hide resolved
size_t remaining = sdslen(c->querybuf) - c->qb_pos;

if (remaining > 0) {
/* Let the client take ownership of the shared buffer. */
initSharedQueryBuf();
return;
}

c->querybuf = NULL;
sdsclear(thread_shared_qb);
c->qb_pos = 0;
}

/* Trims the client query buffer to the current position. */
void trimClientQueryBuffer(client *c) {
if (c->querybuf == thread_shared_qb) {
resetSharedQueryBuf(c);
}

if (c->querybuf == NULL) {
return;
}

serverAssert(c->qb_pos <= sdslen(c->querybuf));

if (c->qb_pos > 0) {
sdsrange(c->querybuf, c->qb_pos, -1);
c->qb_pos = 0;
}
}

/* This function is used when we want to re-enter the event loop but there
* is the risk that the client we are dealing with will be freed in some
* way. This happens for instance in:
Expand Down Expand Up @@ -2378,6 +2425,10 @@ int processMultibulkBuffer(client *c) {
* ll+2, trimming querybuf is just a waste of time, because
* at this time the querybuf contains not only our bulk. */
if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
if (c->querybuf == thread_shared_qb) {
/* Let the client take the ownership of the shared buffer. */
initSharedQueryBuf();
}
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
/* Hint the sds library about the amount of bytes this string is
Expand Down Expand Up @@ -2542,7 +2593,7 @@ int processPendingCommandAndInputBuffer(client *c) {
* return C_ERR in case the client was freed during the processing */
int processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
while(c->querybuf && c->qb_pos < sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;

Expand Down Expand Up @@ -2593,6 +2644,13 @@ int processInputBuffer(client *c) {
break;
}

if (c->querybuf == thread_shared_qb) {
/* Before processing the command, reset the shared query buffer to its default state.
* This avoids unintentionally modifying the shared qb during processCommand as we may use
madolson marked this conversation as resolved.
Show resolved Hide resolved
* the shared qb for other clients during processEventsWhileBlocked */
resetSharedQueryBuf(c);
}

/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
Expand Down Expand Up @@ -2621,10 +2679,8 @@ int processInputBuffer(client *c) {
c->qb_pos -= c->repl_applied;
c->repl_applied = 0;
}
} else if (c->qb_pos) {
/* Trim to pos */
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
} else {
trimClientQueryBuffer(c);
}

/* Update client memory usage after processing the query buffer, this is
Expand All @@ -2649,6 +2705,7 @@ void readQueryFromClient(connection *conn) {
atomicIncr(server.stat_total_reads_processed, 1);

readlen = PROTO_IOBUF_LEN;
qblen = c->querybuf ? sdslen(c->querybuf) : 0;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
Expand All @@ -2658,7 +2715,7 @@ void readQueryFromClient(connection *conn) {
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
ssize_t remaining = (size_t)(c->bulklen+2)-(qblen-c->qb_pos);
big_arg = 1;

/* Note that the 'remaining' variable may be zero in some edge case,
Expand All @@ -2671,7 +2728,12 @@ void readQueryFromClient(connection *conn) {
readlen = PROTO_IOBUF_LEN;
}

qblen = sdslen(c->querybuf);
if (c->querybuf == NULL) {
serverAssert(sdslen(thread_shared_qb) == 0);
c->querybuf = big_arg ? sdsempty() : thread_shared_qb;
qblen = sdslen(c->querybuf);
}

if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
(big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
/* When reading a BIG_ARG we won't be reading more than that one arg
Expand All @@ -2692,7 +2754,7 @@ void readQueryFromClient(connection *conn) {
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
goto done;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
Expand Down Expand Up @@ -2744,6 +2806,10 @@ void readQueryFromClient(connection *conn) {
c = NULL;

done:
if (c && c->querybuf == thread_shared_qb) {
sdsclear(thread_shared_qb);
c->querybuf = NULL;
}
beforeNextClient(c);
}

Expand Down Expand Up @@ -2859,8 +2925,8 @@ sds catClientInfoString(sds s, client *client) {
" ssub=%i", (int) dictSize(client->pubsubshard_channels),
" multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
" watch=%i", (int) listLength(client->watched_keys),
" qbuf=%U", (unsigned long long) sdslen(client->querybuf),
" qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf),
" qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0,
" qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0,
" argv-mem=%U", (unsigned long long) client->argv_len_sum,
" multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums,
" rbs=%U", (unsigned long long) client->buf_usable_size,
Expand Down Expand Up @@ -3831,7 +3897,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
size_t mem = getClientOutputBufferMemoryUsage(c);
if (output_buffer_mem_usage != NULL)
*output_buffer_mem_usage = mem;
mem += sdsZmallocSize(c->querybuf);
mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
mem += zmalloc_size(c);
mem += c->buf_usable_size;
/* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
Expand Down Expand Up @@ -4228,6 +4294,7 @@ void *IOThreadMain(void *myid) {
redis_set_thread_title(thdname);
serverSetCpuAffinity(server.server_cpulist);
makeThreadKillable();
initSharedQueryBuf();

while(1) {
/* Wait for start */
Expand Down
3 changes: 3 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1781,6 +1781,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) {
* connection. */
server.master->flags |= CLIENT_MASTER;

/* Allocate a private query buffer for the master client instead of using the shared query buffer.
* This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */
server.master->querybuf = sdsempty();
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
Expand Down
22 changes: 18 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,8 @@ long long getInstantaneousMetric(int metric) {
*
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
/* If the client query buffer is NULL, it is using the shared query buffer and there is nothing to do. */
if (c->querybuf == NULL) return 0;
size_t querybuf_size = sdsalloc(c->querybuf);
uriyage marked this conversation as resolved.
Show resolved Hide resolved
time_t idletime = server.unixtime - c->lastinteraction;

Expand All @@ -744,7 +746,18 @@ int clientsCronResizeQueryBuffer(client *c) {
/* There are two conditions to resize the query buffer: */
if (idletime > 2) {
/* 1) Query is idle for a long time. */
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
size_t remaining = sdslen(c->querybuf) - c->qb_pos;
if (!(c->flags & CLIENT_MASTER) && !remaining) {
madolson marked this conversation as resolved.
Show resolved Hide resolved
/* If the client is not a master and no data is pending,
madolson marked this conversation as resolved.
Show resolved Hide resolved
* The client can safely use the shared query buffer in the next read - free the client's querybuf. */
sdsfree(c->querybuf);
/* By setting the querybuf to NULL, the client will use the shared query buffer in the next read.
* We don't move the client to the shared query buffer immediately, because if we allocated a private
* query buffer for the client, it's likely that the client will use it again soon. */
c->querybuf = NULL;
} else {
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
}
} else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size/2 > c->querybuf_peak) {
/* 2) Query buffer is too big for latest peak and is larger than
* resize threshold. Trim excess space but only up to a limit,
Expand All @@ -760,7 +773,7 @@ int clientsCronResizeQueryBuffer(client *c) {

/* Reset the peak again to capture the peak memory usage in the next
* cycle. */
c->querybuf_peak = sdslen(c->querybuf);
c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0;
/* We reset to either the current used, or currently processed bulk size,
* which ever is bigger. */
if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2;
Expand Down Expand Up @@ -835,7 +848,7 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};

int clientsCronTrackExpansiveClients(client *c, int time_idx) {
size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum +
size_t in_usage = (c->querybuf ? sdsZmallocSize(c->querybuf): 0) + c->argv_len_sum +
(c->argv ? zmalloc_size(c->argv) : 0);
size_t out_usage = getClientOutputBufferMemoryUsage(c);

Expand Down Expand Up @@ -2782,6 +2795,7 @@ void initServer(void) {
}
slowlogInit();
latencyMonitorInit();
initSharedQueryBuf();

/* Initialize ACL default password if it exists */
ACLUpdateDefaultUserPassword(server.requirepass);
Expand Down Expand Up @@ -6553,7 +6567,7 @@ void dismissMemory(void* ptr, size_t size_hint) {
void dismissClientMemory(client *c) {
/* Dismiss client query buffer and static reply buffer. */
dismissMemory(c->buf, c->buf_usable_size);
dismissSds(c->querybuf);
if (c->querybuf) dismissSds(c->querybuf);
madolson marked this conversation as resolved.
Show resolved Hide resolved
/* Dismiss argv array only if we estimate it contains a big buffer. */
if (c->argc && c->argv_len_sum/c->argc >= server.page_size) {
for (int i = 0; i < c->argc; i++) {
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2671,6 +2671,7 @@ void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
void initThreadedIO(void);
void initSharedQueryBuf(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void putClientInPendingWriteQueue(client *c);
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} {

test {CLIENT LIST} {
r client list
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*}
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*}

test {CLIENT LIST with IDs} {
set myid [r client id]
Expand All @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} {

test {CLIENT INFO} {
r client info
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*}
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*}

test {CLIENT KILL with illegal arguments} {
assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill}
Expand Down
21 changes: 20 additions & 1 deletion tests/unit/querybuf.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,22 @@ start_server {tags {"querybuf slow"}} {
# The test will run at least 2s to check if client query
# buffer will be resized when client idle 2s.
test "query buffer resized correctly" {
set rd [redis_client]
set rd [redis_deferring_client]
$rd client setname test_client
$rd read

# Make sure query buff has size of 0 bytes at start as the client uses the shared qb.
assert {[client_query_buffer test_client] == 0}

# Send partial command to client to make sure it doesn't use the shared qb.
$rd write "*3\r\n\$3\r\nset\r\n\$2\r\na"
$rd flush
after 100
# send the rest of the command
$rd write "a\r\n\$1\r\nb\r\n"
$rd flush
madolson marked this conversation as resolved.
Show resolved Hide resolved
assert_equal {OK} [$rd read]

set orig_test_client_qbuf [client_query_buffer test_client]
# Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k
# but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k
Expand Down Expand Up @@ -78,6 +92,11 @@ start_server {tags {"querybuf slow"}} {
$rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n"
$rd flush

after 200
# Send the start of the arg and make sure the client is not using shared qb for it rather a private buf of > 1000000 size.
$rd write "a"
$rd flush

after 20
if {[client_query_buffer test_client] < 1000000} {
fail "query buffer should not be resized when client idle time smaller than 2s"
Expand Down