From ec8e5b33fe175410ee0d8145cb26b9ce8e2f7050 Mon Sep 17 00:00:00 2001 From: Binbin Date: Sun, 18 Aug 2024 21:20:35 +0800 Subject: [PATCH] Optimize linear search of WAIT and WAITAOF when unblocking the client (#787) Currently, if the client enters a blocked state, it will be added to the server.clients_waiting_acks list. When the client is unblocked, that is, when unblockClient is called, we will need to linearly traverse server.clients_waiting_acks to delete the client, and this search is O(N). When WAIT (or WAITAOF) is used extensively in some cases, this O(N) search may be time-consuming. We can remember the list node and store it in the blockingState struct and it can avoid the linear search in unblockClientWaitingReplicas. Signed-off-by: Binbin Signed-off-by: mwish --- src/blocked.c | 10 +++++++++- src/replication.c | 6 +++--- src/server.h | 10 ++++++---- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 5f4c5d4494..5f3d8bf504 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -76,10 +76,13 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key); void initClientBlockingState(client *c) { c->bstate.btype = BLOCKED_NONE; c->bstate.timeout = 0; + c->bstate.unblock_on_nokey = 0; c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType); c->bstate.numreplicas = 0; + c->bstate.numlocal = 0; c->bstate.reploffset = 0; - c->bstate.unblock_on_nokey = 0; + c->bstate.client_waiting_acks_list_node = NULL; + c->bstate.module_blocked_handle = NULL; c->bstate.async_rm_call_handle = NULL; } @@ -595,6 +598,11 @@ void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, lon c->bstate.numreplicas = numreplicas; c->bstate.numlocal = numlocal; listAddNodeHead(server.clients_waiting_acks, c); + /* Note that we remember the linked list node where the client is stored, + * this way removing the client in unblockClientWaitingReplicas() will not + * require a linear scan, but just a constant time operation. */ + serverAssert(c->bstate.client_waiting_acks_list_node == NULL); + c->bstate.client_waiting_acks_list_node = listFirst(server.clients_waiting_acks); blockClient(c, BLOCKED_WAIT); } diff --git a/src/replication.c b/src/replication.c index 35cd82b0ea..a6ddfad36a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -4358,9 +4358,9 @@ void waitaofCommand(client *c) { * waiting for replica acks. Never call it directly, call unblockClient() * instead. */ void unblockClientWaitingReplicas(client *c) { - listNode *ln = listSearchKey(server.clients_waiting_acks, c); - serverAssert(ln != NULL); - listDelNode(server.clients_waiting_acks, ln); + serverAssert(c->bstate.client_waiting_acks_list_node); + listDelNode(server.clients_waiting_acks, c->bstate.client_waiting_acks_list_node); + c->bstate.client_waiting_acks_list_node = NULL; updateStatsOnUnblock(c, 0, 0, 0); } diff --git a/src/server.h b/src/server.h index 49d0a4485a..9dd7f8fefa 100644 --- a/src/server.h +++ b/src/server.h @@ -1002,7 +1002,7 @@ typedef struct multiState { } multiState; /* This structure holds the blocking operation state for a client. - * The fields used depend on client->btype. */ + * The fields used depend on client->bstate.btype. */ typedef struct blockingState { /* Generic fields. */ blocking_type btype; /* Type of blocking op if CLIENT_BLOCKED. */ @@ -1010,13 +1010,15 @@ typedef struct blockingState { * is > timeout then the operation timed out. */ int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys is deleted or does not exist anymore */ + /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */ dict *keys; /* The keys we are blocked on */ /* BLOCKED_WAIT and BLOCKED_WAITAOF */ - int numreplicas; /* Number of replicas we are waiting for ACK. */ - int numlocal; /* Indication if WAITAOF is waiting for local fsync. */ - long long reploffset; /* Replication offset to reach. */ + int numreplicas; /* Number of replicas we are waiting for ACK. */ + int numlocal; /* Indication if WAITAOF is waiting for local fsync. */ + long long reploffset; /* Replication offset to reach. */ + listNode *client_waiting_acks_list_node; /* list node in server.clients_waiting_acks list. */ /* BLOCKED_MODULE */ void *module_blocked_handle; /* ValkeyModuleBlockedClient structure.