Skip to content

Commit

Permalink
Bugfixes (#13)
Browse files Browse the repository at this point in the history
* fix: some tests

* fix: potential bug with command task

* feat: centralize writing

* fix: tests

* feat: get signal info

* fix: clear command flag on connect

* fix: update version
  • Loading branch information
iloveicedgreentea authored Sep 2, 2024
1 parent 8a7bb94 commit 0eb5295
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 138 deletions.
2 changes: 1 addition & 1 deletion madvr/consts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Constants for madvr module."""

REFRESH_TIME = 60
REFRESH_TIME = 20
PING_DELAY = 30
COMMAND_TIMEOUT = 3
PING_INTERVAL = 5
Expand Down
128 changes: 68 additions & 60 deletions madvr/madvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,27 +141,21 @@ async def task_handle_queue(self) -> None:
"""Handle command queue."""
while True:
await self.connection_event.wait()
while (
not self.command_queue.empty() and not self.stop_commands_flag.is_set()
):
while not self.command_queue.empty() and not self.stop_commands_flag.is_set():
command = await self.command_queue.get()
self.logger.debug("sending queue command %s", command)
try:
await self.send_command(command)
except NotImplementedError as err:
self.logger.warning("Command not implemented: %s", err)
except (ConnectionError, ConnectionResetError):
self.logger.warning("Envy was turned off manually")
# update state that its off
await self._handle_power_off()
except (ConnectionError, ConnectionResetError, BrokenPipeError):
self.logger.warning("Task Queue: Envy seems to be disconnected")
except AttributeError:
self.logger.warning("Issue sending command from queue")
except RetryExceededError:
self.logger.warning("Retry exceeded for command %s", command)
except OSError as err:
self.logger.error("Unexpected error when sending command: %s", err)
finally:
self.command_queue.task_done()

if self.stop_commands_flag.is_set():
self.clear_queue()
Expand Down Expand Up @@ -197,9 +191,7 @@ async def task_read_notifications(self) -> None:
# try to connect otherwise it will mark the device as offline
await self._reconnect()
except ConnectionError as e:
self.logger.error(
"Connection error when reading notifications: %s", e
)
self.logger.error("Connection error when reading notifications: %s", e)
continue

await asyncio.sleep(TASK_CPU_DELAY)
Expand All @@ -213,15 +205,7 @@ async def send_heartbeat(self, once: bool = False) -> None:
"""

async def perform_heartbeat() -> None:
if not self.connected:
self.logger.warning("Connection not established")
raise HeartBeatError("Connection not established")

async with self.lock:
if self.writer:
self.writer.write(self.HEARTBEAT)
await self.writer.drain()
self.logger.debug("Heartbeat complete")
await self._write_with_timeout(self.HEARTBEAT)

async def handle_heartbeat_error(
err: TimeoutError | OSError | HeartBeatError,
Expand Down Expand Up @@ -270,9 +254,7 @@ async def task_ping_until_alive(self) -> None:
try:
await self.open_connection()
except ConnectionError as err:
self.logger.error(
"Error opening connection after connectivity check: %s", err
)
self.logger.error("Error opening connection after connectivity check: %s", err)
else:
self.logger.debug(
"Device is not connectable, retrying in %s seconds",
Expand All @@ -285,13 +267,18 @@ async def task_ping_until_alive(self) -> None:
await asyncio.sleep(self.ping_interval)

async def task_refresh_info(self) -> None:
"""Task to refresh some device info every minute"""
"""Task to refresh some device info every 20s"""
while True:
# wait until the connection is established
await self.connection_event.wait()
cmds = [
["GetMacAddress"],
["GetTemperatures"],
# get signal info in case a change was missed and its sitting in limbo
["GetIncomingSignalInfo"],
["GetOutgoingSignalInfo"],
["GetAspectRatio"],
["GetMaskingRatio"],
]
for cmd in cmds:
await self.add_command_to_queue(cmd)
Expand All @@ -317,6 +304,32 @@ def stop(self) -> None:
self.stop_heartbeat.set()
self.stop_commands_flag.set()

async def _write_with_timeout(self, data: bytes) -> None:
"""Write data to the socket with a timeout."""
if not self.connected:
self.logger.error("Connection not established. Reconnecting")
await self._reconnect()

if not self.writer:
self.logger.error("Writer is not initialized. Reconnecting")
await self._reconnect()

async def write_and_drain() -> None:
if not self.writer:
raise ConnectionError("Writer is not initialized")
self.writer.write(data)
await self.writer.drain()

try:
async with self.lock:
await asyncio.wait_for(write_and_drain(), timeout=self.connect_timeout)
except TimeoutError:
self.logger.error("Write operation timed out after %s seconds", self.connect_timeout)
await self._reconnect()
except (ConnectionResetError, OSError) as err:
self.logger.error("Error writing to socket: %s", err)
await self._reconnect()

async def _reconnect(self) -> None:
"""
Initiate a persistent connection to the device.
Expand All @@ -325,10 +338,10 @@ async def _reconnect(self) -> None:
"""
# it will not try to connect until ping is successful
if await self.is_device_connectable():
self.logger.info("Device is online")
self.logger.debug("Device is online")

try:
self.logger.info("Connecting to Envy: %s:%s", self.host, self.port)
self.logger.debug("Connecting to Envy: %s:%s", self.host, self.port)

# Command client
self.reader, self.writer = await asyncio.wait_for(
Expand All @@ -343,14 +356,14 @@ async def _reconnect(self) -> None:
await self._set_connected(True)
self.stop_heartbeat.clear()
# send a heartbeat now
self.logger.debug("Sending heartbeat")
await self.send_heartbeat(once=True)

self.logger.info("Connection established")
self.stop_commands_flag.clear()

except (TimeoutError, HeartBeatError, OSError) as err:
self.logger.error(
"Heartbeat failed. Connection not established %s", err
)
self.logger.error("Heartbeat failed. Connection not established %s", err)
await self._set_connected(False)
raise ConnectionError("Heartbeat failed") from err
else:
Expand All @@ -359,15 +372,22 @@ async def _reconnect(self) -> None:
await self._handle_power_off()

async def is_device_connectable(self) -> bool:
"""Check if the device is connectable without ping."""
try:
async with asyncio.timeout(SMALL_DELAY):
_, writer = await asyncio.open_connection(self.host, self.port)
writer.close()
await writer.wait_closed()
return True
except (asyncio.TimeoutError, ConnectionRefusedError, OSError):
return False
"""Check if the device is connectable without ping. The device is only connectable when on."""
retry_count = 0
# loop because upgrading firmware can take a few seconds and will kill the connection
while retry_count < 10:
try:
async with asyncio.timeout(SMALL_DELAY):
_, writer = await asyncio.open_connection(self.host, self.port)
writer.close()
await writer.wait_closed()
return True
except (TimeoutError, ConnectionRefusedError, OSError):
await asyncio.sleep(SMALL_DELAY)
retry_count += 1
continue
self.logger.debug("Device is not connectable")
return False

async def _clear_attr(self) -> None:
"""
Expand Down Expand Up @@ -400,7 +420,7 @@ async def open_connection(self) -> None:
self.logger.debug("Connection opened")
except (AckError, ConnectionError) as err:
self.logger.error("Error opening connection: %s", err)
raise ConnectionError("Error opening connection") from err
raise

# once connected, try to refresh data once in the case the device was turned connected to while on already
cmds = [
Expand Down Expand Up @@ -441,9 +461,7 @@ async def _construct_command(self, raw_command: list[str]) -> tuple[bytes, str]:
bytes: the value to send in bytes
str: the 'msg' field in the Enum used to filter notifications
"""
self.logger.debug(
"raw_command: %s -- raw_command length: %s", raw_command, len(raw_command)
)
self.logger.debug("raw_command: %s -- raw_command length: %s", raw_command, len(raw_command))
skip_val = False
# HA seems to always send commands as a list even if you set them as a str

Expand Down Expand Up @@ -500,9 +518,7 @@ async def _construct_command(self, raw_command: list[str]) -> tuple[bytes, str]:
cmd = command_base + Footer.footer.value

except KeyError as exc:
raise NotImplementedError(
"Incorrect parameter given for command"
) from exc
raise NotImplementedError("Incorrect parameter given for command") from exc
else:
cmd = command_name + Footer.footer.value

Expand All @@ -529,15 +545,8 @@ async def send_command(self, command: list) -> None:

self.logger.debug("Using values: %s %s", cmd, enum_type)

if not self.connected:
self.logger.error("Connection not established")
raise ConnectionError("Device not connected")

try:
async with self.lock:
if self.writer:
self.writer.write(cmd)
await self.writer.drain()
await self._write_with_timeout(cmd)
except (ConnectionResetError, TimeoutError, OSError) as err:
self.logger.error("Error writing command to socket: %s", err)
raise ConnectionError("Failed to send command") from err
Expand Down Expand Up @@ -574,8 +583,6 @@ async def power_on(self, mac: str = "") -> None:
"""
Power on the device
"""
# start processing commands
self.stop_commands_flag.clear()

# use the detected mac or one that is supplied at init or function call
mac_to_use = self.mac_address or self.mac or mac
Expand All @@ -585,9 +592,7 @@ async def power_on(self, mac: str = "") -> None:
send_magic_packet(mac_to_use, logger=self.logger)
else:
# without wol, you cant power on the device
self.logger.warning(
"No mac provided, no action to take. Implement your own WOL automation"
)
self.logger.warning("No mac provided, no action to take. Implement your own WOL automation")

async def power_off(self, standby: bool = False) -> None:
"""
Expand All @@ -599,6 +604,9 @@ async def power_off(self, standby: bool = False) -> None:
# set the flag to delay the ping task to avoid race conditions
self.powered_off_recently = True
if self.connected:
await self.send_command(["Standby"] if standby else ["PowerOff"])
try:
await self.send_command(["Standby"] if standby else ["PowerOff"])
except ConnectionError as err:
self.logger.error("Error sending power off command: %s", err)

await self.close_connection() #
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pre-commit
mypy
ruff
pydantic
pre-commit
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="py_madvr2",
version="1.6.29",
version="1.6.32",
author="iloveicedgreentea2",
description="A package to control MadVR Envy over IP",
long_description=long_description,
Expand Down
53 changes: 53 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# type: ignore
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch

import pytest

from madvr.madvr import Madvr


@pytest.fixture
def mock_madvr():
with patch("madvr.madvr.asyncio.open_connection", new_callable=AsyncMock), patch(
"madvr.madvr.Madvr.connected", new_callable=PropertyMock, return_value=True
):
madvr = Madvr("192.168.1.100")
# ignore mypy
#
madvr.writer = AsyncMock()
madvr.reader = AsyncMock()
madvr._set_connected = AsyncMock()
madvr._clear_attr = AsyncMock()
madvr.is_device_connectable = AsyncMock()
madvr.close_connection = AsyncMock()
madvr._construct_command = AsyncMock()
madvr._write_with_timeout = AsyncMock()
madvr.stop = MagicMock()
madvr.stop_commands_flag = MagicMock()
madvr.stop_heartbeat = MagicMock()
madvr.add_command_to_queue = AsyncMock()
madvr._reconnect = AsyncMock()
madvr._write_with_timeout = AsyncMock()

# Mock the background tasks to prevent warnings
madvr.task_handle_queue = AsyncMock()
madvr.task_read_notifications = AsyncMock()
# madvr.send_heartbeat = AsyncMock()
madvr.task_ping_until_alive = AsyncMock()
madvr.task_refresh_info = AsyncMock()
yield madvr


@pytest.fixture
def mock_send_magic_packet():
with patch("madvr.madvr.send_magic_packet") as mock:
yield mock


@pytest.fixture
def mock_wait_for():
async def mock_wait_for_func(coro, timeout):
return await coro

with patch("asyncio.wait_for", mock_wait_for_func):
yield
Loading

0 comments on commit 0eb5295

Please sign in to comment.