From 11a06ce0dafd3d13c5b54961f9e0019539eac3f7 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Sat, 27 Apr 2024 19:13:22 -0700 Subject: [PATCH] Syndic fix-up - Update _handle_decode_payload methods to use async/await. - Syndic closes request channels before creating new ones. --- salt/minion.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/salt/minion.py b/salt/minion.py index 3d48378a1ac9..3f52e9672b54 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1728,8 +1728,7 @@ def handle_timeout(*_): # pylint: enable=unexpected-keyword-arg return True - @tornado.gen.coroutine - def _handle_decoded_payload(self, data): + async def _handle_decoded_payload(self, data): """ Override this method if you wish to handle the decoded data differently. @@ -1786,7 +1785,7 @@ def _handle_decoded_payload(self, data): " waiting...", data["jid"], ) - yield tornado.gen.sleep(10) + await asyncio.sleep(10) process_count = len(salt.utils.minion.running(self.opts)) # We stash an instance references to allow for the socket @@ -3323,8 +3322,9 @@ def __init__(self, opts, **kwargs): self.jids = {} self.raw_events = [] self.pub_future = None + self.async_req_channel = None - def _handle_decoded_payload(self, data): + async def _handle_decoded_payload(self, data): """ Override this method if you wish to handle the decoded data differently. @@ -3366,6 +3366,7 @@ def timeout_handler(*args): data["jid"], data["to"], io_loop=self.io_loop, + listen=False, **kwargs, ) @@ -3417,6 +3418,11 @@ def tune_in_no_block(self): management of the event bus assuming that these are handled outside the tune_in sequence """ + if self.req_channel: + self.req_channel.close() + if self.async_req_channel: + self.async_req_channel.close() + # Instantiate the local client self.local = salt.client.get_local_client( self.opts["_minion_conf_file"], io_loop=self.io_loop @@ -3427,10 +3433,10 @@ def tune_in_no_block(self): self.req_channel = salt.channel.client.ReqChannel.factory(self.opts) self.async_req_channel = salt.channel.client.AsyncReqChannel.factory(self.opts) - def _process_cmd_socket(self, payload): + async def _process_cmd_socket(self, payload): if payload is not None and payload["enc"] == "aes": log.trace("Handling payload") - self._handle_decoded_payload(payload["load"]) + await self._handle_decoded_payload(payload["load"]) # If it's not AES, and thus has not been verified, we do nothing. # In the future, we could add support for some clearfuncs, but # the syndic currently has no need. @@ -3918,8 +3924,7 @@ async def _handle_payload(self, payload): mp_call = _metaproxy_call(self.opts, "handle_payload") return mp_call(self, payload) - @tornado.gen.coroutine - def _handle_decoded_payload(self, data): + async def _handle_decoded_payload(self, data): mp_call = _metaproxy_call(self.opts, "handle_decoded_payload") return mp_call(self, data)