Skip to content

Commit

Permalink
Move prepareClientToWrite out of loop for lrange command to reduce th…
Browse files Browse the repository at this point in the history
…e redundant call. (#860)

## Description 
When I explore the cycles distributions for `lrange` test (
`valkey-benchmark -p 9001 -t lrange -d 100 -r 1000000 -n 1000000 -c 50
--threads 4`). I found the `prepareClientToWrite` and
`clientHasPendingReplies` could be reduced to single call outside
instead of called in a loop, ideally we can gain 3% performance. The
corresponding `LRANG_100`, `LRANG_300`, `LRANGE_500`, `LRANGE_600` have
~2% - 3% performance boost, the benchmark test prove it helps.

This patch try to move the `prepareClientToWrite` and its child
`clientHasPendingReplies` out of the loop to reduce the function
overhead.

---------

Signed-off-by: Lipeng Zhu <lipeng.zhu@intel.com>
  • Loading branch information
lipzhu authored and madolson committed Sep 3, 2024
1 parent daf09fb commit 5d06dc8
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 8 deletions.
29 changes: 29 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,16 @@ void setDeferredPushLen(client *c, void *node, long length) {
setDeferredAggregateLen(c, node, length, '>');
}

/* Prepare a client for future writes. This is used so that we can
* skip a large number of calls to prepareClientToWrite when
* a command produces a lot of discrete elements in its output. */
writePreparedClient *prepareClientForFutureWrites(client *c) {
if (prepareClientToWrite(c) == C_OK) {
return (writePreparedClient *)c;
}
return NULL;
}

/* Add a double as a bulk reply */
void addReplyDouble(client *c, double d) {
if (c->resp == 3) {
Expand Down Expand Up @@ -1026,6 +1036,11 @@ void addReplyArrayLen(client *c, long length) {
addReplyAggregateLen(c, length, '*');
}

void addWritePreparedReplyArrayLen(writePreparedClient *c, long length) {
serverAssert(length >= 0);
_addReplyLongLongWithPrefix(c, length, '*');
}

void addReplyMapLen(client *c, long length) {
int prefix = c->resp == 2 ? '*' : '%';
if (c->resp == 2) length *= 2;
Expand Down Expand Up @@ -1098,6 +1113,12 @@ void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
_addReplyToBufferOrList(c, "\r\n", 2);
}

void addWritePreparedReplyBulkCBuffer(writePreparedClient *c, const void *p, size_t len) {
_addReplyLongLongWithPrefix(c, len, '$');
_addReplyToBufferOrList(c, p, len);
_addReplyToBufferOrList(c, "\r\n", 2);
}

/* Add sds to reply (takes ownership of sds and frees it) */
void addReplyBulkSds(client *c, sds s) {
if (prepareClientToWrite(c) != C_OK) {
Expand Down Expand Up @@ -1136,6 +1157,14 @@ void addReplyBulkLongLong(client *c, long long ll) {
addReplyBulkCBuffer(c, buf, len);
}

void addWritePreparedReplyBulkLongLong(writePreparedClient *c, long long ll) {
char buf[64];
int len;

len = ll2string(buf, 64, ll);
addWritePreparedReplyBulkCBuffer(c, buf, len);
}

/* Reply with a verbatim type having the specified extension.
*
* The 'ext' is the "extension" of the file, actually just a three
Expand Down
10 changes: 10 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,12 @@ typedef struct client {
net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */
} client;

/* When a command generates a lot of discrete elements to the client output buffer, it is much faster to
* skip certain types of initialization. This type is used to indicate a client that has been initialized
* and can be used with addWritePreparedReply* functions. A client can be cast into this type with
* prepareClientForFutureWrites(client *c). */
typedef client writePreparedClient;

/* ACL information */
typedef struct aclInfo {
long long user_auth_failures; /* Auth failure counts on user level */
Expand Down Expand Up @@ -2779,6 +2785,7 @@ int processInputBuffer(client *c);
void acceptCommonHandler(connection *conn, struct ClientFlags flags, char *ip);
void readQueryFromClient(connection *conn);
int prepareClientToWrite(client *c);
writePreparedClient *prepareClientForFutureWrites(client *c);
void addReplyNull(client *c);
void addReplyNullArray(client *c);
void addReplyBool(client *c, int b);
Expand All @@ -2788,7 +2795,9 @@ void AddReplyFromClient(client *c, client *src);
void addReplyBulk(client *c, robj *obj);
void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
void addWritePreparedReplyBulkCBuffer(writePreparedClient *c, const void *p, size_t len);
void addReplyBulkLongLong(client *c, long long ll);
void addWritePreparedReplyBulkLongLong(writePreparedClient *c, long long ll);
void addReply(client *c, robj *obj);
void addReplyStatusLength(client *c, const char *s, size_t len);
void addReplySds(client *c, sds s);
Expand All @@ -2810,6 +2819,7 @@ void addReplyBigNum(client *c, const char *num, size_t len);
void addReplyHumanLongDouble(client *c, long double d);
void addReplyLongLong(client *c, long long ll);
void addReplyArrayLen(client *c, long length);
void addWritePreparedReplyArrayLen(writePreparedClient *c, long length);
void addReplyMapLen(client *c, long length);
void addReplySetLen(client *c, long length);
void addReplyAttributeLen(client *c, long length);
Expand Down
19 changes: 11 additions & 8 deletions src/t_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -650,18 +650,20 @@ void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long
* Note that the purpose is to make the methods small so that the
* code in the loop can be inlined better to improve performance. */
void addListQuicklistRangeReply(client *c, robj *o, int from, int rangelen, int reverse) {
writePreparedClient *wpc = prepareClientForFutureWrites(c);
if (!wpc) return;
/* Return the result in form of a multi-bulk reply */
addReplyArrayLen(c, rangelen);
addWritePreparedReplyArrayLen(wpc, rangelen);

int direction = reverse ? AL_START_TAIL : AL_START_HEAD;
quicklistIter *iter = quicklistGetIteratorAtIdx(o->ptr, direction, from);
while (rangelen--) {
quicklistEntry qe;
serverAssert(quicklistNext(iter, &qe)); /* fail on corrupt data */
if (qe.value) {
addReplyBulkCBuffer(c, qe.value, qe.sz);
addWritePreparedReplyBulkCBuffer(wpc, qe.value, qe.sz);
} else {
addReplyBulkLongLong(c, qe.longval);
addWritePreparedReplyBulkLongLong(wpc, qe.longval);
}
}
quicklistReleaseIterator(iter);
Expand All @@ -671,21 +673,22 @@ void addListQuicklistRangeReply(client *c, robj *o, int from, int rangelen, int
* Note that the purpose is to make the methods small so that the
* code in the loop can be inlined better to improve performance. */
void addListListpackRangeReply(client *c, robj *o, int from, int rangelen, int reverse) {
writePreparedClient *wpc = prepareClientForFutureWrites(c);
if (!wpc) return;
/* Return the result in form of a multi-bulk reply */
addWritePreparedReplyArrayLen(wpc, rangelen);
unsigned char *p = lpSeek(o->ptr, from);
unsigned char *vstr;
unsigned int vlen;
long long lval;

/* Return the result in form of a multi-bulk reply */
addReplyArrayLen(c, rangelen);

while (rangelen--) {
serverAssert(p); /* fail on corrupt data */
vstr = lpGetValue(p, &vlen, &lval);
if (vstr) {
addReplyBulkCBuffer(c, vstr, vlen);
addWritePreparedReplyBulkCBuffer(wpc, vstr, vlen);
} else {
addReplyBulkLongLong(c, lval);
addWritePreparedReplyBulkLongLong(wpc, lval);
}
p = reverse ? lpPrev(o->ptr, p) : lpNext(o->ptr, p);
}
Expand Down

0 comments on commit 5d06dc8

Please sign in to comment.