Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement keepalive loop as a coro to fix issue #282 #315

Merged
merged 2 commits into from
Jun 30, 2023
Merged

Implement keepalive loop as a coro to fix issue #282 #315

merged 2 commits into from
Jun 30, 2023

Conversation

jmoutte
Copy link
Contributor

@jmoutte jmoutte commented Jun 12, 2023

Due to the way the code consumes messages, we are not capturing the _on_disconnect Future from asyncio-mqtt. To avoid rewriting a lot of code I implemented a keep alive loop updating the status with a call to publish. This will capture the disconnected future and throw the exception to trigger reconnection.

This fixes issue #282

@BenjiU BenjiU self-assigned this Jun 22, 2023
@BenjiU
Copy link
Collaborator

BenjiU commented Jun 23, 2023

Hi jmoutte,

thanks for your PR, but currently I'm not fine with this fix: are you just sending the "keepalive" message manually? Shouldn't asyncio_mqtt do this internally?

The reconnecting section from asyncio_mqtt says this (what you did with your try/catch, right):

Reconnecting

You can reconnect when the connection to the broker is lost by wrapping your code in a try/except-block and listening for MqttErrors.

import asyncio
import asyncio_mqtt as aiomqtt


async def main():
    reconnect_interval = 5  # In seconds
    while True:
        try:
            async with aiomqtt.Client("test.mosquitto.org") as client:
                async with client.messages() as messages:
                    await client.subscribe("humidity/#")
                    async for message in messages:
                        print(message.payload.decode())
        except aiomqtt.MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
            await asyncio.sleep(reconnect_interval)



asyncio.run(main())

But I guess we need to find the correct section for the try/catch, right? Or am I missing something?

@jmoutte
Copy link
Contributor Author

jmoutte commented Jun 23, 2023

Hello BenjiU,

I understand your concerns and recognise that this is not ideal. Keepalive should indeed be implemented using the underlying mqtt client library and this is available in recent versions of asyncio_mqtt. This PR is implementing keep alive with a hack but it's main intent is to detect disconnections and to trigger the reconnection code which would not be handled by keep alive anyway.

I am not an asyncio expert, but after reading the code carefully, I am afraid that the way mqtt_io is using asyncio_mqtt makes it impossible for us to catch the exception of a disconnection when we are only waiting for messages.

Indeed, the example code is using a generator provided by asyncio_mqtt for .messages() that will wait for an AsyncIOQueue OR a disconnected future. This is the trick that is supposed to notify you of a disconnection while waiting for messages. The mqtt_io implementation is instead overriding the on_messages() method short-circuiting the asyncio_mqtt internal queues and ignoring the generator. This could work fine if the on_disconnected callback from paho_mqtt was also overridden but we don't have access to it.

So I haven't found an elegant solution keeping the override of _on_messages callback of pahomqtt and being able to catch disconnection events with the asynchronous callbacks implementation you created.

This PR has now been running for days with multiple disconnections at my place and works great.

BR,

Julien

@BenjiU
Copy link
Collaborator

BenjiU commented Jun 23, 2023

So you are an asyncio expert :-D
Okay, when you could fix the lint warning, I would merge this as a work around. Maybe you can add some comment in the code, that this is a workaround?

@BenjiU BenjiU merged commit 9de3e74 into flyte:develop Jun 30, 2023
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants