Skip to content

Commit

Permalink
Wip on segments and scenes
Browse files Browse the repository at this point in the history
  • Loading branch information
Galorhallen committed Oct 17, 2024
1 parent 5bbf459 commit 82ac1cb
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 56 deletions.
39 changes: 18 additions & 21 deletions example/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ def discovered_callback(device: GoveeDevice, is_new: bool) -> bool:
return True


async def print_status(controller: GoveeController):
async def print_status(controller: GoveeController, device: GoveeDevice):
while True:
if not controller.devices:
print("No devices found")
for device in controller.devices:
print(f"Status: {device}")
await asyncio.sleep(5)
if not device or not device.capabilities:
continue
for seg in range(1, 1 + device.capabilities.segments_count):
await device.set_segment_color(seg, (255, 0, 0))
await asyncio.sleep(0.1)
for seg in range(1, 1 + device.capabilities.segments_count):
await device.set_segment_color(seg, (0, 0, 255))
await asyncio.sleep(0.1)
await asyncio.sleep(1)


async def main(controller: GoveeController):
Expand All @@ -29,22 +33,15 @@ async def main(controller: GoveeController):
await asyncio.sleep(1)
print("Waited")

devices = [controller.get_device_by_ip("10.0.0.110")]
# devices = controller.devices
for device in devices:
if not device:
continue
await device.turn_on()
await asyncio.sleep(1)
await device.set_segment_color(1, (255, 0, 0))
# await device.set_segment_color(14, (0, 255, 0))
# await device.set_segment_color(13, (0, 0, 255))
# await device.set_segment_color(12, (255, 255, 0))
# await device.set_segment_color(11, (0, 255, 255))
# await device.set_segment_color(10, (255, 0, 255))
# await device.set_segment_color(9, (0, 0, 0))
device = controller.get_device_by_ip("10.0.0.110")
if not device:
print("Device not found")
await device.turn_on()
await device.set_rgb_color(255, 255, 255)
await device.set_brightness(100)
await asyncio.sleep(1)

await print_status(controller)
await print_status(controller, device)


if __name__ == "__main__":
Expand Down
5 changes: 5 additions & 0 deletions src/govee_local_api/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ async def turn_on_off(self, device: GoveeDevice, status: bool) -> None:
async def set_segment_color(
self, device: GoveeDevice, segment: int, rgb: Tuple[int, int, int]
) -> None:
if not device.capabilities:
self._logger.warning("Capabilities not available for device %s", device)
return

