diff --git a/config/pai.conf.example b/config/pai.conf.example index 4ff9c1b..f8a5f69 100644 --- a/config/pai.conf.example +++ b/config/pai.conf.example @@ -39,6 +39,7 @@ import logging # ### Paradox # KEEP_ALIVE_INTERVAL = 10 # Interval between status updates +# IO_TIMEOUT = 0.5 # Timeout for IO operations # # LIMITS = { # By default all zones will be monitored # # 'zone': range(1, 17), # Zones to monitor and control diff --git a/paradox/config.py b/paradox/config.py index e9ef7fc..1428f4a 100644 --- a/paradox/config.py +++ b/paradox/config.py @@ -55,6 +55,7 @@ class Config: "IP_CONNECTION_BARE": False, # IP endpoint connects directly to panel. Used for Serial Tunnels over TCP # Paradox "KEEP_ALIVE_INTERVAL": 10, # Interval between status updates + "IO_TIMEOUT": 0.5, # Timeout for IO operations "LIMITS": {}, # By default all zones will be monitored "LABEL_ENCODING": "paradox-en", # Encoding to use when decoding labels. paradox-* or https://docs.python.org/3/library/codecs.html#standard-encodings "LABEL_REFRESH_INTERVAL": ( diff --git a/paradox/connections/ip/connection.py b/paradox/connections/ip/connection.py index a24ffce..46f8126 100644 --- a/paradox/connections/ip/connection.py +++ b/paradox/connections/ip/connection.py @@ -1,17 +1,15 @@ -# -*- coding: utf-8 -*- - +from abc import ABC, abstractmethod import asyncio import logging -from abc import ABC, abstractmethod from construct import Container +from paradox.config import config as cfg from paradox.connections.connection import Connection from paradox.connections.handler import IPConnectionHandler from paradox.connections.ip.commands import IPModuleConnectCommand from paradox.connections.ip.stun_session import StunSession -from paradox.connections.protocols import (IPConnectionProtocol, - SerialConnectionProtocol) +from paradox.connections.protocols import IPConnectionProtocol, SerialConnectionProtocol from paradox.exceptions import PAICriticalException from paradox.lib.handlers import FutureHandler, HandlerRegistry @@ -94,7 +92,7 @@ def set_key(self, value): def on_ip_message(self, container: Container): return self.loop.create_task(self.ip_handler_registry.handle(container)) - async def wait_for_ip_message(self, timeout=2) -> Container: + async def wait_for_ip_message(self, timeout=cfg.IO_TIMEOUT) -> Container: future = FutureHandler() return await self.ip_handler_registry.wait_until_complete(future, timeout) @@ -107,7 +105,10 @@ def _make_protocol(self): class LocalIPConnection(IPConnectionWithEncryption): def __init__( - self, host, port, password, + self, + host, + port, + password, ): super().__init__(password) self.host = host diff --git a/paradox/connections/protocols.py b/paradox/connections/protocols.py index c6097af..91f1789 100644 --- a/paradox/connections/protocols.py +++ b/paradox/connections/protocols.py @@ -1,12 +1,16 @@ +from abc import abstractmethod import asyncio import binascii import logging -from abc import abstractmethod from paradox.config import config as cfg from paradox.connections.handler import ConnectionHandler, IPConnectionHandler -from paradox.connections.ip.parsers import (IPMessageCommand, IPMessageRequest, - IPMessageResponse, IPMessageType) +from paradox.connections.ip.parsers import ( + IPMessageCommand, + IPMessageRequest, + IPMessageResponse, + IPMessageType, +) logger = logging.getLogger("PAI").getChild(__name__) @@ -52,11 +56,11 @@ async def close(self): if self.transport: try: self.transport.close() - except: + except Exception: logger.exception("Connection transport close raised Exception") self.transport = None - await asyncio.wait_for(self._closed, timeout=1) + await asyncio.wait_for(self._closed, timeout=cfg.IO_TIMEOUT) @abstractmethod def send_message(self, message): @@ -94,7 +98,7 @@ def __del__(self): class SerialConnectionProtocol(ConnectionProtocol): def send_message(self, message): if cfg.LOGGING_DUMP_PACKETS: - logger.debug("PAI -> SER {}".format(binascii.hexlify(message))) + logger.debug(f"PAI -> SER {binascii.hexlify(message)}") self.check_active() @@ -137,7 +141,7 @@ def data_received(self, recv_data): if checksum(frame, min_length): self.buffer = self.buffer[len(frame) :] # Remove message if cfg.LOGGING_DUMP_PACKETS: - logger.debug("SER -> PAI {}".format(binascii.hexlify(frame))) + logger.debug(f"SER -> PAI {binascii.hexlify(frame)}") self.handler.on_message(frame) else: @@ -146,14 +150,14 @@ def data_received(self, recv_data): class IPConnectionProtocol(ConnectionProtocol): def __init__(self, handler: IPConnectionHandler, key): - super(IPConnectionProtocol, self).__init__(handler) + super().__init__(handler) self.handler = handler self.key = key def send_raw(self, raw): if cfg.LOGGING_DUMP_PACKETS: - logger.debug("PAI -> IP (raw) {}".format(binascii.hexlify(raw))) + logger.debug(f"PAI -> IP (raw) {binascii.hexlify(raw)}") self.check_active() @@ -161,7 +165,7 @@ def send_raw(self, raw): def send_message(self, message): if cfg.LOGGING_DUMP_PACKETS: - logger.debug("PAI -> IP (payload) {}".format(binascii.hexlify(message))) + logger.debug(f"PAI -> IP (payload) {binascii.hexlify(message)}") self.check_active() @@ -180,7 +184,7 @@ def send_message(self, message): password=self.key, ) if cfg.LOGGING_DUMP_PACKETS: - logger.debug("PAI -> IP (raw) {}".format(binascii.hexlify(msg))) + logger.debug(f"PAI -> IP (raw) {binascii.hexlify(msg)}") self.transport.write(msg) @@ -188,9 +192,7 @@ def _process_message(self, data): message = IPMessageResponse.parse(data, password=self.key) if cfg.LOGGING_DUMP_PACKETS: - logger.debug( - "IP -> PAI (payload) {}".format(binascii.hexlify(message.payload)) - ) + logger.debug(f"IP -> PAI (payload) {binascii.hexlify(message.payload)}") if message.header.message_type == IPMessageType.serial_passthrough_response: self.handler.on_message(message.payload) @@ -218,7 +220,7 @@ def data_received(self, recv_data): return if cfg.LOGGING_DUMP_PACKETS: - logger.debug("IP -> PAI (raw) {}".format(binascii.hexlify(self.buffer))) + logger.debug(f"IP -> PAI (raw) {binascii.hexlify(self.buffer)}") self._process_message(self.buffer) self.buffer = b"" diff --git a/paradox/lib/async_message_manager.py b/paradox/lib/async_message_manager.py index 76579fd..5089a8c 100644 --- a/paradox/lib/async_message_manager.py +++ b/paradox/lib/async_message_manager.py @@ -4,8 +4,8 @@ from construct import Container -from paradox.lib.handlers import (FutureHandler, HandlerRegistry, - PersistentHandler) +from paradox.config import config as cfg +from paradox.lib.handlers import FutureHandler, HandlerRegistry, PersistentHandler logger = logging.getLogger("PAI").getChild(__name__) @@ -27,7 +27,7 @@ def can_handle(self, data: Container) -> bool: class AsyncMessageManager: def __init__(self, loop=None): - super(AsyncMessageManager, self).__init__() + super().__init__() if not loop: loop = asyncio.get_event_loop() @@ -37,7 +37,9 @@ def __init__(self, loop=None): self.raw_handler_registry = HandlerRegistry() async def wait_for_message( - self, check_fn: Optional[Callable[[Container], bool]] = None, timeout=2 + self, + check_fn: Optional[Callable[[Container], bool]] = None, + timeout=cfg.IO_TIMEOUT, ) -> Container: return await self.handler_registry.wait_until_complete( FutureHandler(check_fn), timeout diff --git a/paradox/lib/handlers.py b/paradox/lib/handlers.py index cda56fe..adcc17b 100644 --- a/paradox/lib/handlers.py +++ b/paradox/lib/handlers.py @@ -1,8 +1,10 @@ +from abc import abstractmethod import asyncio import logging -from abc import abstractmethod from typing import Any, Awaitable, Callable, List, Optional, Union +from paradox.config import config as cfg + logger = logging.getLogger("PAI").getChild(__name__) @@ -30,7 +32,9 @@ class AlreadyHandledError(Exception): class FutureHandler(Handler, asyncio.Future): def __init__( - self, check_fn: Optional[Callable[[Any], bool]] = None, name=None, + self, + check_fn: Optional[Callable[[Any], bool]] = None, + name=None, ): super().__init__() self.name = name if name is not None else self.__class__.__name__ @@ -53,7 +57,7 @@ class PersistentHandler(Handler): def __init__( self, callback: Callable[[Any], Union[None, Awaitable[None]]], name=None ): - super(PersistentHandler, self).__init__(name) + super().__init__(name) self._handle = callback self.persistent = True @@ -90,7 +94,7 @@ def remove_by_name(self, name: str): for handler in to_remove: self.remove(handler) - async def wait_until_complete(self, handler: Handler, timeout=2): + async def wait_until_complete(self, handler: Handler, timeout=cfg.IO_TIMEOUT): self.append(handler) try: return await asyncio.wait_for(handler, timeout=timeout) @@ -112,7 +116,7 @@ async def handle(self, data) -> None: await handler(data) except AlreadyHandledError: logger.error("Already handled") - except: + except Exception: logger.exception("Exception caught during message handling") raise diff --git a/paradox/paradox.py b/paradox/paradox.py index 7ab8b38..488b098 100644 --- a/paradox/paradox.py +++ b/paradox/paradox.py @@ -1,14 +1,12 @@ -# -*- coding: utf-8 -*- - import asyncio -import logging -import time -import pytz from binascii import hexlify from datetime import datetime -from typing import Callable, Iterable, Optional, Sequence +import logging +import time +from typing import Callable, Iterable, Optional from construct import Container +import pytz from paradox.config import config as cfg, get_limits_for_type from paradox.connections.connection import Connection @@ -16,13 +14,16 @@ from paradox.data.memory_storage import MemoryStorage as Storage from paradox.data.model import DetectedPanel from paradox.event import Change, ChangeEvent, Event, LiveEvent -from paradox.exceptions import (AuthenticationFailed, PanelNotDetected, - StatusRequestException, - async_loop_unhandled_exception_handler, CodeLockout) +from paradox.exceptions import ( + AuthenticationFailed, + CodeLockout, + PanelNotDetected, + StatusRequestException, + async_loop_unhandled_exception_handler, +) from paradox.hardware import Panel, create_panel from paradox.lib import ps -from paradox.lib.async_message_manager import (ErrorMessageHandler, - EventMessageHandler) +from paradox.lib.async_message_manager import ErrorMessageHandler, EventMessageHandler from paradox.lib.handlers import PersistentHandler from paradox.lib.utils import deep_merge from paradox.parsers.status import convert_raw_status @@ -35,7 +36,7 @@ def __init__(self, retries=3): self.panel: Panel = None self._connection: Connection = None self.retries = retries - self.work_loop = asyncio.get_event_loop() # type: asyncio.AbstractEventLoop + self.work_loop = asyncio.get_event_loop() self.work_loop.set_exception_handler(async_loop_unhandled_exception_handler) self.storage = Storage() @@ -70,7 +71,8 @@ def connection(self): from paradox.connections.serial_connection import SerialCommunication self._connection = SerialCommunication( - port=cfg.SERIAL_PORT, baud=cfg.SERIAL_BAUD, + port=cfg.SERIAL_PORT, + baud=cfg.SERIAL_BAUD, ) elif cfg.CONNECTION_TYPE == "IP": logger.info("Using IP Connection") @@ -81,10 +83,7 @@ def connection(self): self._connection = BareIPConnection( host=cfg.IP_CONNECTION_HOST, port=cfg.IP_CONNECTION_PORT ) - elif ( - cfg.IP_CONNECTION_SITEID - and cfg.IP_CONNECTION_EMAIL - ): + elif cfg.IP_CONNECTION_SITEID and cfg.IP_CONNECTION_EMAIL: from paradox.connections.ip.connection import StunIPConnection self._connection = StunIPConnection( @@ -102,9 +101,7 @@ def connection(self): password=cfg.IP_CONNECTION_PASSWORD, ) else: - raise AssertionError( - "Invalid connection type: {}".format(cfg.CONNECTION_TYPE) - ) + raise AssertionError(f"Invalid connection type: {cfg.CONNECTION_TYPE}") self._register_connection_handlers() @@ -140,7 +137,7 @@ async def connect(self) -> bool: initiate_reply = await self.send_wait( self.panel.get_message("InitiateCommunication"), None, - reply_expected=0x7 + reply_expected=0x7, ) if initiate_reply: @@ -156,9 +153,7 @@ async def connect(self) -> bool: initiate_reply.fields.value.serial_number ).decode() - logger.info( - "Panel Identified {} version {}".format(model, firmware_version) - ) + logger.info(f"Panel Identified {model} version {firmware_version}") else: raise ConnectionError("Panel did not replied to InitiateCommunication") @@ -251,7 +246,9 @@ async def sync_time(self): tzinfo = pytz.timezone(cfg.SYNC_TIME_TIMEZONE) now = now.astimezone(tzinfo) except pytz.exceptions.UnknownTimeZoneError: - logger.debug(f"Panel Timezone Unknown ('{cfg.SYNC_TIME_TIMEZONE}'). Skipping sync") + logger.debug( + f"Panel Timezone Unknown ('{cfg.SYNC_TIME_TIMEZONE}'). Skipping sync" + ) return if not self._is_time_sync_required(now.replace(tzinfo=None)): @@ -297,13 +294,13 @@ async def loop(self): logger.error("Lost communication with panel") await self.disconnect() return - except: + except Exception: logger.exception("Loop") finally: self.busy.release() if replies_missing > 0: - logger.debug("Loop: Replies missing: {}".format(replies_missing)) + logger.debug(f"Loop: Replies missing: {replies_missing}") # cfg.Listen for events @@ -353,15 +350,14 @@ def on_connection_message(self, message: bytes): # No message if recv_message is None: logger.debug( - "Unknown message: %s" - % (" ".join("{:02x} ".format(c) for c in message)) + "Unknown message: %s" % (" ".join(f"{c:02x} " for c in message)) ) return self.connection.schedule_message_handling( recv_message ) # schedule handling in the loop - except: + except Exception: logger.exception("Error parsing message") async def send_wait( @@ -370,10 +366,9 @@ async def send_wait( args=None, message=None, retries=5, - timeout=0.5, + timeout=cfg.IO_TIMEOUT, reply_expected=None, ) -> Optional[Container]: - # Connection closed if not self.connection.connected: raise ConnectionError("Not connected") @@ -422,12 +417,12 @@ async def send_wait( except ConnectionError: result = "connection error" raise - except: + except Exception: result = "exception" logger.exception("Unexpected exception during send_wait") raise - #finally: - #logger.debug("send/receive %s in %.4f s", result, time.time() - t1) + finally: + logger.debug("send/receive %s in %.4f s", result, time.time() - t1) attempt += 1 @@ -435,11 +430,9 @@ async def send_wait( async def control_zone(self, zone: str, command: str) -> bool: command = command.lower() - logger.debug("Control Zone: {} - {}".format(zone, command)) + logger.debug(f"Control Zone: {zone} - {command}") - zones_selected = self.storage.get_container("zone").select( - zone - ) # type: Sequence[int] + zones_selected = self.storage.get_container("zone").select(zone) # Not Found if len(zones_selected) == 0: @@ -464,11 +457,9 @@ async def control_zone(self, zone: str, command: str) -> bool: async def control_partition(self, partition: str, command: str) -> bool: command = command.lower() - logger.debug("Control Partition: {} - {}".format(partition, command)) + logger.debug(f"Control Partition: {partition} - {command}") - partitions_selected = self.storage.get_container("partition").select( - partition - ) # type: Sequence[int] + partitions_selected = self.storage.get_container("partition").select(partition) # Not Found if len(partitions_selected) == 0: @@ -493,7 +484,7 @@ async def control_partition(self, partition: str, command: str) -> bool: async def control_output(self, output, command) -> bool: command = command.lower() - logger.debug("Control Output: {} - {}".format(output, command)) + logger.debug(f"Control Output: {output} - {command}") outputs_selected = self.storage.get_container("pgm").select(output) @@ -545,7 +536,7 @@ async def send_panic(self, partition_id, panic_type, user_id) -> bool: async def control_door(self, door, command) -> bool: command = command.lower() - logger.debug("Control Door: {} - {}".format(door, command)) + logger.debug(f"Control Door: {door} - {command}") doors_selected = self.storage.get_container("door").select(door) @@ -585,7 +576,7 @@ def handle_event_message(self, message: Container = None): event_map=self.panel.event_map, label_provider=self.get_label, ) - except AssertionError as e: + except AssertionError: logger.debug("Error creating event") return @@ -633,7 +624,7 @@ def handle_event_message(self, message: Container = None): ps.sendEvent(evt) - except: + except Exception: logger.exception("Handle live event") def handle_error_message(self, message): @@ -644,7 +635,7 @@ def handle_error_message(self, message): asyncio.get_event_loop().create_task(self.disconnect()) else: message = self.panel.get_error_message(error_enum) - logger.error("Got ERROR Message: {}".format(message)) + logger.error(f"Got ERROR Message: {message}") if message == "Invalid PC Password": raise AuthenticationFailed() elif "code lockout" in message: @@ -709,7 +700,13 @@ def _on_status_update(self, status): if element_type in ["troubles"]: # troubles was already parsed continue for element_item_key, element_item_status in element_items.items(): - if isinstance(element_item_status, (dict, list,)): + if isinstance( + element_item_status, + ( + dict, + list, + ), + ): self.storage.update_container_object( element_type, element_item_key, element_item_status ) @@ -746,13 +743,15 @@ def _process_trouble_statuses(self, trouble_statuses): def _is_time_sync_required(self, now) -> bool: assert now.tzinfo is None try: - drift = (now - self.storage.get_container("system")["date"]["time"]).total_seconds() + drift = ( + now - self.storage.get_container("system")["date"]["time"] + ).total_seconds() - if abs(drift) > cfg.SYNC_TIME_MIN_DRIFT: - logger.info(f"Time drifted more than allowed: {drift} seconds") - return True - else: - logger.debug(f"Time drifted within allowed range: {drift} seconds") + if abs(drift) > cfg.SYNC_TIME_MIN_DRIFT: + logger.info(f"Time drifted more than allowed: {drift} seconds") + return True + else: + logger.debug(f"Time drifted within allowed range: {drift} seconds") except KeyError: pass @@ -795,7 +794,7 @@ def _update_partition_states(self): def _on_event(self, event: Event): if cfg.LOGGING_DUMP_EVENTS: - logger.debug("LiveEvent: {}".format(event)) + logger.debug(f"LiveEvent: {event}") event.call_hook(storage=self.storage, alarm=self) @@ -813,7 +812,7 @@ def _on_property_change(self, change: Change): label_provider=self.get_label, ) if cfg.LOGGING_DUMP_EVENTS: - logger.debug("ChangeEvent: {}".format(event)) + logger.debug(f"ChangeEvent: {event}") ps.sendEvent(event) except AssertionError: logger.debug("Could not create event from change")