Skip to content

Commit

Permalink
Configurable IO_TIMEOUT
Browse files Browse the repository at this point in the history
  • Loading branch information
yozik04 committed Mar 30, 2024
1 parent 9dbe5bc commit 1039f33
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 88 deletions.
1 change: 1 addition & 0 deletions config/pai.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import logging
#
### Paradox
# KEEP_ALIVE_INTERVAL = 10 # Interval between status updates
# IO_TIMEOUT = 0.5 # Timeout for IO operations
#
# LIMITS = { # By default all zones will be monitored
# # 'zone': range(1, 17), # Zones to monitor and control
Expand Down
1 change: 1 addition & 0 deletions paradox/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Config:
"IP_CONNECTION_BARE": False, # IP endpoint connects directly to panel. Used for Serial Tunnels over TCP
# Paradox
"KEEP_ALIVE_INTERVAL": 10, # Interval between status updates
"IO_TIMEOUT": 0.5, # Timeout for IO operations
"LIMITS": {}, # By default all zones will be monitored
"LABEL_ENCODING": "paradox-en", # Encoding to use when decoding labels. paradox-* or https://docs.python.org/3/library/codecs.html#standard-encodings
"LABEL_REFRESH_INTERVAL": (
Expand Down
15 changes: 8 additions & 7 deletions paradox/connections/ip/connection.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
# -*- coding: utf-8 -*-

from abc import ABC, abstractmethod
import asyncio
import logging
from abc import ABC, abstractmethod

from construct import Container

from paradox.config import config as cfg
from paradox.connections.connection import Connection
from paradox.connections.handler import IPConnectionHandler
from paradox.connections.ip.commands import IPModuleConnectCommand
from paradox.connections.ip.stun_session import StunSession
from paradox.connections.protocols import (IPConnectionProtocol,
SerialConnectionProtocol)
from paradox.connections.protocols import IPConnectionProtocol, SerialConnectionProtocol
from paradox.exceptions import PAICriticalException
from paradox.lib.handlers import FutureHandler, HandlerRegistry

Expand Down Expand Up @@ -94,7 +92,7 @@ def set_key(self, value):
def on_ip_message(self, container: Container):
return self.loop.create_task(self.ip_handler_registry.handle(container))

async def wait_for_ip_message(self, timeout=2) -> Container:
async def wait_for_ip_message(self, timeout=cfg.IO_TIMEOUT) -> Container:
future = FutureHandler()
return await self.ip_handler_registry.wait_until_complete(future, timeout)

Expand All @@ -107,7 +105,10 @@ def _make_protocol(self):

class LocalIPConnection(IPConnectionWithEncryption):
def __init__(
self, host, port, password,
self,
host,
port,
password,
):
super().__init__(password)
self.host = host
Expand Down
32 changes: 17 additions & 15 deletions paradox/connections/protocols.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from abc import abstractmethod
import asyncio
import binascii
import logging
from abc import abstractmethod

from paradox.config import config as cfg
from paradox.connections.handler import ConnectionHandler, IPConnectionHandler
from paradox.connections.ip.parsers import (IPMessageCommand, IPMessageRequest,
IPMessageResponse, IPMessageType)
from paradox.connections.ip.parsers import (
IPMessageCommand,
IPMessageRequest,
IPMessageResponse,
IPMessageType,
)

logger = logging.getLogger("PAI").getChild(__name__)

Expand Down Expand Up @@ -52,11 +56,11 @@ async def close(self):
if self.transport:
try:
self.transport.close()
except:
except Exception:
logger.exception("Connection transport close raised Exception")
self.transport = None

await asyncio.wait_for(self._closed, timeout=1)
await asyncio.wait_for(self._closed, timeout=cfg.IO_TIMEOUT)

@abstractmethod
def send_message(self, message):
Expand Down Expand Up @@ -94,7 +98,7 @@ def __del__(self):
class SerialConnectionProtocol(ConnectionProtocol):
def send_message(self, message):
if cfg.LOGGING_DUMP_PACKETS:
logger.debug("PAI -> SER {}".format(binascii.hexlify(message)))
logger.debug(f"PAI -> SER {binascii.hexlify(message)}")

self.check_active()

Expand Down Expand Up @@ -137,7 +141,7 @@ def data_received(self, recv_data):
if checksum(frame, min_length):
self.buffer = self.buffer[len(frame) :] # Remove message
if cfg.LOGGING_DUMP_PACKETS:
logger.debug("SER -> PAI {}".format(binascii.hexlify(frame)))
logger.debug(f"SER -> PAI {binascii.hexlify(frame)}")

self.handler.on_message(frame)
else:
Expand All @@ -146,22 +150,22 @@ def data_received(self, recv_data):

class IPConnectionProtocol(ConnectionProtocol):
def __init__(self, handler: IPConnectionHandler, key):
super(IPConnectionProtocol, self).__init__(handler)
super().__init__(handler)

self.handler = handler
self.key = key

def send_raw(self, raw):
if cfg.LOGGING_DUMP_PACKETS:
logger.debug("PAI -> IP (raw) {}".format(binascii.hexlify(raw)))
logger.debug(f"PAI -> IP (raw) {binascii.hexlify(raw)}")

self.check_active()

self.transport.write(raw)

def send_message(self, message):
if cfg.LOGGING_DUMP_PACKETS:
logger.debug("PAI -> IP (payload) {}".format(binascii.hexlify(message)))
logger.debug(f"PAI -> IP (payload) {binascii.hexlify(message)}")

self.check_active()

Expand All @@ -180,17 +184,15 @@ def send_message(self, message):
password=self.key,
)
if cfg.LOGGING_DUMP_PACKETS:
logger.debug("PAI -> IP (raw) {}".format(binascii.hexlify(msg)))
logger.debug(f"PAI -> IP (raw) {binascii.hexlify(msg)}")