if GoveeLightFeatures.SEGMENT_CONTROL not in device.capabilities.features:
self._logger.warning(
"Segment control is not supported by device %s", device
Expand Down Expand Up @@ -336,6 +340,7 @@ def _send_update_message(self, device: GoveeDevice):
self._send_message(StatusMessage(), device)

def _handle_status_update_response(self, message: StatusResponse, addr):
print("Status update received from %s: %s", addr, message)
ip = addr[0]
device = self.get_device_by_ip(ip)
if device:
Expand Down
11 changes: 11 additions & 0 deletions src/govee_local_api/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def __init__(
self._temperature_color = 0
self._brightness = 0
self._update_callback: Callable[[GoveeDevice], None] | None = None
self._segments_color = (
[(0, 0, 0)] * len(self._capabilities.segments) if self._capabilities else []
)

@property
def controller(self):
Expand Down Expand Up @@ -69,6 +72,13 @@ def brightness(self) -> int:
def temperature_color(self) -> int:
return self._temperature_color

def get_segment_color(self, segment: int) -> Tuple[int, int, int]:
return self._segments_color[segment - 1]

@property
def segments_color(self) -> list[Tuple[int, int, int]]:
return self._segments_color

@property
def update_callback(self) -> Callable[[GoveeDevice], None] | None:
return self._update_callback
Expand All @@ -87,6 +97,7 @@ async def turn_on(self) -> None:
async def set_segment_color(
self, segment: int, color: Tuple[int, int, int]
) -> None:
self._segments_color[segment - 1] = color
await self._controller.set_segment_color(self, segment, color)

async def turn_off(self) -> None:
Expand Down
45 changes: 32 additions & 13 deletions src/govee_local_api/light_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ class GoveeLightFeatures(Enum):
COLOR_KELVIN_TEMPERATURE = auto()
BRIGHTNESS = auto()
SEGMENT_CONTROL = auto()
SCENE_CONTROL = auto()


class GoveeLightColorMode(Enum):
"""Govee Lights color mode."""

MANUAL = auto()
MUSIC = auto()
SCENE = auto()


COMMON_FEATURES = {
Expand All @@ -32,6 +39,10 @@ def __init__(
self.segments = segments
self.scenes = scenes

@property
def segments_count(self) -> int:
return len(self.segments)

def __repr__(self) -> str:
return f"GoveeLightCapabilities(features={self.features!r}, segments={self.segments!r}, scenes={self.scenes!r})"

Expand All @@ -58,11 +69,19 @@ def __str__(self) -> str:
]


COMMON_CAPABILITIES = GoveeLightCapabilities(COMMON_FEATURES)
BRIGHTNESS_ONLY_CAPABILITIES = GoveeLightCapabilities(BRIGHTNESS_ONLY)
SEGMENTS_CAPABILITIES = GoveeLightCapabilities(
{*COMMON_FEATURES, GoveeLightFeatures.SEGMENT_CONTROL}, SEGMENT_CODES, {}
)
COMMON_CAPABILITIES = GoveeLightCapabilities(COMMON_FEATURES, [], {})
BRIGHTNESS_ONLY_CAPABILITIES = GoveeLightCapabilities(BRIGHTNESS_ONLY, [], {})


def _create_with_segment_capabilities(segmentCount: int) -> GoveeLightCapabilities:
if segmentCount <= 0:
return COMMON_CAPABILITIES
return GoveeLightCapabilities(
{*COMMON_FEATURES, GoveeLightFeatures.SEGMENT_CONTROL},
SEGMENT_CODES[:segmentCount],
{},
)


GOVEE_LIGHT_CAPABILITIES: dict[str, GoveeLightCapabilities] = {
# Models with common features
Expand All @@ -86,7 +105,7 @@ def __str__(self) -> str:
"H6110": COMMON_CAPABILITIES,
"H6117": COMMON_CAPABILITIES,
"H6159": COMMON_CAPABILITIES,
"H615A": COMMON_CAPABILITIES,
"H615A": _create_with_segment_capabilities(0),
"H615B": COMMON_CAPABILITIES,
"H615C": COMMON_CAPABILITIES,
"H615D": COMMON_CAPABILITIES,
Expand All @@ -95,15 +114,15 @@ def __str__(self) -> str:
"H6168": COMMON_CAPABILITIES,
"H6172": COMMON_CAPABILITIES,
"H6173": COMMON_CAPABILITIES,
"H618A": COMMON_CAPABILITIES,
"H618A": _create_with_segment_capabilities(15),
"H618C": COMMON_CAPABILITIES,
"H618E": COMMON_CAPABILITIES,
"H618F": COMMON_CAPABILITIES,
"H619A": SEGMENTS_CAPABILITIES,
"H619B": SEGMENTS_CAPABILITIES,
"H619C": SEGMENTS_CAPABILITIES,
"H619D": SEGMENTS_CAPABILITIES,
"H619E": SEGMENTS_CAPABILITIES,
"H619A": _create_with_segment_capabilities(10),
"H619B": _create_with_segment_capabilities(10),
"H619C": _create_with_segment_capabilities(10),
"H619D": _create_with_segment_capabilities(10),
"H619E": _create_with_segment_capabilities(10),
"H619Z": COMMON_CAPABILITIES,
"H61A0": COMMON_CAPABILITIES,
"H61A1": COMMON_CAPABILITIES,
Expand Down
73 changes: 51 additions & 22 deletions src/govee_local_api/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import absolute_import, annotations

import base64
import json
import base64
from typing import Any, Tuple, TypeVar, Type, Set
Expand Down Expand Up @@ -87,35 +88,63 @@ def __init__(
super().__init__(data)


class PtRealMessage(GoveeMessage):
class HexMessage(GoveeMessage):
command = "ptReal"

def __init__(self, data: list[str]) -> None:
super().__init__({"command": data})
encoded_data: list[str] = [
base64.b64encode(d.encode("utf-8")).decode("utf-8") for d in data
]
super().__init__({"command": encoded_data})


class SegmentColorMessages(PtRealMessage):
def __init__(self, segment: int, color: Tuple[int, int, int]) -> None:
segCode = "0100"
color = "000000"
# data = f"33010000000000000000000000000000000000" # turn off
data = f"33051501{color}0000000000{segCode}0000000000"
data = self.get_checksum(data)
super().__init__([base64.b64encode(bytes.fromhex(data)).decode("utf-8")])

def get_checksum(self, bytes_str) -> str:
# Convert the hex string to bytes all at once
byte_array = bytes.fromhex(bytes_str)

# Initialize the XOR result as 0 (starting point)
xor_result = 0

# XOR each byte directly
for byte in byte_array:
class PtRealMessage(GoveeMessage):
command = "ptReal"

def __init__(self, data: list[bytes]) -> None:
checksumed_data: list[bytes] = [
base64.b64encode(self._with_checksum(d)).decode("utf-8") for d in data
]
super().__init__({"command": checksumed_data})

def _with_checksum(self, data: bytes) -> bytes:
xor_result: int = 0
for byte in data:
xor_result ^= byte
return data + xor_result.to_bytes(1, "big")


# Convert the final result back to a hex string, padded to 2 characters
return f"{bytes_str}{xor_result:02x}"
class SegmentColorMessages(PtRealMessage):
def __init__(self, segment: bytes, color: Tuple[int, int, int]) -> None:
capped_color = [max(0, min(c, 255)) for c in color]
data = (
b"\x33\x05\x15\x01"
+ bytes(capped_color)
+ b"\x00\x00\x00\x00\x00"
+ segment
+ b"\x00\x00\x00\x00\x00"
)
super().__init__([data])


class ColorModeMessage(PtRealMessage):
def __init__(self, mode: bytes) -> None:
data = (
b"\x33\x05\x15\x01"
+ mode
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
super().__init__([data])


class SceneMessages(PtRealMessage):
def __init__(self, scene: bytes) -> None:
data = (
b"\x33\x05\x15\x01"
+ scene
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
super().__init__([data])


class ScanResponse(GoveeMessage):
Expand Down

0 comments on commit 82ac1cb

Please sign in to comment.