Skip to content

Commit

Permalink
Optimize linear search of WAIT and WAITAOF when unblocking the client (
Browse files Browse the repository at this point in the history
…#787)

Currently, if the client enters a blocked state, it will be
added to the server.clients_waiting_acks list. When the client
is unblocked, that is, when unblockClient is called, we will
need to linearly traverse server.clients_waiting_acks to delete
the client, and this search is O(N).

When WAIT (or WAITAOF) is used extensively in some cases, this
O(N) search may be time-consuming. We can remember the list node
and store it in the blockingState struct and it can avoid the
linear search in unblockClientWaitingReplicas.

Signed-off-by: Binbin <binloveplay1314@qq.com>
  • Loading branch information
enjoy-binbin authored and madolson committed Sep 3, 2024
1 parent a2f4aed commit 4d93ef0
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
10 changes: 9 additions & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);
void initClientBlockingState(client *c) {
c->bstate.btype = BLOCKED_NONE;
c->bstate.timeout = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bstate.numreplicas = 0;
c->bstate.numlocal = 0;
c->bstate.reploffset = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.client_waiting_acks_list_node = NULL;
c->bstate.module_blocked_handle = NULL;
c->bstate.async_rm_call_handle = NULL;
}

Expand Down Expand Up @@ -595,6 +598,11 @@ void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, lon
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
/* Note that we remember the linked list node where the client is stored,
* this way removing the client in unblockClientWaitingReplicas() will not
* require a linear scan, but just a constant time operation. */
serverAssert(c->bstate.client_waiting_acks_list_node == NULL);
c->bstate.client_waiting_acks_list_node = listFirst(server.clients_waiting_acks);
blockClient(c, BLOCKED_WAIT);
}

Expand Down
6 changes: 3 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4358,9 +4358,9 @@ void waitaofCommand(client *c) {
* waiting for replica acks. Never call it directly, call unblockClient()
* instead. */
void unblockClientWaitingReplicas(client *c) {
listNode *ln = listSearchKey(server.clients_waiting_acks, c);
serverAssert(ln != NULL);
listDelNode(server.clients_waiting_acks, ln);
serverAssert(c->bstate.client_waiting_acks_list_node);
listDelNode(server.clients_waiting_acks, c->bstate.client_waiting_acks_list_node);
c->bstate.client_waiting_acks_list_node = NULL;
updateStatsOnUnblock(c, 0, 0, 0);
}

Expand Down
10 changes: 6 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1002,21 +1002,23 @@ typedef struct multiState {
} multiState;

/* This structure holds the blocking operation state for a client.
* The fields used depend on client->btype. */
* The fields used depend on client->bstate.btype. */
typedef struct blockingState {
/* Generic fields. */
blocking_type btype; /* Type of blocking op if CLIENT_BLOCKED. */
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
* is > timeout then the operation timed out. */
int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys
is deleted or does not exist anymore */

/* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */
dict *keys; /* The keys we are blocked on */

/* BLOCKED_WAIT and BLOCKED_WAITAOF */
int numreplicas; /* Number of replicas we are waiting for ACK. */
int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
long long reploffset; /* Replication offset to reach. */
int numreplicas; /* Number of replicas we are waiting for ACK. */
int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
long long reploffset; /* Replication offset to reach. */
listNode *client_waiting_acks_list_node; /* list node in server.clients_waiting_acks list. */

/* BLOCKED_MODULE */
void *module_blocked_handle; /* ValkeyModuleBlockedClient structure.
Expand Down

0 comments on commit 4d93ef0

Please sign in to comment.