
Add ability to pre-declare all the used message bus queues on service startup (allow users to use non RabbitMQ kombu transports) #3639

Merged · 10 commits · Aug 3, 2017

Conversation


@Kami Kami commented Aug 2, 2017

This pull request adds the ability to pre-declare all the used message bus queues (and, as such, exchanges) on service startup, which allows users to use non-RabbitMQ kombu transports that are not officially supported.

Background

#3635 inspired me to dig in a little to try to find out what is going on. After spending the whole morning digging in and tracing what happens, I managed to track down the root cause of why the Redis kombu transport doesn't work.

#3638 fixed a small issue which was just related to retrying and error propagation, but the fix for the actual root cause of things not working is included in this PR.

When trying to use the Redis kombu transport instead of the RabbitMQ one, you would get an error like this on each service startup:

2017-08-02 10:35:44,107 ERROR [-] Rabbitmq connection error: 
Cannot route message for exchange 'st2.liveaction': Table empty or key no longer exists.
Probably the key (u'_kombu.binding.st2.liveaction') has been removed from the Redis database.

If you inspect the Redis keys, you will find that none of the exchanges were pre-declared. That's why the issue arises and publishing a message to a message bus exchange fails.

127.0.0.1:6379> keys *
 1) "_kombu.binding.st2.liveaction.status"
 2) "st2.trigger.watch.TimersController-5ad3e912b6"
 3) "st2.trigger.watch.WebhooksController-4e7cd714fd"
 4) "st2.trigger.watch.TimersController-efc54cba86"
 6) "st2.trigger.watch.TimersController-f76e523afb"
 7) "st2.trigger.watch.WebhooksController-d1ae5a0ee9"
 8) "st2.trigger.watch.WebhooksController-24c40d85d4"

It turned out that only exchanges for queues which already exist were pre-declared (i.e. exchanges for which we already have consumers online). That's problematic and obviously won't work in a distributed HA worker model.

In this model, a producer can be online before the consumer (e.g. the API is online before the action runner and the other services which actually bind a queue to a particular exchange). Another example of late binding is the stream service - we only bind to the exchange once the first request hits the /v1/stream endpoint (i.e. the stream service).

In fact, this happens very often in such setups, and it's one of the benefits of and reasons for using a worker model - you want messages to still be queued even if the consumer is not online yet, and to be delivered and processed when the consumer comes online (a normal scenario for this model).

In our example this could mean the action runner and other services are not online yet (or are being restarted, or similar), but the API service is online and a user queues an action execution. We obviously want to allow the user to perform this operation and queue the execution, which will then be picked up and processed when the action runner service comes online.

The kombu documentation specifically clarifies that this needs to be done in such scenarios (http://docs.celeryproject.org/projects/kombu/en/latest/userguide/producers.html), but because of implementation details this is not actually required with the RabbitMQ transport (nasty!), and our code works just fine with it (I guess we were just lucky).

> The declare argument lets you pass a list of entities that must be declared before sending the message. This is especially important when using the retry flag, since the broker may actually restart during a retry in which case non-durable entities are removed.
>
> Say you are writing a task queue, and the workers may have not started yet so the queues aren't declared. In this case you need to define both the exchange, and declare the queue so that the message is delivered to the queue while the workers are offline.

Proposed solution

The solution which fixes this problem is to pre-declare all the queues, which causes the exchanges to be pre-declared in Redis.

After this change, the Redis keyspace looks like this once service setup has run:

127.0.0.1:6379> keys *
1) "_kombu.binding.st2.liveaction"
2) "_kombu.binding.st2.liveaction.status"
3) "_kombu.binding.st2.execution"
4) "_kombu.binding.st2.actionexecutionstate"
5) "_kombu.binding.st2.trigger_instances_dispatch"
6) "_kombu.binding.st2.announcement"
7) "_kombu.binding.st2.sensor"
8) "_kombu.binding.st2.trigger"

After this change I tested basic functionality (running an action, etc.) and it seems to work fine, but who knows how many edge-case issues could still be hiding underneath.

Having said that - RabbitMQ is still the only officially supported and recommended backend, and using any other backend is at the user's own risk. You have been warned - if you use a backend which is not officially supported, we don't provide support for it and we can't guarantee everything will work. If you are fine with that, go ahead, but don't complain to us if things don't work.

Even though running this pre-declare code shouldn't negatively affect any existing functionality, I still decided to put it behind a feature flag (the messaging.predeclare_queues config option).
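With the flag in place, opting in is just a config change (a sketch of the relevant st2.conf section; the option name and default come from this PR):

```ini
# st2.conf - opt in to pre-declaring all message bus queues on startup
[messaging]
predeclare_queues = True
```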

This whole pre-init / pre-declare phase is nothing special either - it follows exactly the same pre-initialization approach which is required in a distributed system and which we already use for other things.

Resolves #3635.

This is required when using a worker model where the services which
consume from queues might not be online before the producers.

Because of the implementation details, this is not required with the
RabbitMQ transport, but it is required with some other transports such
as the Redis one.

This feature can be enabled by setting the messaging.predeclare_queues
st2.conf config option to True (False by default).
@Kami Kami added this to the 2.4.0 milestone Aug 2, 2017
# Because of the worker model used, this is required with some non-standard transports such as
# the Redis one. Even though kombu requires queues to be pre-declared upfront in such
# scenarios, the RabbitMQ transport doesn't require that.
QUEUES = [
@arm4b arm4b (Member) commented Aug 2, 2017

Interesting.
Curious if that approach can fix #3290

Will test.

Contributor

I am curious as well :). Thanks for watching out @armab

@arm4b arm4b (Member) commented Aug 2, 2017

Just checked.

Sadly, the #3290 MQ-related bug is still clearly reproducible with the new predeclare_queues=True Xenial package.

Contributor

Le sigh.

Member Author

Just for the fun of it - you could also try setting this config option to True while using the Redis transport, to see if that makes a difference (i.e. whether the issue is RabbitMQ-related).

@@ -155,7 +155,11 @@ def register_opts(ignore_errors=False):
     cfg.IntOpt('connection_retries', default=10,
                help='How many times should we retry connection before failing.'),
     cfg.IntOpt('connection_retry_wait', default=10000,
-               help='How long should we wait between connection retries.')
+               help='How long should we wait between connection retries.'),
+    cfg.BoolOpt('predeclare_queues', default=False,
Contributor

What queues? :P. I'd prefer us to be explicit and say AMQP queues or similar.

Member Author

It's part of the messaging config group though, so I think it's fine as it is (we also don't include more verbose info in the descriptions of the other messaging group options :)

-               help='How long should we wait between connection retries.')
+               help='How long should we wait between connection retries.'),
+    cfg.BoolOpt('predeclare_queues', default=False,
+                help=('True to pre-declare all the queues on service setup. This is required '
Contributor

Nit: Set this to True to pre-declare all AMQP queues on service startup.

Member Author

Done.

liveaction.get_status_management_queue(name='st2.preinit', routing_key='init'),
reactor.get_trigger_cud_queue(name='st2.preinit', routing_key='init'),
reactor.get_trigger_instances_queue(name='st2.preinit', routing_key='init'),
reactor.get_sensor_cud_queue(name='st2.preinit', routing_key='init'),
Contributor

Are these fake queues that will never be consumed? Because the actual work queues are named differently. What does rabbitmqadmin tell you about the size of these queues? Do they grow? See #3622. I am worried that the queue sizes would grow without consumers. Can you double-check, please?

@Kami Kami (Member Author) commented Aug 2, 2017

Yes, they will never be consumed - in fact, only a single extra queue (st2.preinit) which will never be consumed is created. Nothing publishes to it, so it never grows.

We only do that because it's the only way to make sure the exchanges are created.

(virtualenv)vagrant@vagrant-ubuntu-trusty-64:/data/stanley$ sudo rabbitmqctl list_queues
Listing queues ...
...
st2.preinit	0
....

Contributor

At least the names are fixed. That's good. Thanks for digging in.

@johnarnold

Thank you.


Successfully merging this pull request may close these issues.

Bug: Retrying "register exchanges" causes TypeError exception, using Redis as message queue