self.transport.write(msg)

def _process_message(self, data):
message = IPMessageResponse.parse(data, password=self.key)

if cfg.LOGGING_DUMP_PACKETS:
logger.debug(
"IP -> PAI (payload) {}".format(binascii.hexlify(message.payload))
)
logger.debug(f"IP -> PAI (payload) {binascii.hexlify(message.payload)}")

if message.header.message_type == IPMessageType.serial_passthrough_response:
self.handler.on_message(message.payload)
Expand Down Expand Up @@ -218,7 +220,7 @@ def data_received(self, recv_data):
return

if cfg.LOGGING_DUMP_PACKETS:
logger.debug("IP -> PAI (raw) {}".format(binascii.hexlify(self.buffer)))
logger.debug(f"IP -> PAI (raw) {binascii.hexlify(self.buffer)}")

self._process_message(self.buffer)
self.buffer = b""
10 changes: 6 additions & 4 deletions paradox/lib/async_message_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from construct import Container

from paradox.lib.handlers import (FutureHandler, HandlerRegistry,
PersistentHandler)
from paradox.config import config as cfg
from paradox.lib.handlers import FutureHandler, HandlerRegistry, PersistentHandler

logger = logging.getLogger("PAI").getChild(__name__)

Expand All @@ -27,7 +27,7 @@ def can_handle(self, data: Container) -> bool:

class AsyncMessageManager:
def __init__(self, loop=None):
super(AsyncMessageManager, self).__init__()
super().__init__()

if not loop:
loop = asyncio.get_event_loop()
Expand All @@ -37,7 +37,9 @@ def __init__(self, loop=None):
self.raw_handler_registry = HandlerRegistry()

async def wait_for_message(
self, check_fn: Optional[Callable[[Container], bool]] = None, timeout=2
self,
check_fn: Optional[Callable[[Container], bool]] = None,
timeout=cfg.IO_TIMEOUT,
) -> Container:
return await self.handler_registry.wait_until_complete(
FutureHandler(check_fn), timeout
Expand Down
14 changes: 9 additions & 5 deletions paradox/lib/handlers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from abc import abstractmethod
import asyncio
import logging
from abc import abstractmethod
from typing import Any, Awaitable, Callable, List, Optional, Union

from paradox.config import config as cfg

logger = logging.getLogger("PAI").getChild(__name__)


Expand Down Expand Up @@ -30,7 +32,9 @@ class AlreadyHandledError(Exception):

class FutureHandler(Handler, asyncio.Future):
def __init__(
self, check_fn: Optional[Callable[[Any], bool]] = None, name=None,
self,
check_fn: Optional[Callable[[Any], bool]] = None,
name=None,
):
super().__init__()
self.name = name if name is not None else self.__class__.__name__
Expand All @@ -53,7 +57,7 @@ class PersistentHandler(Handler):
def __init__(
self, callback: Callable[[Any], Union[None, Awaitable[None]]], name=None
):
super(PersistentHandler, self).__init__(name)
super().__init__(name)
self._handle = callback
self.persistent = True

Expand Down Expand Up @@ -90,7 +94,7 @@ def remove_by_name(self, name: str):
for handler in to_remove:
self.remove(handler)

async def wait_until_complete(self, handler: Handler, timeout=2):
async def wait_until_complete(self, handler: Handler, timeout=cfg.IO_TIMEOUT):
self.append(handler)
try:
return await asyncio.wait_for(handler, timeout=timeout)
Expand All @@ -112,7 +116,7 @@ async def handle(self, data) -> None:
await handler(data)
except AlreadyHandledError:
logger.error("Already handled")
except:
except Exception:
logger.exception("Exception caught during message handling")
raise

Expand Down
Loading

0 comments on commit 1039f33

Please sign in to comment.