Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use of a single listNode pointer for blocking utility lists #919

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void initClientBlockingState(client *c) {
c->bstate.numreplicas = 0;
c->bstate.numlocal = 0;
c->bstate.reploffset = 0;
c->bstate.client_waiting_acks_list_node = NULL;
c->bstate.generic_blocked_list_node = NULL;
c->bstate.module_blocked_handle = NULL;
c->bstate.async_rm_call_handle = NULL;
}
Expand Down Expand Up @@ -194,8 +194,9 @@ void unblockClient(client *c, int queue_for_reprocessing) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else if (c->bstate.btype == BLOCKED_POSTPONE) {
listDelNode(server.postponed_clients, c->postponed_list_node);
c->postponed_list_node = NULL;
serverAssert(c->bstate.postponed_list_node);
listDelNode(server.postponed_clients, c->bstate.postponed_list_node);
c->bstate.postponed_list_node = NULL;
} else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
/* No special cleanup. */
} else {
Expand Down Expand Up @@ -613,7 +614,8 @@ void blockPostponeClient(client *c) {
c->bstate.timeout = 0;
blockClient(c, BLOCKED_POSTPONE);
listAddNodeTail(server.postponed_clients, c);
c->postponed_list_node = listLast(server.postponed_clients);
serverAssert(c->bstate.postponed_list_node == NULL);
c->bstate.postponed_list_node = listLast(server.postponed_clients);
/* Mark this client to execute its command */
c->flag.pending_command = 1;
}
Expand Down
1 change: 0 additions & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ client *createClient(connection *conn) {
c->peerid = NULL;
c->sockname = NULL;
c->client_list_node = NULL;
c->postponed_list_node = NULL;
c->io_read_state = CLIENT_IDLE;
c->io_write_state = CLIENT_IDLE;
c->nwritten = 0;
Expand Down
16 changes: 11 additions & 5 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1010,15 +1010,22 @@ typedef struct blockingState {
* is > timeout then the operation timed out. */
int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys
is deleted or does not exist anymore */
union {
listNode *client_waiting_acks_list_node; /* list node in server.clients_waiting_acks list. */
listNode *postponed_list_node; /* list node within the postponed list */
ranshid marked this conversation as resolved.
Show resolved Hide resolved
listNode *generic_blocked_list_node; /* generic placeholder for blocked clients utility lists.
Since a client cannot be blocked multiple times, we can assume
it will be held in only one extra utility list, so it is O.K maintain a
ranshid marked this conversation as resolved.
Show resolved Hide resolved
ranshid marked this conversation as resolved.
Show resolved Hide resolved
union of these listNode references. */
};

/* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */
dict *keys; /* The keys we are blocked on */

/* BLOCKED_WAIT and BLOCKED_WAITAOF */
int numreplicas; /* Number of replicas we are waiting for ACK. */
int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
long long reploffset; /* Replication offset to reach. */
listNode *client_waiting_acks_list_node; /* list node in server.clients_waiting_acks list. */
int numreplicas; /* Number of replicas we are waiting for ACK. */
int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
ranshid marked this conversation as resolved.
Show resolved Hide resolved
long long reploffset; /* Replication offset to reach. */

/* BLOCKED_MODULE */
void *module_blocked_handle; /* ValkeyModuleBlockedClient structure.
Expand Down Expand Up @@ -1321,7 +1328,6 @@ typedef struct client {
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
listNode *postponed_list_node; /* list node within the postponed list */
void *module_blocked_client; /* Pointer to the ValkeyModuleBlockedClient associated with this
* client. This is set in case of module authentication before the
* unblocked client is reprocessed to handle reply callbacks. */
Expand Down
Loading