Skip to content

Commit

Permalink
Syndic fix-up
Browse files Browse the repository at this point in the history
- Update _handle_decode_payload methods to use async/await.
- Syndic closes request channels before creating new ones.
  • Loading branch information
dwoz authored and s0undt3ch committed May 1, 2024
1 parent 2240c08 commit 11a06ce
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1728,8 +1728,7 @@ def handle_timeout(*_):
# pylint: enable=unexpected-keyword-arg
return True

@tornado.gen.coroutine
def _handle_decoded_payload(self, data):
async def _handle_decoded_payload(self, data):
"""
Override this method if you wish to handle the decoded data
differently.
Expand Down Expand Up @@ -1786,7 +1785,7 @@ def _handle_decoded_payload(self, data):
" waiting...",
data["jid"],
)
yield tornado.gen.sleep(10)
await asyncio.sleep(10)
process_count = len(salt.utils.minion.running(self.opts))

# We stash an instance references to allow for the socket
Expand Down Expand Up @@ -3323,8 +3322,9 @@ def __init__(self, opts, **kwargs):
self.jids = {}
self.raw_events = []
self.pub_future = None
self.async_req_channel = None

def _handle_decoded_payload(self, data):
async def _handle_decoded_payload(self, data):
"""
Override this method if you wish to handle the decoded data
differently.
Expand Down Expand Up @@ -3366,6 +3366,7 @@ def timeout_handler(*args):
data["jid"],
data["to"],
io_loop=self.io_loop,
listen=False,
**kwargs,
)

Expand Down Expand Up @@ -3417,6 +3418,11 @@ def tune_in_no_block(self):
management of the event bus assuming that these are handled outside
the tune_in sequence
"""
if self.req_channel:
self.req_channel.close()
if self.async_req_channel:
self.async_req_channel.close()

# Instantiate the local client
self.local = salt.client.get_local_client(
self.opts["_minion_conf_file"], io_loop=self.io_loop
Expand All @@ -3427,10 +3433,10 @@ def tune_in_no_block(self):
self.req_channel = salt.channel.client.ReqChannel.factory(self.opts)
self.async_req_channel = salt.channel.client.AsyncReqChannel.factory(self.opts)

def _process_cmd_socket(self, payload):
async def _process_cmd_socket(self, payload):
if payload is not None and payload["enc"] == "aes":
log.trace("Handling payload")
self._handle_decoded_payload(payload["load"])
await self._handle_decoded_payload(payload["load"])
# If it's not AES, and thus has not been verified, we do nothing.
# In the future, we could add support for some clearfuncs, but
# the syndic currently has no need.
Expand Down Expand Up @@ -3918,8 +3924,7 @@ async def _handle_payload(self, payload):
mp_call = _metaproxy_call(self.opts, "handle_payload")
return mp_call(self, payload)

@tornado.gen.coroutine
def _handle_decoded_payload(self, data):
async def _handle_decoded_payload(self, data):
mp_call = _metaproxy_call(self.opts, "handle_decoded_payload")
return mp_call(self, data)

Expand Down

0 comments on commit 11a06ce

Please sign in to comment.