Skip to content

Commit

Permalink
chore: typing e2 (midea-lan#65)
Browse files Browse the repository at this point in the history
Draft as there is a logic issue with "old_protocol"

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Refactor**
- Enhanced code clarity and type safety by adding type hints to various
methods and instance variables across multiple modules.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
chemelli74 authored Jun 5, 2024
1 parent b0337ba commit faf7a38
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
16 changes: 9 additions & 7 deletions midealocal/devices/e2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import sys
from typing import Any

from .message import (
MessageE2Response,
Expand Down Expand Up @@ -47,7 +48,7 @@ def __init__(
model: str,
subtype: int,
customize: str,
):
) -> None:
super().__init__(
name=name,
device_id=device_id,
Expand Down Expand Up @@ -77,13 +78,13 @@ def __init__(
self._old_protocol = self._default_old_protocol
self.set_customize(customize)

def old_protocol(self):
def old_protocol(self) -> bool:
return self.subtype <= 82 or self.subtype == 85 or self.subtype == 36353

def build_query(self):
def build_query(self) -> list[MessageQuery]:
return [MessageQuery(self._protocol_version)]

def process_message(self, msg):
def process_message(self, msg: bytes) -> dict[str, Any]:
message = MessageE2Response(msg)
_LOGGER.debug(f"[{self.device_id}] Received: {message}")
new_status = {}
Expand All @@ -93,7 +94,7 @@ def process_message(self, msg):
new_status[str(status)] = getattr(message, str(status))
return new_status

def make_message_set(self):
def make_message_set(self) -> MessageSet:
message = MessageSet(self._protocol_version)
message.protection = self._attributes[DeviceAttributes.protection]
message.whole_tank_heating = self._attributes[
Expand All @@ -105,7 +106,8 @@ def make_message_set(self):
message.variable_heating = self._attributes[DeviceAttributes.variable_heating]
return message

def set_attribute(self, attr, value):
def set_attribute(self, attr: str, value: Any) -> None:
message: MessagePower | MessageSet | MessageNewProtocolSet | None = None
if attr not in [
DeviceAttributes.heating,
DeviceAttributes.keep_warm,
Expand All @@ -126,7 +128,7 @@ def set_attribute(self, attr, value):
setattr(message, str(attr), value)
self.build_send(message)

def set_customize(self, customize):
def set_customize(self, customize: str) -> None:
self._old_protocol = self._default_old_protocol
if customize and len(customize) > 0:
try:
Expand Down
34 changes: 18 additions & 16 deletions midealocal/devices/e2/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@


class MessageE2Base(MessageRequest):
def __init__(self, protocol_version, message_type, body_type):
def __init__(
self, protocol_version: int, message_type: int, body_type: int
) -> None:
super().__init__(
device_type=0xE2,
protocol_version=protocol_version,
Expand All @@ -16,25 +18,25 @@ def __init__(self, protocol_version, message_type, body_type):
)

@property
def _body(self):
def _body(self) -> bytearray:
raise NotImplementedError


class MessageQuery(MessageE2Base):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.query,
body_type=0x01,
)

@property
def _body(self):
def _body(self) -> bytearray:
return bytearray([0x01])


class MessagePower(MessageE2Base):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.set,
Expand All @@ -43,7 +45,7 @@ def __init__(self, protocol_version):
self.power = False

@property
def _body(self):
def _body(self) -> bytearray:
if self.power:
self.body_type = 0x01
else:
Expand All @@ -52,18 +54,18 @@ def _body(self):


class MessageNewProtocolSet(MessageE2Base):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.set,
body_type=0x14,
)
self.target_temperature = None
self.variable_heating = None
self.whole_tank_heating = None
self.target_temperature: int | None = None
self.variable_heating: bool | None = None
self.whole_tank_heating: bool | None = None

@property
def _body(self):
def _body(self) -> bytearray:
byte1 = 0x00
byte2 = 0x00
if self.target_temperature is not None:
Expand All @@ -79,7 +81,7 @@ def _body(self):


class MessageSet(MessageE2Base):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.set,
Expand All @@ -91,7 +93,7 @@ def __init__(self, protocol_version):
self.protection = False

@property
def _body(self):
def _body(self) -> bytearray:
# Byte 4 whole_tank_heating, protection
protection = 0x04 if self.protection else 0x00
whole_tank_heating = 0x02 if self.whole_tank_heating else 0x01
Expand Down Expand Up @@ -124,7 +126,7 @@ def _body(self):


class E2GeneralMessageBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)
self.power = (body[2] & 0x01) > 0
self.heating = (body[2] & 0x04) > 0
Expand All @@ -142,8 +144,8 @@ def __init__(self, body):


class MessageE2Response(MessageResponse):
def __init__(self, message):
super().__init__(message)
def __init__(self, message: bytes) -> None:
super().__init__(bytearray(message))
if (
self.message_type in [MessageType.query, MessageType.notify1]
and self.body_type == 0x01
Expand Down

0 comments on commit faf7a38

Please sign in to comment.