Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize linear search of WAIT and WAITAOF when unblocking the client #787

Merged
merged 4 commits into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,13 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);
void initClientBlockingState(client *c) {
c->bstate.btype = BLOCKED_NONE;
c->bstate.timeout = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bstate.numreplicas = 0;
c->bstate.numlocal = 0;
c->bstate.reploffset = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.client_waiting_acks_list_node = NULL;
c->bstate.module_blocked_handle = NULL;
c->bstate.async_rm_call_handle = NULL;
}

Expand Down Expand Up @@ -593,6 +596,11 @@ void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, lon
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
/* Note that we remember the linked list node where the client is stored,
* this way removing the client in unblockClientWaitingReplicas() will not
* require a linear scan, but just a constant time operation. */
serverAssert(c->bstate.client_waiting_acks_list_node == NULL);
c->bstate.client_waiting_acks_list_node = listFirst(server.clients_waiting_acks);
blockClient(c, BLOCKED_WAIT);
}

Expand Down
6 changes: 3 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3513,9 +3513,9 @@ void waitaofCommand(client *c) {
* waiting for replica acks. Never call it directly, call unblockClient()
* instead. */
void unblockClientWaitingReplicas(client *c) {
listNode *ln = listSearchKey(server.clients_waiting_acks, c);
serverAssert(ln != NULL);
listDelNode(server.clients_waiting_acks, ln);
serverAssert(c->bstate.client_waiting_acks_list_node);
listDelNode(server.clients_waiting_acks, c->bstate.client_waiting_acks_list_node);
c->bstate.client_waiting_acks_list_node = NULL;
updateStatsOnUnblock(c, 0, 0, 0);
}

Expand Down
10 changes: 6 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -975,21 +975,23 @@ typedef struct multiState {
} multiState;

/* This structure holds the blocking operation state for a client.
* The fields used depend on client->btype. */
* The fields used depend on client->bstate.btype. */
typedef struct blockingState {
/* Generic fields. */
blocking_type btype; /* Type of blocking op if CLIENT_BLOCKED. */
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
* is > timeout then the operation timed out. */
int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys
is deleted or does not exist anymore */

/* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */
dict *keys; /* The keys we are blocked on */

/* BLOCKED_WAIT and BLOCKED_WAITAOF */
int numreplicas; /* Number of replicas we are waiting for ACK. */
int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
long long reploffset; /* Replication offset to reach. */
int numreplicas; /* Number of replicas we are waiting for ACK. */
int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
long long reploffset; /* Replication offset to reach. */
listNode *client_waiting_acks_list_node; /* list node in server.clients_waiting_acks list. */

/* BLOCKED_MODULE */
void *module_blocked_handle; /* ValkeyModuleBlockedClient structure.
Expand Down
Loading