Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async.c:451: redisProcessCallbacks: Assertion `(c->flags & 0x20 || c->flags & 0x40)' failed. #490

Closed
Nightaway opened this issue Nov 26, 2016 · 10 comments

Comments

@Nightaway
Copy link

Nightaway commented Nov 26, 2016

redis-adapter.h

#ifndef __DRAGON_REDIS_ADAPTER_H__
#define __DRAGON_REDIS_ADAPTER_H__

#include <stdlib.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>

typedef struct redisEvents {
    redisAsyncContext *context;
    int epfd;
} redisEvents;

static void redisReadEvent(redisEvents *e) {
    redisAsyncHandleRead(e->context);
}

static void redisWriteEvent(redisEvents *e) {
    redisAsyncHandleWrite(e->context);
}

static void redisAddRead(void *privdata) {
	redisEvents *e = (redisEvents*)privdata;

	epoll_event event;
	event.data.fd = e->context->c.fd;
	event.events = EPOLLIN;
	int ret = epoll_ctl(e->epfd, EPOLL_CTL_ADD, e->context->c.fd, &event);
	if (ret == -1) {
		if (errno == EEXIST) {
			ret = epoll_ctl(e->epfd, EPOLL_CTL_MOD, e->context->c.fd, &event);
		}
		if (ret  == -1) {
			perror("redisAddRead epoll_ctl");
		}
		return ;
	}
}

static void redisDelRead(void *privdata) {
	redisEvents *e = (redisEvents*)privdata;

	int ret = epoll_ctl(e->epfd, EPOLL_CTL_DEL, e->context->c.fd, NULL);
	if (ret == -1) {
		perror("redisDelRead epoll_ctl");
		return ;
	}

}

static void redisAddWrite(void *privdata) {
	redisEvents *e = (redisEvents*)privdata;

	epoll_event event;
	event.data.fd = e->context->c.fd;
	event.events = EPOLLOUT;
	int ret = epoll_ctl(e->epfd, EPOLL_CTL_ADD, e->context->c.fd, &event);
	if (ret == -1) {
		if (errno == EEXIST) {
			ret = epoll_ctl(e->epfd, EPOLL_CTL_MOD, e->context->c.fd, &event);
		}
		if (ret  == -1) {
			perror("redisAddWrite epoll_ctl");
		}
		return ;
	}
}

static void redisDelWrite(void *privdata) {
	redisEvents *e = (redisEvents*)privdata;

	int ret = epoll_ctl(e->epfd, EPOLL_CTL_DEL, e->context->c.fd, NULL);
	if (ret == -1) {
		perror("redisDelWrite epoll_ctl");
		return ;
	}
}

static void redisCleanup(void *privdata) {
    redisEvents *e = (redisEvents*)privdata;
    redisDelRead(privdata);
    redisDelWrite(privdata);
    free(e);
}

static int redisAttach(redisAsyncContext *ac, int epfd) {
    redisContext *c = &(ac->c);
    redisEvents *e;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for context and r/w events */
    e = (redisEvents*)malloc(sizeof(*e));
    e->context = ac;
    e->epfd = epfd;

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisAddRead;
    ac->ev.delRead = redisDelRead;
    ac->ev.addWrite = redisAddWrite;
    ac->ev.delWrite = redisDelWrite;
    ac->ev.cleanup = redisCleanup;
    ac->ev.data = e;

    /* Initialize read/write events */
	epoll_event event;
	event.data.fd = c->fd;
	event.events = EPOLLIN|EPOLLOUT;
	int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, c->fd, &event);
	if (ret == -1) {
		perror("redisAttach epoll_ctl");
		return REDIS_ERR;
	}
    return REDIS_OK;
}

#endif

server.cc

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>

#include "redis-adapter.h"

int sockfd;
int epfd;
epoll_event event;
redisAsyncContext *c_sub;

static void subCallback(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = (redisReply *)r;
    if (reply == NULL) return;
}

int CreateSocketAndBind(int port) {
	struct sockaddr_in servaddr;
	sockfd = socket(AF_INET, SOCK_STREAM, 0);
	if (sockfd == -1) {
		perror("socket:");
		return -1;
	}
	memset(&servaddr, 0, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	servaddr.sin_port = htons(port);
	int ret = bind(sockfd, (sockaddr *)&servaddr, sizeof(servaddr));
	if (ret == -1) {
		perror("bind:");
		return -1;
	}
	return 0;
}
int MakeSocketNonbloking(int fd) {
	int flags = fcntl(fd, F_GETFL, 0);
	if (flags == -1) {
		perror("fcntl");
		return -1;
	}
 	
 	flags|= O_NONBLOCK;
  	int s = fcntl(fd, F_SETFL, flags);
	if (s == -1) {
		perror("fcntl");
		return-1;
	}
	return 0;
}

int InitEpoll() {
	int ret = listen(sockfd, SOMAXCONN);
	if (ret != 0) {
		perror("listen:");
		exit(1);
	}
	ret = MakeSocketNonbloking(sockfd);
	if (ret != 0) {
		perror("MakeSocketNonbloking:");
		exit(1);
	}
	epfd = epoll_create(65535);
	if (epfd == -1) {
		perror("epoll_create:");
		exit(1);
	}

	event.data.fd = sockfd;
	event.events = EPOLLIN;
	ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
	if (ret == -1) {
		perror("epoll_ctl");
		exit(1);
	}

	c_sub = redisAsyncConnect("127.0.0.1", 6379);
	if (c_sub->err) {
	    printf("Error: %s\n", c_sub->errstr);
	    exit(1);
	}
	redisAttach(c_sub, epfd);
}

int Run() {
	epoll_event events[64];
	for (int z=0; z<10; ++z) {
		redisAsyncCommand(c_sub, subCallback, NULL, "SUBSCRIBE %d", z);
		redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %d", z);
	}
	while (1) {
		int count = epoll_wait(epfd, events, 64, 0);
		if (count == -1) {
			perror("epoll_wait:");
			exit(1);
		} 

		for (int i=0; i<count; i++) {
			 if (events[i].data.fd == c_sub->c.fd) {
				printf("Redis sub event %d\n", getpid());
				if (events[i].events & EPOLLIN) {
					redisAsyncHandleRead(c_sub);
				} else if (events[i].events & EPOLLOUT) {
					redisAsyncHandleWrite(c_sub);
				}
			}
		}
	}
	close(epfd);
}

int main() {
	CreateSocketAndBind(9090);
	InitEpoll();
	Run();
	return 0;
}

I am coding a IM Server using redis sub/pub pattern, server.cc crash on assert occasionally while client forking 100 client processes and use ctrl + c to close client processes. if u comment server.cc redisAsyncCommand(c_sub, subCallback, NULL, "SUBSCRIBE %s", name); and redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %s", c->id.c_str()); problem will go away.

@badboy
Copy link
Contributor

badboy commented Nov 26, 2016

This is a huge example of code I absolutely don't know. Plus on first sight it's completely unclear what the client.cc is for, it doesn't touch Redis/Hiredis at all.

Please provide a reduced example triggering the error.

@Nightaway
Copy link
Author

Nightaway commented Nov 28, 2016

code reduced.
I use redis as a message broker.

@Nightaway
Copy link
Author

I find problem that is UNSUBSCRIBE command call. The problem gone away when comment redisAsyncCommand(c_sub, subCallback, NULL, "UNSUBSCRIBE %s", c->id.c_str());.

@badboy
Copy link
Contributor

badboy commented Nov 28, 2016

There are still hundreds of lines of code. What exactly did you reduce?

@Nightaway
Copy link
Author

Nightaway commented Nov 28, 2016

Reduced again

@kbirk
Copy link

kbirk commented Oct 31, 2018

I'm experiencing the same failed assertion. My code is nearly identical to the pub / sub sample in the wiki, with only an additional call to unsubscribe.

Once I call:

auto status = redisAsyncCommand(context_, nullptr, nullptr, "UNSUBSCRIBE %s", key.c_str());

After the onMessage function executes for the unsubscribe response, I get the failed assertion:

async.c:463: redisProcessCallbacks: Assertion (c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.`

Is there anything I need to do before unsubscribing to prevent this?

@kbirk
Copy link

kbirk commented Nov 1, 2018

After some digging I found a potential cause of the issue. I found the issue occurs when my program does the following:

  • Subscribe to a channel
  • Poll libevent for messages, process the subscribe and message replies
  • Unsubscribe from the channel
  • Subscribe to a different channel
  • Poll libevent, assert fails after processing the unsubscribe reply

It seems the only way I can prevent the failed assertion is by not subscribing to a new channel until I've processed a unsubscribe reply from the previous.

Is this intended? I was under the impression that you could have multiple subscriptions on a single context.

@ZezhongWang
Copy link

ZezhongWang commented Apr 27, 2020

I also faced the same problem. After I have unsubscribed all the channel and then call redisAsyncHandleRead, hiredis assert failed at async.c: 478.
I use hiredis 0.14.1 version.
And I didn't use multithread.

@bjosv
Copy link
Contributor

bjosv commented Sep 5, 2022

This issue seems to describe what is fixed on master by #1036.

@michael-grunder
Copy link
Collaborator

I think you're right. Going to close this but if the problem isn't actually solved feel free to reopen

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants