Assertion `(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' fails when calling "SUBSCRIBE aaa" and then "UNSUBSCRIBE aaa bbb" #1039

Closed
SKYSCRAPERS1999 opened this issue Jan 13, 2022 · 4 comments · Fixed by #1047


SKYSCRAPERS1999 commented Jan 13, 2022

Hiredis crashes after calling redisAsyncCommand(c, NULL, NULL, "subscribe aaa") and then redisAsyncCommand(c, NULL, NULL, "unsubscribe aaa bbb").

The code is a lightly modified version of the example "examples/example-ae.c". I compiled it with AE_DIR=~/redis/src make hiredis-example-ae -j16. The full program:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

#include <hiredis.h>
#include <async.h>
#include <adapters/ae.h>

/* Put event loop in the global scope, so it can be explicitly stopped */
static aeEventLoop *loop;

void connectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        aeStop(loop);
        return;
    }

    printf("Connected...\n");
}

int main (int argc, char **argv) {
    signal(SIGPIPE, SIG_IGN);

    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }

    loop = aeCreateEventLoop(64);
    redisAeAttach(loop, c);
    redisAsyncSetConnectCallback(c,connectCallback);
    redisAsyncCommand(c, NULL, NULL, "subscribe aaa");
    redisAsyncCommand(c, NULL, NULL, "unsubscribe aaa bbb");
    aeMain(loop);
    return 0;
}

After starting a redis-server on localhost:6379 and running the code, I got the following error: hiredis-example-ae: async.c:567: redisProcessCallbacks: Assertion `(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.

I looked into the Hiredis codebase and found a possible cause in async.c. In redisProcessCallbacks, the function uses the loop while((status = redisGetReply(c,&reply)) == REDIS_OK) to fetch command replies. After processing the reply to SUBSCRIBE aaa, redisProcessCallbacks moves on to the replies to UNSUBSCRIBE aaa bbb. When the reply for UNSUBSCRIBE aaa arrives, all channels have already been deleted in __redisGetSubscribeCallback, which then clears the flag with c->flags &= ~REDIS_SUBSCRIBED. When redisProcessCallbacks then processes the reply for UNSUBSCRIBE bbb, the assertion assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)) fails.

void redisProcessCallbacks(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    redisCallback cb = {NULL, NULL, 0, NULL};
    void *reply = NULL;
    int status;

    while((status = redisGetReply(c,&reply)) == REDIS_OK) {
        if (reply == NULL) {
            /* When the connection is being disconnected and there are
             * no more replies, this is the cue to really disconnect. */
            if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
                && ac->replies.head == NULL) {
                __redisAsyncDisconnect(ac);
                return;
            }

            /* If monitor mode, repush callback */
            if(c->flags & REDIS_MONITORING) {
                __redisPushCallback(&ac->replies,&cb);
            }

            /* When the connection is not being disconnected, simply stop
             * trying to get replies and wait for the next loop tick. */
            break;
        }

        /* Keep track of push message support for subscribe handling */
        if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;

        /* Send any non-subscribe related PUSH messages to our PUSH handler
         * while allowing subscribe related PUSH messages to pass through.
         * This allows existing code to be backward compatible and work in
         * either RESP2 or RESP3 mode. */
        if (redisIsSpontaneousPushReply(reply)) {
            __redisRunPushCallback(ac, reply);
            c->reader->fn->freeObject(reply);
            continue;
        }

        /* Even if the context is subscribed, pending regular
         * callbacks will get a reply before pub/sub messages arrive. */
        if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
            /*
             * A spontaneous reply in a not-subscribed context can be the error
             * reply that is sent when a new connection exceeds the maximum
             * number of allowed connections on the server side.
             *
             * This is seen as an error instead of a regular reply because the
             * server closes the connection after sending it.
             *
             * To prevent the error from being overwritten by an EOF error the
             * connection is closed here. See issue #43.
             *
             * Another possibility is that the server is loading its dataset.
             * In this case we also want to close the connection, and have the
             * user wait until the server is ready to take our request.
             */
            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
                c->err = REDIS_ERR_OTHER;
                snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
                c->reader->fn->freeObject(reply);
                __redisAsyncDisconnect(ac);
                return;
            }
            /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
            assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
            if(c->flags & REDIS_SUBSCRIBED)
                __redisGetSubscribeCallback(ac,reply,&cb);
        }

        if (cb.fn != NULL) {
            __redisRunCallback(ac,&cb,reply);
            if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
                c->reader->fn->freeObject(reply);
            }

            /* Proceed with free'ing when redisAsyncFree() was called. */
            if (c->flags & REDIS_FREEING) {
                __redisAsyncFree(ac);
                return;
            }
        } else {
            /* No callback for this reply. This can either be a NULL callback,
             * or there were no callbacks to begin with. Either way, don't
             * abort with an error, but simply ignore it because the client
             * doesn't know what the server will spit out over the wire. */
            c->reader->fn->freeObject(reply);
        }
    }

    /* Disconnect when there was an error reading the reply */
    if (status != REDIS_OK)
        __redisAsyncDisconnect(ac);
}
@SKYSCRAPERS1999 SKYSCRAPERS1999 changed the title Assertion `(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed when calling "SUBSCRIBE aaa" and then "UNSUBSCRIBE aaa bbb" Assertion `(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' fails when calling "SUBSCRIBE aaa" and then "UNSUBSCRIBE aaa bbb" Jan 13, 2022

bjosv (Contributor) commented Jan 13, 2022

I believe PR #1036 would help in this case too.
I was too quick: the mentioned PR does not actually handle this case.

bjosv (Contributor) commented Jan 13, 2022

In this case Redis will reply for each channel given in the unsubscribe command, even if a channel has not been subscribed to.
There is probably a need to count how many channels, i.e. how many responses, are expected when sending the unsubscribe command, to avoid getting out of sync.

bjosv (Contributor) commented Jan 13, 2022

Today there is a check in __redisAsyncCommand() that an unsubscribe command can't be sent unless a subscribe command has already been sent, so one option could be that it also checks the channels (if given) to make sure they all exist internally.
The redisAsyncCommand(unsubscribe) call would in this test case return REDIS_ERR.
But that might seem too obstructive and maybe not what you want. Filtering out unknown channels might also be an option.

Any thoughts @michael-grunder ?

SKYSCRAPERS1999 (Author) commented

> Today there is a check in __redisAsyncCommand() that an unsubscribe command can't be sent unless a subscribe command has already been sent, so one option could be that it also checks the channels (if given) to make sure they all exist internally. The redisAsyncCommand(unsubscribe) call would in this test case return REDIS_ERR. But that might seem too obstructive and maybe not what you want. Filtering out unknown channels might also be an option.
>
> Any thoughts @michael-grunder ?

Yeah, I think it's better to filter out unknown channels, because sending SUBSCRIBE aaa followed by UNSUBSCRIBE aaa bbb to redis-server does not return an error.
