diff --git a/src/blocked.c b/src/blocked.c index 5f3d8bf504..8e1974a703 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -81,7 +81,7 @@ void initClientBlockingState(client *c) { c->bstate.numreplicas = 0; c->bstate.numlocal = 0; c->bstate.reploffset = 0; - c->bstate.client_waiting_acks_list_node = NULL; + c->bstate.generic_blocked_list_node = NULL; c->bstate.module_blocked_handle = NULL; c->bstate.async_rm_call_handle = NULL; } @@ -194,8 +194,9 @@ void unblockClient(client *c, int queue_for_reprocessing) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); } else if (c->bstate.btype == BLOCKED_POSTPONE) { - listDelNode(server.postponed_clients, c->postponed_list_node); - c->postponed_list_node = NULL; + serverAssert(c->bstate.postponed_list_node); + listDelNode(server.postponed_clients, c->bstate.postponed_list_node); + c->bstate.postponed_list_node = NULL; } else if (c->bstate.btype == BLOCKED_SHUTDOWN) { /* No special cleanup. */ } else { @@ -613,7 +614,8 @@ void blockPostponeClient(client *c) { c->bstate.timeout = 0; blockClient(c, BLOCKED_POSTPONE); listAddNodeTail(server.postponed_clients, c); - c->postponed_list_node = listLast(server.postponed_clients); + serverAssert(c->bstate.postponed_list_node == NULL); + c->bstate.postponed_list_node = listLast(server.postponed_clients); /* Mark this client to execute its command */ c->flag.pending_command = 1; } diff --git a/src/networking.c b/src/networking.c index 8b69743530..27d81da493 100644 --- a/src/networking.c +++ b/src/networking.c @@ -212,7 +212,6 @@ client *createClient(connection *conn) { c->peerid = NULL; c->sockname = NULL; c->client_list_node = NULL; - c->postponed_list_node = NULL; c->io_read_state = CLIENT_IDLE; c->io_write_state = CLIENT_IDLE; c->nwritten = 0; diff --git a/src/server.h b/src/server.h index 9dd7f8fefa..7fb7b2a4e5 100644 --- a/src/server.h +++ b/src/server.h @@ -1010,15 +1010,22 @@ typedef struct blockingState { * is > timeout then the operation timed out. */ int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys is deleted or does not exist anymore */ + union { + listNode *client_waiting_acks_list_node; /* list node in server.clients_waiting_acks list. */ + listNode *postponed_list_node; /* list node in server.postponed_clients */ + listNode *generic_blocked_list_node; /* generic placeholder for blocked clients utility lists. + Since a client cannot be blocked multiple times, we can assume + it will be held in only one extra utility list, so it is ok to maintain + a union of these listNode references. */ + }; /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */ dict *keys; /* The keys we are blocked on */ /* BLOCKED_WAIT and BLOCKED_WAITAOF */ - int numreplicas; /* Number of replicas we are waiting for ACK. */ - int numlocal; /* Indication if WAITAOF is waiting for local fsync. */ - long long reploffset; /* Replication offset to reach. */ - listNode *client_waiting_acks_list_node; /* list node in server.clients_waiting_acks list. */ + int numreplicas; /* Number of replicas we are waiting for ACK. */ + int numlocal; /* Indication if WAITAOF is waiting for local fsync. */ + long long reploffset; /* Replication offset to reach. */ /* BLOCKED_MODULE */ void *module_blocked_handle; /* ValkeyModuleBlockedClient structure. @@ -1321,7 +1328,6 @@ typedef struct client { sds peerid; /* Cached peer ID. */ sds sockname; /* Cached connection target address. */ listNode *client_list_node; /* list node in client list */ - listNode *postponed_list_node; /* list node within the postponed list */ void *module_blocked_client; /* Pointer to the ValkeyModuleBlockedClient associated with this * client. This is set in case of module authentication before the * unblocked client is reprocessed to handle reply callbacks. */