Skip to content

Commit

Permalink
Implement keepalive loop as a coro to fix issue #282 (#315)
Browse files Browse the repository at this point in the history
* Implement keepalive loop as a coro to fix issue #282
  • Loading branch information
jmoutte authored Jun 30, 2023
1 parent 7367dd1 commit 9de3e74
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions mqtt_io/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,35 @@ async def _mqtt_task_loop(self) -> None:
_LOG.exception("Exception while handling MQTT task:")
self.mqtt_task_queue.task_done()

async def _mqtt_keep_alive_loop(self) -> None:
"""
Publish a message on the status topic regularly to keep the connection alive.
This is a workaround for 2 different problems:
- keepalive was not implemented in early versions of asyncio_mqtt
- due to the way we receive messages asynchronously in our own queue we are not able
to capture the disconnected exception and trigger the reconnection. This publish call
will trigger the exception if the connection was lost and reconnect. (Issue #282)
"""
config: ConfigType = self.config["mqtt"]
topic_prefix: str = config["topic_prefix"]
if not self.mqtt_connected.is_set():
_LOG.debug("_mqtt_keep_alive_loop awaiting MQTT connection")
await self.mqtt_connected.wait()
_LOG.debug("_mqtt_keep_alive_loop unblocked after MQTT connection")
while True:
if self.mqtt is None:
_LOG.error("Attempted to ping MQTT server before client initialised")
while self.mqtt is None:
await asyncio.sleep(1)
continue
await self.mqtt.publish(MQTTMessageSend(
"/".join((topic_prefix, config["status_topic"])),
config["status_payload_running"].encode("utf8"),
qos=1,
retain=True,
))
await asyncio.sleep(config["keepalive"])

async def _mqtt_rx_loop(self) -> None:
if not self.mqtt_connected.is_set():
_LOG.debug("_mqtt_rx_loop awaiting MQTT connection")
Expand Down Expand Up @@ -1175,6 +1204,7 @@ async def _main_loop(self) -> None:
for coro in (
self._mqtt_task_loop(),
self._mqtt_rx_loop(),
self._mqtt_keep_alive_loop(),
self._remove_finished_transient_tasks(),
)
]
Expand Down

0 comments on commit 9de3e74

Please sign in to comment.