diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..c247e3f0 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog], +and this project adheres to [Semantic Versioning]. + +## [0.1.0] - 2023-04-22 + +- Initial release diff --git a/README.md b/README.md new file mode 100644 index 00000000..593ecd6d --- /dev/null +++ b/README.md @@ -0,0 +1,33 @@ +# Home Assistant support for Tuya BLE devices + +## Overview + +This integration supports various Mobile-Alerts sensors. The integration acts as proxy server between Mobile-Alerts gateway and cloud. + +_Inspired by [@redphx] code (https://github.com/redphx/poc-tuya-ble-fingerbot) + +## Installation + +Place the `custom_components` folder in your configuration directory (or add its contents to an existing `custom_components` folder). Alternatively install via [HACS](https://hacs.xyz/). + +[![Open your Home Assistant instance and open a repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=PlusPlus-ua&repository=ha_tuya_ble&category=integration) + +## Usage + +After adding to Home Assistan integration should discover all supported Bluetooth devices, or you can add discoverable devices manually. + +The integration works locally, but connection to Tuya BLE device requires device ID and encryption key from Tuya IOT cloud. It could be obtained using the same credentials as in official Tuya integreation. To obtain the credentials please refer to official Tuya integreation [documentation](https://www.home-assistant.io/integrations/tuya/) + +## Supported devices list + +* Fingerbots (category_id 'szjqr') + + Fingerbot (product_id 'yrnk7mnn'), original device fists in category, powered by CR2 battery. + + Fingerbot Plus (product_id 'yiihr7zh'), almost same as original, has sensor button for manual control. + + CubeTouch II (product_id 'xhf790if'), bult-in battery with USB type C charging. + All features available in Home Assistant, except programming (series of actions) - it's not documented and looks useless becouse it could be implemented by Home Assistant scripts or automations. + +* Temperature and humidity sensors (category_id 'wsdcg') + + Soil moisture sensor (product_id 'ojzlzzsw'). + +* CO2 sensors (category_id 'co2bj') + + CO2 Detector (product_id '59s19z5m'). diff --git a/custom_components/tuya_ble/__init__.py b/custom_components/tuya_ble/__init__.py new file mode 100644 index 00000000..8470afca --- /dev/null +++ b/custom_components/tuya_ble/__init__.py @@ -0,0 +1,107 @@ +"""The Tuya BLE integration.""" +from __future__ import annotations + +import logging + +from bleak_retry_connector import BLEAK_RETRY_EXCEPTIONS as BLEAK_EXCEPTIONS, get_device + +from homeassistant.components import bluetooth +from homeassistant.components.bluetooth.match import ADDRESS, BluetoothCallbackMatcher +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_ADDRESS, EVENT_HOMEASSISTANT_STOP, Platform +from homeassistant.core import Event, HomeAssistant, callback +from homeassistant.exceptions import ConfigEntryNotReady + +from .tuya_ble import TuyaBLEDevice + +from .cloud import HASSTuyaBLEDeviceManager +from .const import DOMAIN +from .devices import TuyaBLECoordinator, TuyaBLEData, get_device_product_info + +PLATFORMS: list[Platform] = [ + Platform.BUTTON, + Platform.NUMBER, + Platform.SENSOR, + Platform.SELECT, + Platform.SWITCH, +] + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up Tuya BLE from a config entry.""" + address: str = entry.data[CONF_ADDRESS] + ble_device = bluetooth.async_ble_device_from_address( + hass, address.upper(), True + ) or await get_device(address) + if not ble_device: + raise ConfigEntryNotReady( + f"Could not find Tuya BLE device with address {address}" + ) + manager = HASSTuyaBLEDeviceManager(hass, entry.options.copy()) + device = TuyaBLEDevice(manager, ble_device) + await device.initialize() + product_info = get_device_product_info(device) + + coordinator = TuyaBLECoordinator(hass, device) + try: + await device.update() + except BLEAK_EXCEPTIONS as ex: + raise ConfigEntryNotReady( + f"Could not communicate with Tuya BLE device with address {address}" + ) from ex + + @callback + def _async_update_ble( + service_info: bluetooth.BluetoothServiceInfoBleak, + change: bluetooth.BluetoothChange, + ) -> None: + """Update from a ble callback.""" + device.set_ble_device_and_advertisement_data( + service_info.device, service_info.advertisement + ) + + entry.async_on_unload( + bluetooth.async_register_callback( + hass, + _async_update_ble, + BluetoothCallbackMatcher({ADDRESS: address}), + bluetooth.BluetoothScanningMode.ACTIVE, + ) + ) + + hass.data.setdefault(DOMAIN, {})[entry.entry_id] = TuyaBLEData( + entry.title, + device, + product_info, + manager, + coordinator, + ) + + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) + entry.async_on_unload(entry.add_update_listener(_async_update_listener)) + + async def _async_stop(event: Event) -> None: + """Close the connection.""" + await device.stop() + + entry.async_on_unload( + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop) + ) + return True + +async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None: + """Handle options update.""" + data: TuyaBLEData = hass.data[DOMAIN][entry.entry_id] + if entry.title != data.title: + await hass.config_entries.async_reload(entry.entry_id) + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unload a config entry.""" + if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): + data: TuyaBLEData = hass.data[DOMAIN].pop(entry.entry_id) + await data.device.stop() + + return unload_ok diff --git a/custom_components/tuya_ble/button.py b/custom_components/tuya_ble/button.py new file mode 100644 index 00000000..00bda389 --- /dev/null +++ b/custom_components/tuya_ble/button.py @@ -0,0 +1,163 @@ +"""The Tuya BLE integration.""" +from __future__ import annotations + +from dataclasses import dataclass + +import logging +from typing import Callable + +from homeassistant.components.button import ( + ButtonEntityDescription, + ButtonEntity, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator + +from .const import DOMAIN +from .devices import TuyaBLEData, TuyaBLEEntity, TuyaBLEProductInfo +from .tuya_ble import TuyaBLEDataPointType, TuyaBLEDevice + +_LOGGER = logging.getLogger(__name__) + + +TuyaBLEButtonIsAvailable = Callable[ + ['TuyaBLEButton', TuyaBLEProductInfo], bool +] | None + + +@dataclass +class TuyaBLEButtonMapping: + dp_id: int + description: ButtonEntityDescription + force_add: bool = True + dp_type: TuyaBLEDataPointType | None = None + is_available: TuyaBLEButtonIsAvailable = None + + +def is_fingerbot_in_push_mode( + self: TuyaBLEButton, + product: TuyaBLEProductInfo +) -> bool: + result: bool = False + if product.fingerbot: + datapoint = self._device.datapoints[product.fingerbot.mode] + if datapoint: + result = datapoint.value == 0 + return result + + +@dataclass +class TuyaBLEFingerbotModeMapping(TuyaBLEButtonMapping): + description: ButtonEntityDescription = ButtonEntityDescription( + key="push", + ) + is_available: TuyaBLEButtonIsAvailable = is_fingerbot_in_push_mode + + +@dataclass +class TuyaBLECategoryButtonMapping: + products: dict[str, list[TuyaBLEButtonMapping]] | None = None + mapping: list[TuyaBLEButtonMapping] | None = None + + +mapping: dict[str, TuyaBLECategoryButtonMapping] = { + "szjqr": TuyaBLECategoryButtonMapping( + products={ + "xhf790if": # CubeTouch II + [ + TuyaBLEFingerbotModeMapping(dp_id=1), + ], + "yiihr7zh": # Fingerbot Plus + [ + TuyaBLEFingerbotModeMapping(dp_id=2), + ], + "yrnk7mnn": # Fingerbot + [ + TuyaBLEFingerbotModeMapping(dp_id=2), + ], + }, + ), +} + + +def get_mapping_by_device( + device: TuyaBLEDevice +) -> list[TuyaBLECategoryButtonMapping]: + category = mapping.get(device.category) + if category is not None and category.products is not None: + product_mapping = category.products.get(device.product_id) + if product_mapping is not None: + return product_mapping + if category.mapping is not None: + return category.mapping + else: + return [] + else: + return [] + + +class TuyaBLEButton(TuyaBLEEntity, ButtonEntity): + """Representation of a Tuya BLE Button.""" + + def __init__( + self, + hass: HomeAssistant, + coordinator: DataUpdateCoordinator, + device: TuyaBLEDevice, + product: TuyaBLEProductInfo, + mapping: TuyaBLEButtonMapping, + ) -> None: + super().__init__( + hass, + coordinator, + device, + product, + mapping.description + ) + self._mapping = mapping + + def press(self) -> None: + """Press the button.""" + datapoint = self._device.datapoints.get_or_create( + self._mapping.dp_id, + TuyaBLEDataPointType.DT_BOOL, + False, + ) + if datapoint: + self._hass.create_task( + datapoint.set_value(not bool(datapoint.value)) + ) + + @property + def available(self) -> bool: + """Return if entity is available.""" + result = super().available + if result and self._mapping.is_available: + result = self._mapping.is_available(self, self._product) + return result + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up the Tuya BLE sensors.""" + data: TuyaBLEData = hass.data[DOMAIN][entry.entry_id] + mappings = get_mapping_by_device(data.device) + entities: list[TuyaBLEButton] = [] + for mapping in mappings: + if ( + mapping.force_add or + data.device.datapoints.has_id(mapping.dp_id, mapping.dp_type) + ): + entities.append(TuyaBLEButton( + hass, + data.coordinator, + data.device, + data.product, + mapping, + )) + async_add_entities(entities) diff --git a/custom_components/tuya_ble/cloud.py b/custom_components/tuya_ble/cloud.py new file mode 100644 index 00000000..9a988de5 --- /dev/null +++ b/custom_components/tuya_ble/cloud.py @@ -0,0 +1,285 @@ +"""The Tuya BLE integration.""" +from __future__ import annotations + +import logging + +from dataclasses import dataclass +import json +from typing import Any + +from homeassistant.const import CONF_ADDRESS +from homeassistant.components.tuya.const import ( + CONF_ACCESS_ID, + CONF_ACCESS_SECRET, + CONF_APP_TYPE, + CONF_AUTH_TYPE, + CONF_COUNTRY_CODE, + CONF_ENDPOINT, + CONF_PASSWORD, + CONF_USERNAME, + DOMAIN as TUYA_DOMAIN, + TUYA_RESPONSE_RESULT, + TUYA_RESPONSE_SUCCESS, +) + +from tuya_iot import ( + TuyaOpenAPI, + AuthType, + TuyaOpenMQ, + TuyaDeviceManager, +) + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity import DeviceInfo, EntityDescription +from homeassistant.helpers.update_coordinator import CoordinatorEntity, DataUpdateCoordinator + +from .tuya_ble import AbstaractTuyaBLEDeviceManager, TuyaBLEDevice, TuyaBLEDeviceCredentials + +from .const import ( + CONF_UUID, + CONF_LOCAL_KEY, + CONF_DEVICE_ID, + CONF_CATEGORY, + CONF_PRODUCT_ID, + CONF_DEVICE_NAME, + CONF_PRODUCT_NAME, + DOMAIN, + TUYA_API_FACTORY_INFO_URL, + TUYA_FACTORY_INFO_MAC, +) + +_LOGGER = logging.getLogger(__name__) + + +@dataclass +class TuyaCloudCacheItem: + api: TuyaOpenAPI | None + login: dict[str, Any] + credentials: dict[str, dict[str, Any]] + + +CONF_TUYA_LOGIN_KEYS = [ + CONF_ENDPOINT, + CONF_ACCESS_ID, + CONF_ACCESS_SECRET, + CONF_AUTH_TYPE, + CONF_USERNAME, + CONF_PASSWORD, + CONF_COUNTRY_CODE, + CONF_APP_TYPE, +] + +CONF_TUYA_DEVICE_KEYS = [ + CONF_UUID, + CONF_LOCAL_KEY, + CONF_DEVICE_ID, + CONF_CATEGORY, + CONF_PRODUCT_ID, + CONF_DEVICE_NAME, + CONF_PRODUCT_NAME, +] + +_cache: dict[str, TuyaCloudCacheItem] = {} + + +class HASSTuyaBLEDeviceManager(AbstaractTuyaBLEDeviceManager): + """Cloud connected manager of the Tuya BLE devices credentials.""" + + def __init__( + self, + hass: HomeAssistant, + data: dict[str, Any] + ) -> None: + assert hass is not None + self._hass = hass + self._data = data + + @staticmethod + def _is_login_success(response: dict[Any, Any]) -> bool: + return bool(response.get(TUYA_RESPONSE_SUCCESS, False)) + + @staticmethod + def _get_cache_key(data: dict[str, Any]) -> str: + key_dict = {key: data.get(key) for key in CONF_TUYA_LOGIN_KEYS} + return json.dumps(key_dict) + + @staticmethod + def _has_login(data: dict[Any, Any]) -> bool: + for key in CONF_TUYA_LOGIN_KEYS: + if data.get(key) is None: + return False + return True + + @staticmethod + def _has_credentials(data: dict[Any, Any]) -> bool: + for key in CONF_TUYA_DEVICE_KEYS: + if data.get(key) is None: + return False + return True + + async def _login(self, data: dict[str, Any], add_to_cache: bool) -> dict[Any, Any]: + """Login into Tuya cloud using credentials from data dictionary.""" + global _cache + + if len(data) == 0: + return {} + + api = TuyaOpenAPI( + endpoint=data.get(CONF_ENDPOINT, ""), + access_id=data.get(CONF_ACCESS_ID, ""), + access_secret=data.get(CONF_ACCESS_SECRET, ""), + auth_type=data.get(CONF_AUTH_TYPE, ""), + ) + api.set_dev_channel("hass") + + response = await self._hass.async_add_executor_job( + api.connect, + data.get(CONF_USERNAME, ""), + data.get(CONF_PASSWORD, ""), + data.get(CONF_COUNTRY_CODE, ""), + data.get(CONF_APP_TYPE, ""), + ) + + if self._is_login_success(response): + _LOGGER.debug("Successful login for %s", data[CONF_USERNAME]) + if add_to_cache: + auth_type = data[CONF_AUTH_TYPE] + if type(auth_type) is AuthType: + data[CONF_AUTH_TYPE] = auth_type.value + cache_key = self._get_cache_key(data) + cache_item = _cache.get(cache_key) + if cache_item: + cache_item.api = api + cache_item.login = data + else: + _cache[cache_key] = TuyaCloudCacheItem(api, data, {}) + + return response + + def _check_login(self) -> bool: + cache_key = self._get_cache_key(self._data) + return _cache.get(cache_key) != None + + async def login(self, add_to_cache: bool = False) -> dict[Any, Any]: + return await self._login(self._data, add_to_cache) + + async def _fill_cache_item(self, item: TuyaCloudCacheItem) -> None: + openmq = TuyaOpenMQ(item.api) + openmq.start() + + device_manager = TuyaDeviceManager(item.api, openmq) + await self._hass.async_add_executor_job( + device_manager.update_device_list_in_smart_home + ) + + for tuya_device in device_manager.device_map.values(): + response = await self._hass.async_add_executor_job( + item.api.get, + "%s=%s" % ( + TUYA_API_FACTORY_INFO_URL, + tuya_device.id, + ), + ) + factory_info = response[TUYA_RESPONSE_RESULT][0] + if TUYA_FACTORY_INFO_MAC in factory_info: + mac = ':'.join( + factory_info[TUYA_FACTORY_INFO_MAC][i:i + 2] + for i in range(0, 12, 2) + ).upper() + item.credentials[mac] = { + CONF_ADDRESS: mac, + CONF_UUID: tuya_device.uuid, + CONF_LOCAL_KEY: tuya_device.local_key, + CONF_DEVICE_ID: tuya_device.id, + CONF_CATEGORY: tuya_device.category, + CONF_PRODUCT_ID: tuya_device.product_id, + CONF_DEVICE_NAME: tuya_device.name, + CONF_PRODUCT_NAME: tuya_device.product_name, + } + openmq.stop() + + async def build_cache(self) -> None: + global _cache + data = {} + tuya_config_entries = self._hass.config_entries.async_entries( + TUYA_DOMAIN) + for config_entry in tuya_config_entries: + data.clear() + data.update(config_entry.data) + key = self._get_cache_key(data) + item = _cache.get(key) + if item is None or len(item.credentials) == 0: + if self._is_login_success(await self._login(data, True)): + item = _cache.get(key) + if item and len(item.credentials) == 0: + await self._fill_cache_item(item) + + ble_config_entries = self._hass.config_entries.async_entries(DOMAIN) + for config_entry in ble_config_entries: + data.clear() + data.update(config_entry.options) + key = self._get_cache_key(data) + item = _cache.get(key) + if item is None or len(item.credentials) == 0: + if self._is_login_success(await self._login(data, True)): + item = _cache.get(key) + if item and len(item.credentials) == 0: + await self._fill_cache_item(item) + + def get_login_from_cache(self) -> None: + global _cache + for cache_item in _cache.values(): + self._data.update(cache_item.login) + break + + async def get_device_credentials( + self, + address: str, + force_update: bool = False, + save_data: bool = False, + ) -> TuyaBLEDeviceCredentials | None: + """Get credentials of the Tuya BLE device.""" + global _cache + item: TuyaCloudCacheItem | None = None + credentials: dict[str, any] | None = None + result: TuyaBLEDeviceCredentials | None = None + + if not force_update and self._has_credentials(self._data): + credentials = self._data.copy() + else: + cache_key: str | None = None + if self._has_login(self._data): + cache_key = self._get_cache_key(self._data) + else: + for key in _cache.keys(): + if _cache[key].credentials.get(address) is not None: + cache_key = key + break + if cache_key: + item = _cache.get(cache_key) + if item is None or force_update: + if self._is_login_success(await self.login(True)): + item = _cache.get(cache_key) + if item: + await self._fill_cache_item(item) + + if item: + credentials = item.credentials.get(address) + + if credentials: + result = TuyaBLEDeviceCredentials( + credentials.get(CONF_UUID, ""), + credentials.get(CONF_LOCAL_KEY, ""), + credentials.get(CONF_DEVICE_ID, ""), + credentials.get(CONF_CATEGORY, ""), + credentials.get(CONF_PRODUCT_ID, ""), + credentials.get(CONF_DEVICE_NAME, ""), + credentials.get(CONF_PRODUCT_NAME, ""), + ) + _LOGGER.debug("Retrieved: %s", result) + if save_data: + if item: + self._data.update(item.login) + self._data.update(credentials) + + return result diff --git a/custom_components/tuya_ble/config_flow.py b/custom_components/tuya_ble/config_flow.py new file mode 100644 index 00000000..1edf39e7 --- /dev/null +++ b/custom_components/tuya_ble/config_flow.py @@ -0,0 +1,357 @@ +"""Config flow for LEDBLE integration.""" +from __future__ import annotations + +import logging +import pycountry +from typing import Any + +import voluptuous as vol +from tuya_iot import AuthType + +from homeassistant.config_entries import ConfigEntry, ConfigFlow, OptionsFlow +from homeassistant.components.bluetooth import ( + BluetoothServiceInfoBleak, + async_discovered_service_info, +) +from homeassistant.const import CONF_ADDRESS +from homeassistant.core import callback +from homeassistant.data_entry_flow import FlowHandler, FlowResult + +from homeassistant.components.tuya.const import ( + CONF_ACCESS_ID, + CONF_ACCESS_SECRET, + CONF_APP_TYPE, + CONF_AUTH_TYPE, + CONF_COUNTRY_CODE, + CONF_ENDPOINT, + CONF_PASSWORD, + CONF_USERNAME, + SMARTLIFE_APP, + TUYA_COUNTRIES, + TUYA_RESPONSE_CODE, + TUYA_RESPONSE_MSG, + TUYA_RESPONSE_SUCCESS, + TUYA_SMART_APP, +) + +from .tuya_ble import SERVICE_UUID, TuyaBLEDeviceCredentials + +from .const import ( + DOMAIN, +) +from .devices import TuyaBLEData, get_device_readable_name +from .cloud import HASSTuyaBLEDeviceManager + +_LOGGER = logging.getLogger(__name__) + + +async def _try_login( + manager: HASSTuyaBLEDeviceManager, + user_input: dict[str, Any], + errors: dict[str, str], + placeholders: dict[str, Any], +) -> dict[str, Any] | None: + response: dict[Any, Any] | None + data: dict[str, Any] + + country = [ + country + for country in TUYA_COUNTRIES + if country.name == user_input[CONF_COUNTRY_CODE] + ][0] + + data = { + CONF_ENDPOINT: country.endpoint, + CONF_AUTH_TYPE: AuthType.CUSTOM, + CONF_ACCESS_ID: user_input[CONF_ACCESS_ID], + CONF_ACCESS_SECRET: user_input[CONF_ACCESS_SECRET], + CONF_USERNAME: user_input[CONF_USERNAME], + CONF_PASSWORD: user_input[CONF_PASSWORD], + CONF_COUNTRY_CODE: country.country_code, + } + + for app_type in (TUYA_SMART_APP, SMARTLIFE_APP, ""): + data[CONF_APP_TYPE] = app_type + if app_type == "": + data[CONF_AUTH_TYPE] = AuthType.CUSTOM + else: + data[CONF_AUTH_TYPE] = AuthType.SMART_HOME + + response = await manager._login(data, True) + + if response.get(TUYA_RESPONSE_SUCCESS, False): + return data + + errors["base"] = "login_error" + if response: + placeholders.update({ + TUYA_RESPONSE_CODE: response.get(TUYA_RESPONSE_CODE), + TUYA_RESPONSE_MSG: response.get(TUYA_RESPONSE_MSG), + }) + + return None + + +def _show_login_form( + flow: FlowHandler, + user_input: dict[str, Any], + errors: dict[str, str], + placeholders: dict[str, Any], +) -> FlowResult: + """Shows the Tuya IOT platform login form.""" + if (user_input is not None and + user_input.get(CONF_COUNTRY_CODE) is not None): + for country in TUYA_COUNTRIES: + if country.country_code == user_input[CONF_COUNTRY_CODE]: + user_input[CONF_COUNTRY_CODE] = country.name + break + + def_country = pycountry.countries.get(alpha_2=flow.hass.config.country) + + return flow.async_show_form( + step_id="login", + data_schema=vol.Schema( + { + vol.Required( + CONF_COUNTRY_CODE, + default=user_input.get( + CONF_COUNTRY_CODE, def_country.name), + ): vol.In( + # We don't pass a dict {code:name} because country codes can be duplicate. + [country.name for country in TUYA_COUNTRIES] + ), + vol.Required( + CONF_ACCESS_ID, default=user_input.get( + CONF_ACCESS_ID, "") + ): str, + vol.Required( + CONF_ACCESS_SECRET, + default=user_input.get(CONF_ACCESS_SECRET, ""), + ): str, + vol.Required( + CONF_USERNAME, default=user_input.get( + CONF_USERNAME, "") + ): str, + vol.Required( + CONF_PASSWORD, default=user_input.get( + CONF_PASSWORD, "") + ): str, + } + ), + errors=errors, + description_placeholders=placeholders, + ) + + +class TuyaBLEOptionsFlow(OptionsFlow): + """Handle a Tuya BLE options flow.""" + + def __init__(self, config_entry: ConfigEntry) -> None: + """Initialize options flow.""" + self.config_entry = config_entry + + async def async_step_init( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Manage the options.""" + return await self.async_step_login(user_input) + + async def async_step_login( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle the Tuya IOT platform login step.""" + data: dict[str, Any] | None = None + errors: dict[str, str] = {} + placeholders: dict[str, Any] = {} + credentials: TuyaBLEDeviceCredentials | None = None + address: str | None = self.config_entry.data.get(CONF_ADDRESS) + + if user_input is not None: + entry: TuyaBLEData = self.hass.data[DOMAIN][ + self.config_entry.entry_id + ] + if entry: + data = await _try_login( + entry.manager, + user_input, + errors, + placeholders, + ) + if data: + credentials = await entry.manager.get_device_credentials( + address, True, True + ) + if credentials: + return self.async_create_entry( + title=self.config_entry.title, + data=self.config_entry.data, + options=data, + ) + else: + errors["base"] = "device_not_registered" + + if user_input is None: + user_input = {} + user_input.update(self.config_entry.options) + + return _show_login_form(self, user_input, errors, placeholders) + + +class TuyaBLEConfigFlow(ConfigFlow, domain=DOMAIN): + """Handle a config flow for Tuya BLE.""" + + VERSION = 1 + + def __init__(self) -> None: + """Initialize the config flow.""" + self._discovery_info: BluetoothServiceInfoBleak | None = None + self._discovered_devices: dict[str, BluetoothServiceInfoBleak] = {} + self._data: dict[str, Any] = {} + self._manager: HASSTuyaBLEDeviceManager | None = None + self._get_device_info_error = False + + async def async_step_bluetooth( + self, discovery_info: BluetoothServiceInfoBleak + ) -> FlowResult: + """Handle the bluetooth discovery step.""" + await self.async_set_unique_id(discovery_info.address) + self._abort_if_unique_id_configured() + self._discovery_info = discovery_info + if self._manager is None: + self._manager = HASSTuyaBLEDeviceManager(self.hass, self._data) + await self._manager.build_cache() + self.context["title_placeholders"] = { + "name": await get_device_readable_name( + discovery_info, + self._manager, + ) + } + return await self.async_step_login() + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle the user step.""" + if self._manager is None: + self._manager = HASSTuyaBLEDeviceManager(self.hass, self._data) + await self._manager.build_cache() + return await self.async_step_login() + + async def async_step_login( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle the Tuya IOT platform login step.""" + data: dict[str, Any] | None = None + errors: dict[str, str] = {} + placeholders: dict[str, Any] = {} + + if user_input is not None: + data = await _try_login( + self._manager, + user_input, + errors, + placeholders, + ) + if data: + self._data.update(data) + return await self.async_step_device() + + if user_input is None: + user_input = {} + if self._discovery_info: + await self._manager.get_device_credentials( + self._discovery_info.address, + False, + True, + ) + if self._data is None or len(self._data) == 0: + self._manager.get_login_from_cache() + if self._data is not None and len(self._data) > 0: + user_input.update(self._data) + + return _show_login_form(self, user_input, errors, placeholders) + + async def async_step_device( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle the user step to pick discovered device.""" + errors: dict[str, str] = {} + + if user_input is not None: + address = user_input[CONF_ADDRESS] + discovery_info = self._discovered_devices[address] + local_name = await get_device_readable_name( + discovery_info, + self._manager + ) + await self.async_set_unique_id( + discovery_info.address, raise_on_progress=False + ) + self._abort_if_unique_id_configured() + credentials = await self._manager.get_device_credentials( + discovery_info.address, self._get_device_info_error, True + ) + self._data[CONF_ADDRESS] = discovery_info.address + if credentials is None: + self._get_device_info_error = True + errors["base"] = "device_not_registered" + else: + return self.async_create_entry( + title=local_name, + data={CONF_ADDRESS: discovery_info.address}, + options=self._data, + ) + + if discovery := self._discovery_info: + self._discovered_devices[discovery.address] = discovery + else: + current_addresses = self._async_current_ids() + for discovery in async_discovered_service_info(self.hass): + if ( + discovery.address in current_addresses + or discovery.address in self._discovered_devices + or discovery.service_data is None + or not SERVICE_UUID in discovery.service_data.keys() + ): + continue + self._discovered_devices[discovery.address] = discovery + + if not self._discovered_devices: + return self.async_abort(reason="no_unconfigured_devices") + + def_address: str + if user_input: + def_address = user_input.get(CONF_ADDRESS) + else: + def_address = list(self._discovered_devices)[0] + + return self.async_show_form( + step_id="device", + data_schema=vol.Schema( + { + vol.Required( + CONF_ADDRESS, + default=def_address, + ): vol.In( + { + service_info.address: + await get_device_readable_name( + service_info, + self._manager, + ) + for service_info in + self._discovered_devices.values() + } + ), + }, + ), + errors=errors, + ) + + @staticmethod + @callback + def async_get_options_flow( + config_entry: ConfigEntry, + ) -> TuyaBLEOptionsFlow: + """Get the options flow for this handler.""" + return TuyaBLEOptionsFlow(config_entry) diff --git a/custom_components/tuya_ble/const.py b/custom_components/tuya_ble/const.py new file mode 100644 index 00000000..874b4f60 --- /dev/null +++ b/custom_components/tuya_ble/const.py @@ -0,0 +1,38 @@ +"""The Tuya BLE integration.""" +from __future__ import annotations + +from homeassistant.backports.enum import StrEnum +from typing_extensions import Final + +DOMAIN: Final = "tuya_ble" + +DEVICE_METADATA_UUIDS: Final = "uuids" + +DEVICE_DEF_MANUFACTURER: Final = "Tuya" +SET_DISCONNECTED_DELAY = 10 * 60 + +CONF_UUID: Final = "uuid" +CONF_LOCAL_KEY: Final = "local_key" +CONF_DEVICE_ID: Final = "device_id" +CONF_CATEGORY: Final = "category" +CONF_PRODUCT_ID: Final = "product_id" +CONF_DEVICE_NAME: Final = "device_name" +CONF_PRODUCT_NAME: Final = "product_name" + +TUYA_API_FACTORY_INFO_URL: Final = "/v1.0/iot-03/devices/factory-infos?device_ids" +TUYA_FACTORY_INFO_MAC: Final = "mac" + +BATTERY_STATE_LOW: Final = "low" +BATTERY_STATE_NORMAL: Final = "normal" +BATTERY_STATE_HIGH: Final = "high" + +BATTERY_NOT_CHARGING: Final = "not_charging" +BATTERY_CHARGING: Final = "charging" +BATTERY_CHARGED: Final = "charged" + +CO2_LEVEL_NORMAL: Final = "normal" +CO2_LEVEL_ALARM: Final = "alarm" + +FINGERBOT_MODE_PUSH: Final = "push" +FINGERBOT_MODE_SWITCH: Final = "switch" + diff --git a/custom_components/tuya_ble/devices.py b/custom_components/tuya_ble/devices.py new file mode 100644 index 00000000..a797f3fb --- /dev/null +++ b/custom_components/tuya_ble/devices.py @@ -0,0 +1,292 @@ +"""The Tuya BLE integration.""" +from __future__ import annotations +from dataclasses import dataclass + +import logging + +from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback +from homeassistant.helpers import device_registry as dr +from homeassistant.helpers.entity import DeviceInfo, EntityDescription +from homeassistant.helpers.event import async_call_later +from homeassistant.helpers.update_coordinator import ( + CoordinatorEntity, + DataUpdateCoordinator, +) + +from home_assistant_bluetooth import BluetoothServiceInfoBleak +from .tuya_ble import ( + AbstaractTuyaBLEDeviceManager, + TuyaBLEDataPoint, + TuyaBLEDevice, + TuyaBLEDeviceCredentials, +) + +from .cloud import HASSTuyaBLEDeviceManager +from .const import ( + DEVICE_DEF_MANUFACTURER, + DOMAIN, + SET_DISCONNECTED_DELAY, +) + +_LOGGER = logging.getLogger(__name__) + + +@dataclass +class TuyaBLEFingerbotInfo: + switch: int + mode: int + up_position: int + down_position: int + hold_time: int + reverse_positions: int + + +@dataclass +class TuyaBLEProductInfo: + name: str + manufacturer: str = DEVICE_DEF_MANUFACTURER + fingerbot: TuyaBLEFingerbotInfo | None = None + + +class TuyaBLEEntity(CoordinatorEntity): + """Tuya BLE base entity.""" + + def __init__( + self, + hass: HomeAssistant, + coordinator: TuyaBLECoordinator, + device: TuyaBLEDevice, + product: TuyaBLEProductInfo, + description: EntityDescription, + ) -> None: + super().__init__(coordinator) + self._hass = hass + self._coordinator = coordinator + self._device = device + self._product = product + if description.translation_key is None: + self._attr_translation_key = description.key + self.entity_description = description + self._attr_has_entity_name = True + self._attr_device_info = get_device_info(self._device) + self._attr_unique_id = f"{self._device.device_id}-{description.key}" + + @property + def available(self) -> bool: + """Return if entity is available.""" + return self._coordinator.connected + + @callback + def _handle_coordinator_update(self) -> None: + """Handle updated data from the coordinator.""" + self.async_write_ha_state() + + +class TuyaBLECoordinator(DataUpdateCoordinator[None]): + """Data coordinator for receiving Tuya BLE updates.""" + + def __init__(self, hass: HomeAssistant, device: TuyaBLEDevice) -> None: + """Initialise the coordinator.""" + super().__init__( + hass, + _LOGGER, + name=DOMAIN, + ) + self._device = device + self._disconnected: bool = True + self._unsub_disconnect: CALLBACK_TYPE | None = None + device.register_connected_callback(self._async_handle_connect) + device.register_callback(self._async_handle_update) + device.register_disconnected_callback(self._async_handle_disconnect) + + @property + def connected(self) -> bool: + return not self._disconnected + + @callback + def _async_handle_connect(self) -> None: + if self._unsub_disconnect is not None: + self._unsub_disconnect() + if self._disconnected: + self._disconnected = False + self.async_update_listeners() + + @callback + def _async_handle_update(self, update: list[TuyaBLEDataPoint]) -> None: + """Just trigger the callbacks.""" + self._async_handle_connect() + self.async_set_updated_data(None) + + @callback + def _set_disconnected(self, _: None) -> None: + """Invoke the idle timeout callback, called when the alarm fires.""" + self._disconnected = True + self._unsub_disconnect = None + self.async_update_listeners() + + @callback + def _async_handle_disconnect(self) -> None: + """Trigger the callbacks for disconnected.""" + if self._unsub_disconnect is None: + delay: float = SET_DISCONNECTED_DELAY + self._unsub_disconnect = async_call_later( + self.hass, + delay, + self._set_disconnected + ) + + +@dataclass +class TuyaBLEData: + """Data for the Tuya BLE integration.""" + title: str + device: TuyaBLEDevice + product: TuyaBLEProductInfo + manager: HASSTuyaBLEDeviceManager + coordinator: TuyaBLECoordinator + + +@dataclass +class TuyaBLECategoryInfo: + products: dict[str, TuyaBLEProductInfo] + info: TuyaBLEProductInfo | None = None + + +devices_database: dict[str, TuyaBLECategoryInfo] = { + "co2bj": TuyaBLECategoryInfo( + products={ + "59s19z5m": # device product_id + TuyaBLEProductInfo( + name="CO2 Detector", + ), + }, + ), + "szjqr": TuyaBLECategoryInfo( + products={ + "xhf790if": # device product_id + TuyaBLEProductInfo( + name="CubeTouch II", + fingerbot=TuyaBLEFingerbotInfo( + switch=1, + mode=2, + up_position=5, + down_position=6, + hold_time=3, + reverse_positions=4, + ), + ), + "yiihr7zh": # device product_id + TuyaBLEProductInfo( + name="Fingerbot Plus", + fingerbot=TuyaBLEFingerbotInfo( + switch=2, + mode=8, + up_position=15, + down_position=9, + hold_time=10, + reverse_positions=11, + ), + ), + "yrnk7mnn": # device product_id + TuyaBLEProductInfo( + name="Fingerbot", + fingerbot=TuyaBLEFingerbotInfo( + switch=2, + mode=8, + up_position=15, + down_position=9, + hold_time=10, + reverse_positions=11, + ), + ), + }, + ), + "wsdcg": TuyaBLECategoryInfo( + products={ + "ojzlzzsw": # device product_id + TuyaBLEProductInfo( + name="Soil moisture sensor", + ), + }, + ), +} + + +def get_product_info_by_ids( + category: str, + product_id: str +) -> TuyaBLEProductInfo | None: + category_info = devices_database.get(category) + if category_info is not None: + product_info = category_info.products.get(product_id) + if product_info is not None: + return product_info + return category_info.info + else: + return None + + +def get_device_product_info( + device: TuyaBLEDevice +) -> TuyaBLEProductInfo | None: + return get_product_info_by_ids(device.category, device.product_id) + + +def get_short_address(address: str) -> str: + results = address.replace("-", ":").upper().split(":") + return f"{results[-3]}{results[-2]}{results[-1]}"[-6:] + + +async def get_device_readable_name( + discovery_info: BluetoothServiceInfoBleak, + manager: AbstaractTuyaBLEDeviceManager | None, +) -> str: + credentials: TuyaBLEDeviceCredentials | None = None + product_info: TuyaBLEProductInfo | None = None + if manager: + credentials = await manager.get_device_credentials( + discovery_info.address + ) + if credentials: + product_info = get_product_info_by_ids( + credentials.category, + credentials.product_id, + ) + short_address = get_short_address(discovery_info.address) + if product_info: + return "%s %s" % (product_info.name, short_address) + if credentials: + return "%s %s" % (credentials.device_name, short_address) + return "%s %s" % (discovery_info.device.name, short_address) + + +def get_device_info(device: TuyaBLEDevice) -> DeviceInfo | None: + product_info = None + if device.category and device.product_id: + product_info = get_product_info_by_ids( + device.category, + device.product_id + ) + product_name = product_info.name if product_info else device.name + result = DeviceInfo( + connections={(dr.CONNECTION_BLUETOOTH, device.address)}, + hw_version=device.hardware_version, + identifiers={(DOMAIN, device.address)}, + manufacturer=( + product_info.manufacturer if product_info else + DEVICE_DEF_MANUFACTURER + ), + model=("%s (%s)") % ( + product_name, + device.product_id, + ), + name=("%s %s") % ( + product_name, + get_short_address(device.address), + ), + sw_version=("%s (protocol %s)") % ( + device.device_version, + device.protocol_version, + ), + ) + return result diff --git a/custom_components/tuya_ble/manifest.json b/custom_components/tuya_ble/manifest.json new file mode 100644 index 00000000..f2a1e27d --- /dev/null +++ b/custom_components/tuya_ble/manifest.json @@ -0,0 +1,17 @@ +{ + "domain": "tuya_ble", + "name": "Tuya BLE", + "bluetooth": [ + { + "connectable": true, + "service_data_uuid": "0000a201-0000-1000-8000-00805f9b34fb" + } + ], + "codeowners": ["@PlusPlus-ua"], + "config_flow": true, + "dependencies": ["bluetooth_adapters", "tuya"], + "documentation": "https://www.home-assistant.io/integrations/tuya_ble", + "requirements": ["tuya-iot-py-sdk==0.6.6", "pycountry"], + "iot_class": "local_push", + "version": "0.1.0" +} diff --git a/custom_components/tuya_ble/number.py b/custom_components/tuya_ble/number.py new file mode 100644 index 00000000..cc5b1cfa --- /dev/null +++ b/custom_components/tuya_ble/number.py @@ -0,0 +1,328 @@ +"""The Tuya BLE integration.""" +from __future__ import annotations + +from dataclasses import dataclass + +import logging +from typing import Any, Callable + +from homeassistant.components.number import ( + NumberEntityDescription, + NumberEntity, +) +from homeassistant.components.number.const import NumberMode +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONCENTRATION_PARTS_PER_MILLION, PERCENTAGE, TIME_MINUTES, TIME_SECONDS +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.entity import EntityCategory +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.restore_state import RestoreEntity +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator + +from .const import DOMAIN +from .devices import TuyaBLEData, TuyaBLEEntity, TuyaBLEProductInfo +from .tuya_ble import TuyaBLEDataPointType, TuyaBLEDevice + +_LOGGER = logging.getLogger(__name__) + +TuyaBLENumberGetter = Callable[ + ['TuyaBLENumber', TuyaBLEProductInfo], float | None +] | None + + +TuyaBLENumberIsAvailable = Callable[ + ['TuyaBLENumber', TuyaBLEProductInfo], bool +] | None + + +TuyaBLENumberSetter = Callable[ + ['TuyaBLENumber', TuyaBLEProductInfo, float], bool +] | None + + +@dataclass +class TuyaBLENumberMapping: + dp_id: int + description: NumberEntityDescription + force_add: bool = True + dp_type: TuyaBLEDataPointType | None = None + coefficient: float = 1.0 + is_available: TuyaBLENumberIsAvailable = None + getter: TuyaBLENumberGetter = None + setter: TuyaBLENumberSetter = None + mode: NumberMode = NumberMode.BOX + + +@dataclass +class TuyaBLEDownPositionDescription(NumberEntityDescription): + key: str = "down_position" + icon: str = "mdi:arrow-down-bold" + native_max_value: float = 100 + native_min_value: float = 51 + native_unit_of_measurement: str = PERCENTAGE + native_step: float = 1 + entity_category: EntityCategory = EntityCategory.CONFIG + + +@dataclass +class TuyaBLEUpPositionDescription(NumberEntityDescription): + key: str = "up_position" + icon: str = "mdi:arrow-up-bold" + native_max_value: float = 50 + native_min_value: float = 0 + native_unit_of_measurement: str = PERCENTAGE + native_step: float = 1 + entity_category: EntityCategory = EntityCategory.CONFIG + + +def is_fingerbot_in_push_mode( + self: TuyaBLENumber, + product: TuyaBLEProductInfo +) -> bool: + result: bool = False + if product.fingerbot: + datapoint = self._device.datapoints[product.fingerbot.mode] + if datapoint: + result = datapoint.value == 0 + return result + + +@dataclass +class TuyaBLEHoldTimeDescription(NumberEntityDescription): + key: str = "hold_time" + icon: str = "mdi:timer" + native_max_value: float = 10 + native_min_value: float = 0 + native_unit_of_measurement: str = TIME_SECONDS + native_step: float = 1 + entity_category: EntityCategory = EntityCategory.CONFIG + + +@dataclass +class TuyaBLEHoldTimeMapping(TuyaBLENumberMapping): + description: NumberEntityDescription = TuyaBLEHoldTimeDescription() + is_available: TuyaBLENumberIsAvailable = is_fingerbot_in_push_mode + + +@dataclass +class TuyaBLECategoryNumberMapping: + products: dict[str, list[TuyaBLENumberMapping]] | None = None + mapping: list[TuyaBLENumberMapping] | None = None + + +mapping: dict[str, TuyaBLECategoryNumberMapping] = { + "co2bj": TuyaBLECategoryNumberMapping( + products={ + "59s19z5m": # CO2 Detector + [ + TuyaBLENumberMapping( + dp_id=17, + description=NumberEntityDescription( + key="brightness", + icon="mdi:brightness-percent", + native_max_value=100, + native_min_value=0, + native_unit_of_measurement=PERCENTAGE, + native_step=1, + entity_category=EntityCategory.CONFIG, + ), + mode=NumberMode.SLIDER, + ), + TuyaBLENumberMapping( + dp_id=26, + description=NumberEntityDescription( + key="carbon_dioxide_alarm_level", + icon="mdi:molecule-co2", + native_max_value=5000, + native_min_value=400, + native_unit_of_measurement=CONCENTRATION_PARTS_PER_MILLION, + native_step=100, + entity_category=EntityCategory.CONFIG, + ), + ), + ], + }, + ), + "szjqr": TuyaBLECategoryNumberMapping( + products={ + "xhf790if": # CubeTouch II + [ + TuyaBLEHoldTimeMapping(dp_id=3), + TuyaBLENumberMapping( + dp_id=5, + description=TuyaBLEUpPositionDescription( + native_max_value=100, + ), + ), + TuyaBLENumberMapping( + dp_id=6, + description=TuyaBLEDownPositionDescription( + native_min_value=0, + ), + ), + ], + "yiihr7zh": # Fingerbot Plus + [ + TuyaBLENumberMapping( + dp_id=9, + description=TuyaBLEDownPositionDescription(), + ), + TuyaBLEHoldTimeMapping(dp_id=10), + TuyaBLENumberMapping( + dp_id=15, + description=TuyaBLEUpPositionDescription(), + ), + ], + "yrnk7mnn": # Fingerbot + [ + TuyaBLENumberMapping( + dp_id=9, + description=TuyaBLEDownPositionDescription(), + ), + TuyaBLENumberMapping( + dp_id=10, + description=TuyaBLEHoldTimeDescription( + native_step=0.1, + ), + coefficient=10.0, + is_available=is_fingerbot_in_push_mode, + ), + TuyaBLENumberMapping( + dp_id=15, + description=TuyaBLEUpPositionDescription(), + ), + ], + }, + ), + "wsdcg": TuyaBLECategoryNumberMapping( + products={ + "ojzlzzsw": # Soil moisture sensor + [ + TuyaBLENumberMapping( + dp_id=17, + description=NumberEntityDescription( + key="reporting_period", + icon="mdi:timer", + native_max_value=120, + native_min_value=1, + native_unit_of_measurement=TIME_MINUTES, + native_step=1, + entity_category=EntityCategory.CONFIG, + ), + ), + ], + }, + ), +} + + +def get_mapping_by_device( + device: TuyaBLEDevice +) -> list[TuyaBLECategoryNumberMapping]: + category = mapping.get(device.category) + if category is not None and category.products is not None: + product_mapping = category.products.get(device.product_id) + if product_mapping is not None: + return product_mapping + if category.mapping is not None: + return category.mapping + else: + return [] + else: + return [] + + +class TuyaBLENumber(TuyaBLEEntity, NumberEntity): + """Representation of a Tuya BLE Number.""" + + def __init__( + self, + hass: HomeAssistant, + coordinator: DataUpdateCoordinator, + device: TuyaBLEDevice, + product: TuyaBLEProductInfo, + mapping: TuyaBLENumberMapping, + ) -> None: + super().__init__( + hass, + coordinator, + device, + product, + mapping.description + ) + self._mapping = mapping + self._attr_mode = mapping.mode + + @property + def native_value(self) -> float | None: + """Return the entity value to represent the entity state.""" + if self._mapping.getter: + return self._mapping.getter(self, self._product) + + datapoint = self._device.datapoints[self._mapping.dp_id] + if datapoint: + return datapoint.value / self._mapping.coefficient + + return self._mapping.description.native_min_value + + def set_native_value(self, value: float) -> None: + """Set new value.""" + if self._mapping.setter: + if self._mapping.setter(self, self._product, value): + return + int_value = int(value * self._mapping.coefficient) + datapoint = self._device.datapoints.get_or_create( + self._mapping.dp_id, + TuyaBLEDataPointType.DT_VALUE, + int(int_value), + ) + if datapoint: + self._hass.create_task(datapoint.set_value(int_value)) + + @property + def available(self) -> bool: + """Return if entity is available.""" + result = super().available + if result and self._mapping.is_available: + result = self._mapping.is_available(self, self._product) + return result + + +class TuyaBLEFingerbotPosition(TuyaBLENumber, RestoreEntity): + def __init__( + self, + hass: HomeAssistant, + coordinator: DataUpdateCoordinator, + device: TuyaBLEDevice, + mapping: TuyaBLENumberMapping, + ) -> None: + super().__init__(hass, coordinator, device, mapping) + + async def async_internal_added_to_hass(self) -> None: + """Register this entity as a restorable entity.""" + last_state = await self.async_get_last_state() + self._attr_native_value = last_state.state + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up the Tuya BLE sensors.""" + data: TuyaBLEData = hass.data[DOMAIN][entry.entry_id] + mappings = get_mapping_by_device(data.device) + entities: list[TuyaBLENumber] = [] + for mapping in mappings: + if ( + mapping.force_add or + data.device.datapoints.has_id(mapping.dp_id, mapping.dp_type) + ): + entities.append(TuyaBLENumber( + hass, + data.coordinator, + data.device, + data.product, + mapping, + )) + async_add_entities(entities) diff --git a/custom_components/tuya_ble/select.py b/custom_components/tuya_ble/select.py new file mode 100644 index 00000000..aa6f4ae1 --- /dev/null +++ b/custom_components/tuya_ble/select.py @@ -0,0 +1,198 @@ +"""The Tuya BLE integration.""" +from __future__ import annotations + +from dataclasses import dataclass + +import logging + +from homeassistant.components.select import ( + SelectEntityDescription, + SelectEntity, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import UnitOfTemperature +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity import EntityCategory +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator + +from .const import ( + DOMAIN, + FINGERBOT_MODE_PUSH, + FINGERBOT_MODE_SWITCH, +) +from .devices import TuyaBLEData, TuyaBLEEntity, TuyaBLEProductInfo +from .tuya_ble import TuyaBLEDataPointType, TuyaBLEDevice + +_LOGGER = logging.getLogger(__name__) + + +@dataclass +class TuyaBLESelectMapping: + dp_id: int + description: SelectEntityDescription + force_add: bool = True + dp_type: TuyaBLEDataPointType | None = None + + +@dataclass +class TemperatureUnitDescription(SelectEntityDescription): + key: str = "temperature_unit" + icon: str = "mdi:thermometer" + entity_category: EntityCategory = EntityCategory.CONFIG + + +@dataclass +class TuyaBLEFingerbotModeMapping(TuyaBLESelectMapping): + description: SelectEntityDescription = SelectEntityDescription( + key="fingerbot_mode", + entity_category=EntityCategory.CONFIG, + options=[FINGERBOT_MODE_PUSH, FINGERBOT_MODE_SWITCH], + ) + + +@dataclass +class TuyaBLECategorySelectMapping: + products: dict[str, list[TuyaBLESelectMapping]] | None = None + mapping: list[TuyaBLESelectMapping] | None = None + + +mapping: dict[str, TuyaBLECategorySelectMapping] = { + "co2bj": TuyaBLECategorySelectMapping( + products={ + "59s19z5m": # CO2 Detector + [ + TuyaBLESelectMapping( + dp_id=101, + description=TemperatureUnitDescription( + options=[ + UnitOfTemperature.CELSIUS, + UnitOfTemperature.FAHRENHEIT, + ], + ) + ), + ], + }, + ), + "wsdcg": TuyaBLECategorySelectMapping( + products={ + "ojzlzzsw": # Soil moisture sensor + [ + TuyaBLESelectMapping( + dp_id=9, + description=TemperatureUnitDescription( + options=[ + UnitOfTemperature.CELSIUS, + UnitOfTemperature.FAHRENHEIT, + ], + entity_registry_enabled_default=False, + ) + ), + ], + }, + ), + "szjqr": TuyaBLECategorySelectMapping( + products={ + "xhf790if": # CubeTouch II + [ + TuyaBLEFingerbotModeMapping(dp_id=2), + ], + "yiihr7zh": # Fingerbot Plus + [ + TuyaBLEFingerbotModeMapping(dp_id=8), + ], + "yrnk7mnn": # Fingerbot + [ + TuyaBLEFingerbotModeMapping(dp_id=8), + ], + }, + ), +} + + +def get_mapping_by_device( + device: TuyaBLEDevice +) -> list[TuyaBLECategorySelectMapping]: + category = mapping.get(device.category) + if category is not None and category.products is not None: + product_mapping = category.products.get(device.product_id) + if product_mapping is not None: + return product_mapping + if category.mapping is not None: + return category.mapping + else: + return [] + else: + return [] + + +class TuyaBLESelect(TuyaBLEEntity, SelectEntity): + """Representation of a Tuya BLE select.""" + + def __init__( + self, + hass: HomeAssistant, + coordinator: DataUpdateCoordinator, + device: TuyaBLEDevice, + product: TuyaBLEProductInfo, + mapping: TuyaBLESelectMapping, + ) -> None: + super().__init__( + hass, + coordinator, + device, + product, + mapping.description + ) + self._mapping = mapping + self._attr_options = mapping.description.options + + @property + def current_option(self) -> str | None: + """Return the selected entity option to represent the entity state.""" + # Raw value + value: str | None = None + datapoint = self._device.datapoints[self._mapping.dp_id] + if datapoint: + value = datapoint.value + if value >= 0 and value < len(self._attr_options): + return self._attr_options[value] + else: + return value + return None + + def select_option(self, value: str) -> None: + """Change the selected option.""" + if value in self._attr_options: + int_value = self._attr_options.index(value) + datapoint = self._device.datapoints.get_or_create( + self._mapping.dp_id, + TuyaBLEDataPointType.DT_ENUM, + int_value, + ) + if datapoint: + self._hass.create_task(datapoint.set_value(int_value)) + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up the Tuya BLE sensors.""" + data: TuyaBLEData = hass.data[DOMAIN][entry.entry_id] + mappings = get_mapping_by_device(data.device) + entities: list[TuyaBLESelect] = [] + for mapping in mappings: + if ( + mapping.force_add or + data.device.datapoints.has_id(mapping.dp_id, mapping.dp_type) + ): + entities.append(TuyaBLESelect( + hass, + data.coordinator, + data.device, + data.product, + mapping, + )) + async_add_entities(entities) diff --git a/custom_components/tuya_ble/sensor.py b/custom_components/tuya_ble/sensor.py new file mode 100644 index 00000000..1194c6cc --- /dev/null +++ b/custom_components/tuya_ble/sensor.py @@ -0,0 +1,339 @@ +"""The Tuya BLE integration.""" +from __future__ import annotations + +from dataclasses import dataclass + +import logging +from typing import Callable + +from homeassistant.components.sensor import ( + SensorDeviceClass, + SensorEntity, + SensorEntityDescription, + SensorStateClass, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import ( + CONCENTRATION_PARTS_PER_MILLION, + PERCENTAGE, + SIGNAL_STRENGTH_DECIBELS_MILLIWATT, + TEMP_CELSIUS, + UnitOfTemperature, +) +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.entity import EntityCategory +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator + +from .const import ( + BATTERY_STATE_HIGH, + BATTERY_STATE_LOW, + BATTERY_STATE_NORMAL, + BATTERY_CHARGED, + BATTERY_CHARGING, + BATTERY_NOT_CHARGING, + CO2_LEVEL_ALARM, + CO2_LEVEL_NORMAL, + DOMAIN, +) +from .devices import TuyaBLEData, TuyaBLEEntity, TuyaBLEProductInfo +from .tuya_ble import TuyaBLEDataPointType, TuyaBLEDevice + +_LOGGER = logging.getLogger(__name__) + +SIGNAL_STRENGTH_DP_ID = -1 + + +TuyaBLESensorIsAvailable = Callable[ + ['TuyaBLESensor', TuyaBLEProductInfo], bool +] | None + + +@dataclass +class TuyaBLESensorMapping: + dp_id: int + description: SensorEntityDescription + force_add: bool = True + dp_type: TuyaBLEDataPointType | None = None + getter: Callable[[TuyaBLESensor], None] | None = None + coefficient: float = 1.0 + icons: list[str] | None = None + is_available: TuyaBLESensorIsAvailable = None + + +@dataclass +class TuyaBLEBatteryMapping(TuyaBLESensorMapping): + description: SensorEntityDescription = SensorEntityDescription( + key="battery", + device_class=SensorDeviceClass.BATTERY, + native_unit_of_measurement=PERCENTAGE, + entity_category=EntityCategory.DIAGNOSTIC, + state_class=SensorStateClass.MEASUREMENT, + ) + + +@dataclass +class TuyaBLETemperatureMapping(TuyaBLESensorMapping): + description: SensorEntityDescription = SensorEntityDescription( + key="temperature", + device_class=SensorDeviceClass.TEMPERATURE, + native_unit_of_measurement=UnitOfTemperature.CELSIUS, + state_class=SensorStateClass.MEASUREMENT, + ) + + +def is_co2_alarm_enabled( + self: TuyaBLESensor, + product: TuyaBLEProductInfo +) -> bool: + result: bool = True + datapoint = self._device.datapoints[13] + if datapoint: + result = bool(datapoint.value) + return result + + +@dataclass +class TuyaBLECategorySensorMapping: + products: dict[str, list[TuyaBLESensorMapping]] | None = None + mapping: list[TuyaBLESensorMapping] | None = None + + +mapping: dict[str, TuyaBLECategorySensorMapping] = { + "co2bj": TuyaBLECategorySensorMapping( + products={ + "59s19z5m": # CO2 Detector + [ + TuyaBLESensorMapping( + dp_id=1, + description=SensorEntityDescription( + key="carbon_dioxide_alarm", + icon="mdi:molecule-co2", + device_class=SensorDeviceClass.ENUM, + options=[ + CO2_LEVEL_ALARM, + CO2_LEVEL_NORMAL, + ], + ), + is_available=is_co2_alarm_enabled, + ), + TuyaBLESensorMapping( + dp_id=2, + description=SensorEntityDescription( + key="carbon_dioxide", + device_class=SensorDeviceClass.CO2, + native_unit_of_measurement=CONCENTRATION_PARTS_PER_MILLION, + state_class=SensorStateClass.MEASUREMENT, + ), + ), + TuyaBLEBatteryMapping(dp_id=15), + TuyaBLETemperatureMapping(dp_id=18), + TuyaBLESensorMapping( + dp_id=19, + description=SensorEntityDescription( + key="humidity", + device_class=SensorDeviceClass.HUMIDITY, + native_unit_of_measurement=PERCENTAGE, + state_class=SensorStateClass.MEASUREMENT, + ), + ), + ] + } + ), + "szjqr": TuyaBLECategorySensorMapping( + products={ + "xhf790if": # CubeTouch II + [ + TuyaBLESensorMapping( + dp_id=7, + description=SensorEntityDescription( + key="battery_charging", + device_class=SensorDeviceClass.ENUM, + entity_category=EntityCategory.DIAGNOSTIC, + options=[ + BATTERY_NOT_CHARGING, + BATTERY_CHARGING, + BATTERY_CHARGED, + ], + ), + icons=[ + "mdi:battery", + "mdi:power-plug-battery", + "mdi:battery-check", + ], + ), + TuyaBLEBatteryMapping(dp_id=8), + ], + "yiihr7zh": # Fingerbot Plus + [ + TuyaBLEBatteryMapping(dp_id=12), + ], + "yrnk7mnn": # Fingerbot + [ + TuyaBLEBatteryMapping(dp_id=12), + ], + }, + ), + "wsdcg": TuyaBLECategorySensorMapping( + products={ + "ojzlzzsw": # Soil moisture sensor + [ + TuyaBLETemperatureMapping( + dp_id=1, + coefficient=10.0, + ), + TuyaBLESensorMapping( + dp_id=2, + description=SensorEntityDescription( + key="moisture", + device_class=SensorDeviceClass.MOISTURE, + native_unit_of_measurement=PERCENTAGE, + state_class=SensorStateClass.MEASUREMENT, + ), + ), + TuyaBLESensorMapping( + dp_id=3, + description=SensorEntityDescription( + key="battery_state", + icon="mdi:battery", + device_class=SensorDeviceClass.ENUM, + entity_category=EntityCategory.DIAGNOSTIC, + options=[ + BATTERY_STATE_LOW, + BATTERY_STATE_NORMAL, + BATTERY_STATE_HIGH, + ], + ), + icons=[ + "mdi:battery-alert", + "mdi:battery-50", + "mdi:battery-check", + ], + ), + TuyaBLEBatteryMapping(dp_id=4), + ], + }, + ), +} + + +def rssi_getter(sensor: TuyaBLESensor) -> None: + sensor._attr_native_value = sensor._device.rssi + + +rssi_mapping = TuyaBLESensorMapping( + dp_id=SIGNAL_STRENGTH_DP_ID, + description=SensorEntityDescription( + key="signal_strength", + device_class=SensorDeviceClass.SIGNAL_STRENGTH, + native_unit_of_measurement=SIGNAL_STRENGTH_DECIBELS_MILLIWATT, + state_class=SensorStateClass.MEASUREMENT, + entity_category=EntityCategory.DIAGNOSTIC, + entity_registry_enabled_default=False, + ), + getter=rssi_getter, +) + + +def get_mapping_by_device( + device: TuyaBLEDevice +) -> list[TuyaBLESensorMapping]: + category = mapping.get(device.category) + if category is not None and category.products is not None: + product_mapping = category.products.get(device.product_id) + if product_mapping is not None: + return product_mapping + if category.mapping is not None: + return category.mapping + else: + return [] + else: + return [] + + +class TuyaBLESensor(TuyaBLEEntity, SensorEntity): + """Representation of a Tuya BLE sensor.""" + + def __init__( + self, + hass: HomeAssistant, + coordinator: DataUpdateCoordinator, + device: TuyaBLEDevice, + product: TuyaBLEProductInfo, + mapping: TuyaBLESensorMapping, + ) -> None: + super().__init__( + hass, + coordinator, + device, + product, + mapping.description + ) + self._mapping = mapping + + @callback + def _handle_coordinator_update(self) -> None: + """Handle updated data from the coordinator.""" + if self._mapping.getter is not None: + self._mapping.getter(self) + else: + datapoint = self._device.datapoints[self._mapping.dp_id] + if datapoint: + if datapoint.type == TuyaBLEDataPointType.DT_ENUM: + if self.entity_description.options is not None: + if (datapoint.value >= 0 and datapoint.value < + len(self.entity_description.options)): + self._attr_native_value = \ + self.entity_description.options[ + datapoint.value] + else: + self._attr_native_value = datapoint.value + if self._mapping.icons is not None: + if (datapoint.value >= 0 and datapoint.value < + len(self._mapping.icons)): + self._attr_icon = \ + self._mapping.icons[datapoint.value] + elif datapoint.type == TuyaBLEDataPointType.DT_VALUE: + self._attr_native_value = \ + datapoint.value / self._mapping.coefficient + else: + self._attr_native_value = datapoint.value + self.async_write_ha_state() + + @property + def available(self) -> bool: + """Return if entity is available.""" + result = super().available + if result and self._mapping.is_available: + result = self._mapping.is_available(self, self._product) + return result + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up the Tuya BLE sensors.""" + data: TuyaBLEData = hass.data[DOMAIN][entry.entry_id] + mappings = get_mapping_by_device(data.device) + entities: list[TuyaBLESensor] = [TuyaBLESensor( + hass, + data.coordinator, + data.device, + data.product, + rssi_mapping, + )] + for mapping in mappings: + if ( + mapping.force_add or + data.device.datapoints.has_id(mapping.dp_id, mapping.dp_type) + ): + entities.append(TuyaBLESensor( + hass, + data.coordinator, + data.device, + data.product, + mapping, + )) + async_add_entities(entities) diff --git a/custom_components/tuya_ble/strings.json b/custom_components/tuya_ble/strings.json new file mode 100644 index 00000000..5ce5fd67 --- /dev/null +++ b/custom_components/tuya_ble/strings.json @@ -0,0 +1,147 @@ +{ + "config": { + "abort": { + "no_unconfigured_devices": "No unconfigured devices found." + }, + "error": { + "device_not_registered": "Device is not registered in Tuya cloud", + "invalid_auth": "[%key:common::config_flow::error::invalid_auth%]", + "login_error": "Login error ({code}): {msg}" + }, + "flow_title": "{name}", + "step": { + "device": { + "data": { + "address": "Bluetooth address" + }, + "description": "Select Tuya BLE device to setup. Device must be registered in the cloud using the mobile application. It's better to unbind the device from Tuya Bluetooth gateway, if any." + }, + "login": { + "data": { + "access_id": "Tuya IoT Access ID", + "access_secret": "Tuya IoT Access Secret", + "country_code": "Country", + "password": "[%key:common::config_flow::data::password%]", + "username": "Account" + }, + "description": "Tuya BLE requires obtaining encryption key from Tuya cloud. Almost all devices only need to access the cloud once during setup.\n\nRefer to documentation of Tuya integration to retrive the cloud credentials https://www.home-assistant.io/integrations/tuya/\n\nEnter your Tuya credentials." + } + } + }, + "entity": { + "button": { + "push": { + "name": "Push" + } + }, + "number": { + "brightness": { + "name": "[%key:component::light::entity_component::_::state_attributes::brightness::name%]" + }, + "carbon_dioxide_alarm_level": { + "name": "Alarm level" + }, + "down_position": { + "name": "Down position" + }, + "hold_time": { + "name": "Hold time" + }, + "reporting_period": { + "name": "Reporting period" + }, + "up_position": { + "name": "Up position" + } + }, + "select": { + "fingerbot_mode": { + "name": "Mode", + "state": { + "push": "Push", + "switch": "[%key:component::switch::entity_component::_::name%]" + } + }, + "temperature_unit": { + "name": "Temperature unit" + } + }, + "sensor": { + "battery": { + "name": "[%key:component::sensor::entity_component::battery::name%]" + }, + "battery_charging": { + "name": "Battery charging", + "state": { + "charged": "Charged", + "charging": "[%key:component::binary_sensor::entity_component::battery_charging::state::on%]", + "not_charging": "[%key:component::binary_sensor::entity_component::battery_charging::state::off%]" + } + }, + "battery_state": { + "name": "Battery state", + "state": { + "high": "High", + "low": "[%key:component::binary_sensor::entity_component::battery::state::on%]", + "normal": "[%key:component::binary_sensor::entity_component::battery::state::off%]" + } + }, + "carbon_dioxide": { + "name": "[%key:component::sensor::entity_component::carbon_dioxide::name%]" + }, + "carbon_dioxide_alarm": { + "name": "Carbon dioxide level", + "state": { + "alarm": "Alarm", + "normal": "Normal" + } + }, + "humidity": { + "name": "[%key:component::sensor::entity_component::humidity::name%]" + }, + "moisture": { + "name": "[%key:component::sensor::entity_component::moisture::name%]" + }, + "signal_strength": { + "name": "[%key:component::sensor::entity_component::signal_strength::name%]" + }, + "temperature": { + "name": "[%key:component::sensor::entity_component::temperature::name%]" + } + }, + "switch": { + "carbon_dioxide_alarm_switch": { + "name": "Alarm enabled" + }, + "carbon_dioxide_severely_exceed_alarm": { + "name": "Severely exceed alarm" + }, + "low_battery_alarm": { + "name": "Low battery alarm" + }, + "manual_control": { + "name": "Manual control" + }, + "reverse_positions": { + "name": "Reverse positions" + }, + "switch": { + "name": "[%key:component::switch::entity_component::_::name%]" + } + } + }, + "options": { + "step": { + "login": { + "data": { + "access_id": "Tuya IoT Access ID", + "access_secret": "Tuya IoT Access Secret", + "country_code": "Country", + "password": "[%key:common::config_flow::data::password%]", + "username": "Account" + }, + "description": "Refer to documentation of Tuya integration to retrive the cloud credentials https://www.home-assistant.io/integrations/tuya/\n\nEnter your Tuya credentials." + } + } + } +} \ No newline at end of file diff --git a/custom_components/tuya_ble/switch.py b/custom_components/tuya_ble/switch.py new file mode 100644 index 00000000..e54b074d --- /dev/null +++ b/custom_components/tuya_ble/switch.py @@ -0,0 +1,289 @@ +"""The Tuya BLE integration.""" +from __future__ import annotations + +from dataclasses import dataclass + +import logging +from typing import Any, Callable + +from homeassistant.components.switch import ( + SwitchEntityDescription, + SwitchEntity, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity import EntityCategory +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator + +from .const import DOMAIN +from .devices import TuyaBLEData, TuyaBLEEntity, TuyaBLEProductInfo +from .tuya_ble import TuyaBLEDataPointType, TuyaBLEDevice + +_LOGGER = logging.getLogger(__name__) + + +TuyaBLESwitchIsAvailable = Callable[ + ['TuyaBLESwitch', TuyaBLEProductInfo], bool +] | None + + +@dataclass +class TuyaBLESwitchMapping: + dp_id: int + description: SwitchEntityDescription + force_add: bool = True + dp_type: TuyaBLEDataPointType | None = None + bitmap_mask: bytes | None = None + is_available: TuyaBLESwitchIsAvailable = None + + +def is_fingerbot_in_switch_mode( + self: TuyaBLESwitch, + product: TuyaBLEProductInfo +) -> bool: + result: bool = False + if product.fingerbot: + datapoint = self._device.datapoints[product.fingerbot.mode] + if datapoint: + result = datapoint.value == 1 + return result + + +@dataclass +class TuyaBLEFingerbotSwitchMapping(TuyaBLESwitchMapping): + description: SwitchEntityDescription = SwitchEntityDescription( + key="switch", + ) + is_available: TuyaBLESwitchIsAvailable = is_fingerbot_in_switch_mode + + +@dataclass +class TuyaBLEReversePositionsMapping(TuyaBLESwitchMapping): + description: SwitchEntityDescription = SwitchEntityDescription( + key="reverse_positions", + icon="mdi:arrow-up-down-bold", + entity_category=EntityCategory.CONFIG, + ) + is_available: TuyaBLESwitchIsAvailable = is_fingerbot_in_switch_mode + + +@dataclass +class TuyaBLECategorySwitchMapping: + products: dict[str, list[TuyaBLESwitchMapping]] | None = None + mapping: list[TuyaBLESwitchMapping] | None = None + + +mapping: dict[str, TuyaBLECategorySwitchMapping] = { + "co2bj": TuyaBLECategorySwitchMapping( + products={ + "59s19z5m": # CO2 Detector + [ + TuyaBLESwitchMapping( + dp_id=11, + description=SwitchEntityDescription( + key="carbon_dioxide_severely_exceed_alarm", + icon="mdi:molecule-co2", + entity_category=EntityCategory.CONFIG, + entity_registry_enabled_default=False, + ), + bitmap_mask=b'\x01', + ), + TuyaBLESwitchMapping( + dp_id=11, + description=SwitchEntityDescription( + key="low_battery_alarm", + icon="mdi:battery-alert", + entity_category=EntityCategory.CONFIG, + entity_registry_enabled_default=False, + ), + bitmap_mask=b'\x02', + ), + TuyaBLESwitchMapping( + dp_id=13, + description=SwitchEntityDescription( + key="carbon_dioxide_alarm_switch", + icon="mdi:molecule-co2", + entity_category=EntityCategory.CONFIG, + ) + ), + ], + }, + ), + "szjqr": TuyaBLECategorySwitchMapping( + products={ + "xhf790if": # CubeTouch II + [ + TuyaBLEFingerbotSwitchMapping(dp_id=1), + TuyaBLEReversePositionsMapping(dp_id=4), + ], + "yiihr7zh": # Fingerbot Plus + [ + TuyaBLEFingerbotSwitchMapping(dp_id=2), + TuyaBLEReversePositionsMapping(dp_id=11), + TuyaBLESwitchMapping( + dp_id=17, + description=SwitchEntityDescription( + key="manual_control", + icon="mdi:gesture-tap-box", + entity_category=EntityCategory.CONFIG, + ), + ), + ], + "yrnk7mnn": # Fingerbot + [ + TuyaBLEFingerbotSwitchMapping(dp_id=2), + TuyaBLEReversePositionsMapping(dp_id=11), + ], + }, + ), + "wsdcg": TuyaBLECategorySwitchMapping( + products={ + "ojzlzzsw": # Soil moisture sensor + [ + TuyaBLESwitchMapping( + dp_id=21, + description=SwitchEntityDescription( + key="switch", + icon="mdi:thermometer", + entity_category=EntityCategory.CONFIG, + entity_registry_enabled_default=False, + ), + ), + ], + }, + ), +} + + +def get_mapping_by_device( + device: TuyaBLEDevice +) -> list[TuyaBLECategorySwitchMapping]: + category = mapping.get(device.category) + if category is not None and category.products is not None: + product_mapping = category.products.get(device.product_id) + if product_mapping is not None: + return product_mapping + if category.mapping is not None: + return category.mapping + else: + return [] + else: + return [] + + +class TuyaBLESwitch(TuyaBLEEntity, SwitchEntity): + """Representation of a Tuya BLE Switch.""" + + def __init__( + self, + hass: HomeAssistant, + coordinator: DataUpdateCoordinator, + device: TuyaBLEDevice, + product: TuyaBLEProductInfo, + mapping: TuyaBLESwitchMapping, + ) -> None: + super().__init__( + hass, + coordinator, + device, + product, + mapping.description + ) + self._mapping = mapping + + @property + def is_on(self) -> bool: + """Return true if switch is on.""" + datapoint = self._device.datapoints[self._mapping.dp_id] + if datapoint: + if datapoint.type in [ + TuyaBLEDataPointType.DT_RAW, + TuyaBLEDataPointType.DT_BITMAP + ] and self._mapping.bitmap_mask: + bitmap_value = bytes(datapoint.value) + bitmap_mask = self._mapping.bitmap_mask + for (v, m) in zip(bitmap_value, bitmap_mask, strict=True): + if (v & m) != 0: + return True + else: + return bool(datapoint.value) + return False + + def turn_on(self, **kwargs: Any) -> None: + """Turn the switch on.""" + new_value: bool | bytes + if self._mapping.bitmap_mask: + datapoint = self._device.datapoints.get_or_create( + self._mapping.dp_id, + TuyaBLEDataPointType.DT_BITMAP, + self._mapping.bitmap_mask, + ) + bitmap_mask = self._mapping.bitmap_mask + bitmap_value = bytes(datapoint.value) + new_value = bytes(v | m for (v, m) in + zip(bitmap_value, bitmap_mask, strict=True)) + else: + datapoint = self._device.datapoints.get_or_create( + self._mapping.dp_id, + TuyaBLEDataPointType.DT_BOOL, + True, + ) + new_value = True + if datapoint: + self._hass.create_task(datapoint.set_value(new_value)) + + def turn_off(self, **kwargs: Any) -> None: + """Turn the switch off.""" + new_value: bool | bytes + if self._mapping.bitmap_mask: + datapoint = self._device.datapoints.get_or_create( + self._mapping.dp_id, + TuyaBLEDataPointType.DT_BITMAP, + self._mapping.bitmap_mask, + ) + bitmap_mask = self._mapping.bitmap_mask + bitmap_value = bytes(datapoint.value) + new_value = bytes(v & ~m for (v, m) in + zip(bitmap_value, bitmap_mask, strict=True)) + else: + datapoint = self._device.datapoints.get_or_create( + self._mapping.dp_id, + TuyaBLEDataPointType.DT_BOOL, + False, + ) + new_value = False + if datapoint: + self._hass.create_task(datapoint.set_value(new_value)) + + @property + def available(self) -> bool: + """Return if entity is available.""" + result = super().available + if result and self._mapping.is_available: + result = self._mapping.is_available(self, self._product) + return result + + +async def async_setup_entry( + hass: HomeAssistant, + entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up the Tuya BLE sensors.""" + data: TuyaBLEData = hass.data[DOMAIN][entry.entry_id] + mappings = get_mapping_by_device(data.device) + entities: list[TuyaBLESwitch] = [] + for mapping in mappings: + if ( + mapping.force_add or + data.device.datapoints.has_id(mapping.dp_id, mapping.dp_type) + ): + entities.append(TuyaBLESwitch( + hass, + data.coordinator, + data.device, + data.product, + mapping, + )) + async_add_entities(entities) diff --git a/custom_components/tuya_ble/translations/en.json b/custom_components/tuya_ble/translations/en.json new file mode 100644 index 00000000..3bd52b01 --- /dev/null +++ b/custom_components/tuya_ble/translations/en.json @@ -0,0 +1,147 @@ +{ + "config": { + "abort": { + "no_unconfigured_devices": "No unconfigured devices found." + }, + "error": { + "device_not_registered": "Device is not registered in Tuya cloud", + "invalid_auth": "Invalid authentication", + "login_error": "Login error ({code}): {msg}" + }, + "flow_title": "{name}", + "step": { + "device": { + "data": { + "address": "Bluetooth address" + }, + "description": "Select Tuya BLE device to setup. Device must be registered in the cloud using the mobile application. It's better to unbind the device from Tuya Bluetooth gateway, if any." + }, + "login": { + "data": { + "access_id": "Tuya IoT Access ID", + "access_secret": "Tuya IoT Access Secret", + "country_code": "Country", + "password": "Password", + "username": "Account" + }, + "description": "Tuya BLE requires obtaining encryption key from Tuya cloud. Almost all devices only need to access the cloud once during setup.\n\nRefer to documentation of Tuya integration to retrive the cloud credentials https://www.home-assistant.io/integrations/tuya/\n\nEnter your Tuya credentials." + } + } + }, + "entity": { + "button": { + "push": { + "name": "Push" + } + }, + "number": { + "brightness": { + "name": "Brightness" + }, + "carbon_dioxide_alarm_level": { + "name": "Alarm level" + }, + "down_position": { + "name": "Down position" + }, + "hold_time": { + "name": "Hold time" + }, + "reporting_period": { + "name": "Reporting period" + }, + "up_position": { + "name": "Up position" + } + }, + "select": { + "fingerbot_mode": { + "name": "Mode", + "state": { + "push": "Push", + "switch": "Switch" + } + }, + "temperature_unit": { + "name": "Temperature unit" + } + }, + "sensor": { + "battery": { + "name": "Battery" + }, + "battery_charging": { + "name": "Battery charging", + "state": { + "charged": "Charged", + "charging": "Charging", + "not_charging": "Not charging" + } + }, + "battery_state": { + "name": "Battery state", + "state": { + "high": "High", + "low": "Low", + "normal": "Normal" + } + }, + "carbon_dioxide": { + "name": "Carbon dioxide" + }, + "carbon_dioxide_alarm": { + "name": "Carbon dioxide level", + "state": { + "alarm": "Alarm", + "normal": "Normal" + } + }, + "humidity": { + "name": "Humidity" + }, + "moisture": { + "name": "Moisture" + }, + "signal_strength": { + "name": "Signal strength" + }, + "temperature": { + "name": "Temperature" + } + }, + "switch": { + "carbon_dioxide_alarm_switch": { + "name": "Alarm enabled" + }, + "carbon_dioxide_severely_exceed_alarm": { + "name": "Severely exceed alarm" + }, + "low_battery_alarm": { + "name": "Low battery alarm" + }, + "manual_control": { + "name": "Manual control" + }, + "reverse_positions": { + "name": "Reverse positions" + }, + "switch": { + "name": "Switch" + } + } + }, + "options": { + "step": { + "login": { + "data": { + "access_id": "Tuya IoT Access ID", + "access_secret": "Tuya IoT Access Secret", + "country_code": "Country", + "password": "Password", + "username": "Account" + }, + "description": "Refer to documentation of Tuya integration to retrive the cloud credentials https://www.home-assistant.io/integrations/tuya/\n\nEnter your Tuya credentials." + } + } + } +} \ No newline at end of file diff --git a/custom_components/tuya_ble/tuya_ble/__init__.py b/custom_components/tuya_ble/tuya_ble/__init__.py new file mode 100644 index 00000000..fb97169d --- /dev/null +++ b/custom_components/tuya_ble/tuya_ble/__init__.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +__version__ = "0.1.0" + + +from .const import ( + SERVICE_UUID, + TuyaBLEDataPointType, +) +from .manager import ( + AbstaractTuyaBLEDeviceManager, + TuyaBLEDeviceCredentials, +) +from .tuya_ble import TuyaBLEDataPoint, TuyaBLEDevice + +__all__ = [ + "AbstaractTuyaBLEDeviceManager", + "TuyaBLEDataPoint", + "TuyaBLEDataPointType", + "TuyaBLEDevice", + "TuyaBLEDeviceCredentials", + "SERVICE_UUID", +] diff --git a/custom_components/tuya_ble/tuya_ble/const.py b/custom_components/tuya_ble/tuya_ble/const.py new file mode 100644 index 00000000..b1514a9c --- /dev/null +++ b/custom_components/tuya_ble/tuya_ble/const.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from enum import Enum + +GATT_MTU = 20 + +DEFAULT_ATTEMPTS = 0xFFFF + +CHARACTERISTIC_NOTIFY = "00002b10-0000-1000-8000-00805f9b34fb" +CHARACTERISTIC_WRITE = "00002b11-0000-1000-8000-00805f9b34fb" + +SERVICE_UUID = "0000a201-0000-1000-8000-00805f9b34fb" + +MANUFACTURER_DATA_ID = 0x07D0 + +RESPONSE_WAIT_TIMEOUT = 60 + + +class TuyaBLECode(Enum): + FUN_SENDER_DEVICE_INFO = 0x0000 + FUN_SENDER_PAIR = 0x0001 + FUN_SENDER_DPS = 0x0002 + FUN_SENDER_DEVICE_STATUS = 0x0003 + + FUN_SENDER_UNBIND = 0x0005 + FUN_SENDER_DEVICE_RESET = 0x0006 + + FUN_SENDER_OTA_START = 0x000C + FUN_SENDER_OTA_FILE = 0x000D + FUN_SENDER_OTA_OFFSET = 0x000E + FUN_SENDER_OTA_UPGRADE = 0x000F + FUN_SENDER_OTA_OVER = 0x0010 + + FUN_SENDER_DPS_V4 = 0x0027 + + FUN_RECEIVE_DP = 0x8001 + FUN_RECEIVE_TIME_DP = 0x8003 + FUN_RECEIVE_SIGN_DP = 0x8004 + FUN_RECEIVE_SIGN_TIME_DP = 0x8005 + + FUN_RECEIVE_DP_V4 = 0x8006 + FUN_RECEIVE_TIME_DP_V4 = 0x8007 + + FUN_RECEIVE_TIME1_REQ = 0x8011 + FUN_RECEIVE_TIME2_REQ = 0x8012 + + +class TuyaBLEDataPointType(Enum): + DT_RAW = 0 + DT_BOOL = 1 + DT_VALUE = 2 + DT_STRING = 3 + DT_ENUM = 4 + DT_BITMAP = 5 diff --git a/custom_components/tuya_ble/tuya_ble/exceptions.py b/custom_components/tuya_ble/tuya_ble/exceptions.py new file mode 100644 index 00000000..07072de7 --- /dev/null +++ b/custom_components/tuya_ble/tuya_ble/exceptions.py @@ -0,0 +1,40 @@ +from __future__ import annotations + + +class TuyaBLEError(Exception): + """Base class for Tuya BLE errors.""" + + +class TuyaBLEEnumValueError(TuyaBLEError): + """Raised when value assigned to DP_ENUM datapoint has unexpected type.""" + + def __init__(self) -> None: + super().__init__("Value of DP_ENUM datapoint must be unsigned integer") + + +class TuyaBLEDataFormatError(TuyaBLEError): + """Raised when data in Tuya BLE structures formatted in wrong way.""" + + def __init__(self) -> None: + super().__init__("Incoming packet is formatted in wrong way") + + +class TuyaBLEDataCRCError(TuyaBLEError): + """Raised when data packet has invalid CRC.""" + + def __init__(self) -> None: + super().__init__("Incoming packet has invalid CRC") + + +class TuyaBLEDataLengthError(TuyaBLEError): + """Raised when data packet has invalid length.""" + + def __init__(self) -> None: + super().__init__("Incoming packet has invalid length") + + +class TuyaBLEDeviceError(TuyaBLEError): + """Raised when Tuya BLE device returned error in response to command.""" + + def __init__(self, code: int) -> None: + super().__init__(("BLE deice returned error code %s") % (code)) diff --git a/custom_components/tuya_ble/tuya_ble/manager.py b/custom_components/tuya_ble/tuya_ble/manager.py new file mode 100644 index 00000000..8dfbb14c --- /dev/null +++ b/custom_components/tuya_ble/tuya_ble/manager.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass + + +@dataclass +class TuyaBLEDeviceCredentials: + uuid: str + local_key: str + device_id: str + category: str + product_id: str + device_name: str | None + product_name: str | None + + def __str__(self): + print('str called') + return ( + "uuid: xxxxxxxxxxxxxxxx, " + "local_key: xxxxxxxxxxxxxxxx, " + "device_id: xxxxxxxxxxxxxxxx, " + "category: %s, " + "product_id: %s, " + "device_name: %s, " + "product_name: %s" + ) % ( + self.category, + self.product_id, + self.device_name, + self.product_name, + ) + +class AbstaractTuyaBLEDeviceManager(ABC): + """Abstaract manager of the Tuya BLE devices credentials.""" + + @abstractmethod + async def get_device_credentials( + self, + address: str, + force_update: bool = False, + save_data: bool = False, + ) -> TuyaBLEDeviceCredentials | None: + """Get credentials of the Tuya BLE device.""" + pass + + @classmethod + def check_and_create_device_credentials( + self, + uuid: str | None, + local_key: str | None, + device_id: str | None, + category: str | None, + product_id: str | None, + device_name: str | None, + product_name: str | None, + ) -> TuyaBLEDeviceCredentials | None: + """Checks and creates credentials of the Tuya BLE device.""" + if ( + uuid and + local_key and + device_id and + category and + product_id + ): + return TuyaBLEDeviceCredentials( + uuid, + local_key, + device_id, + category, + product_id, + device_name, + product_name, + ) + else: + return None diff --git a/custom_components/tuya_ble/tuya_ble/tuya_ble.py b/custom_components/tuya_ble/tuya_ble/tuya_ble.py new file mode 100644 index 00000000..da32e107 --- /dev/null +++ b/custom_components/tuya_ble/tuya_ble/tuya_ble.py @@ -0,0 +1,1247 @@ +from __future__ import annotations + +import asyncio +import hashlib +import logging +import secrets +import time +from collections.abc import Callable +from struct import pack, unpack + +from bleak.backends.device import BLEDevice +from bleak.backends.scanner import AdvertisementData +from bleak.exc import BleakDBusError +from bleak_retry_connector import BLEAK_BACKOFF_TIME +from bleak_retry_connector import BLEAK_RETRY_EXCEPTIONS +from bleak_retry_connector import ( + DEFAULT_ATTEMPTS, + BleakClientWithServiceCache, + BleakError, + BleakNotFoundError, + establish_connection, + retry_bluetooth_connection_error, +) +from Crypto.Cipher import AES + +from .const import ( + CHARACTERISTIC_NOTIFY, + CHARACTERISTIC_WRITE, + GATT_MTU, + MANUFACTURER_DATA_ID, + RESPONSE_WAIT_TIMEOUT, + SERVICE_UUID, + TuyaBLECode, + TuyaBLEDataPointType, +) +from .exceptions import ( + TuyaBLEDataCRCError, + TuyaBLEDataFormatError, + TuyaBLEDataLengthError, + TuyaBLEDeviceError, + TuyaBLEEnumValueError, +) +from .manager import AbstaractTuyaBLEDeviceManager, TuyaBLEDeviceCredentials + +_LOGGER = logging.getLogger(__name__) + + +BLEAK_EXCEPTIONS = ( + *BLEAK_RETRY_EXCEPTIONS, + OSError +) + + +class TuyaBLEDataPoint: + + def __init__( + self, + owner: TuyaBLEDataPoints, + id: int, + timestamp: float, + flags: int, + type: TuyaBLEDataPointType, + value: bytes | bool | int | str, + ) -> None: + self._owner = owner + self._id = id + self._update_from_device(timestamp, flags, type, value) + + def _update_from_device( + self, + timestamp: float, + flags: int, + type: TuyaBLEDataPointType, + value: bytes | bool | int | str, + ) -> None: + self._timestamp = timestamp + self._flags = flags + self._type = type + self._value = value + + def _get_value(self) -> bytes: + match self._type: + case TuyaBLEDataPointType.DT_RAW | TuyaBLEDataPointType.DT_BITMAP: + return self._value + case TuyaBLEDataPointType.DT_BOOL: + return pack(">B", 1 if self._value else 0) + case TuyaBLEDataPointType.DT_VALUE: + return pack(">i", self._value) + case TuyaBLEDataPointType.DT_ENUM: + if self._value > 0xFFFF: + return pack(">I", self._value) + elif self._value > 0xFF: + return pack(">H", self._value) + else: + return pack(">B", self._value) + case TuyaBLEDataPointType.DT_STRING: + return self._value.encode() + + @property + def id(self) -> int: + return self._id + + @property + def timestamp(self) -> float: + return self._timestamp + + @property + def flags(self) -> int: + return self._flags + + @property + def type(self) -> TuyaBLEDataPointType: + return self._type + + @property + def value(self) -> bytes | bool | int | str: + return self._value + + async def set_value(self, value: bytes | bool | int | str) -> None: + match self._type: + case TuyaBLEDataPointType.DT_RAW | TuyaBLEDataPointType.DT_BITMAP: + self._value = bytes(value) + case TuyaBLEDataPointType.DT_BOOL: + self._value = bool(value) + case TuyaBLEDataPointType.DT_VALUE: + self._value = int(value) + case TuyaBLEDataPointType.DT_ENUM: + value = int(value) + if value >= 0: + self._value = value + else: + raise TuyaBLEEnumValueError() + + case TuyaBLEDataPointType.DT_STRING: + self._value = str(value) + + await self._owner._update_from_user(self._id) + + +class TuyaBLEDataPoints: + def __init__(self, owner: TuyaBLEDevice) -> None: + self._owner = owner + self._datapoints: dict[int, TuyaBLEDataPoint] = {} + self._update_started: int = 0 + self._updated_datapoints: list[int] = [] + + def __len__(self) -> int: + return len(self._datapoints) + + def __getitem__(self, key: int) -> TuyaBLEDataPoint | None: + return self._datapoints.get(key) + + def has_id( + self, + id: int, + type: TuyaBLEDataPointType | None = None + ) -> bool: + return ( + (id in self._datapoints) and + ((type is None) or (self._datapoints[id].type == type)) + ) + + def get_or_create( + self, + id: int, + type: TuyaBLEDataPointType, + value: bytes | bool | int | str | None = None + ) -> TuyaBLEDataPoint: + datapoint = self._datapoints.get(id) + if datapoint: + return datapoint + datapoint = TuyaBLEDataPoint(self, id, time.time(), 0, type, value) + self._datapoints[id] = datapoint + return datapoint + + def begin_update(self) -> None: + self._update_started += 1 + + async def end_update(self) -> None: + if self._update_started > 0: + self._update_started -= 1 + if (self._update_started == 0 and + len(self._updated_datapoints) > 0): + await self._owner._send_datapoints(self._updated_datapoints) + self._updated_datapoints = [] + + def _update_from_device( + self, + dp_id: int, + timestamp: float, + flags: int, + type: TuyaBLEDataPointType, + value: bytes | bool | int | str, + ) -> None: + dp = self._datapoints.get(dp_id) + if dp: + dp._update_from_device(timestamp, flags, type, value) + else: + self._datapoints[dp_id] = TuyaBLEDataPoint( + self, dp_id, timestamp, flags, type, value + ) + + async def _update_from_user(self, dp_id: int) -> None: + if self._update_started > 0: + if dp_id in self._updated_datapoints: + self._updated_datapoints.remove(dp_id) + self._updated_datapoints.append(dp_id) + else: + await self._owner._send_datapoints([dp_id]) + + +class TuyaBLEDevice: + def __init__( + self, + device_manager: AbstaractTuyaBLEDeviceManager, + ble_device: BLEDevice, + advertisement_data: AdvertisementData | None = None, + ) -> None: + """Init the TuyaBLE.""" + self._device_manager = device_manager + self._device_info: TuyaBLEDeviceCredentials | None = None + self._ble_device = ble_device + self._advertisement_data = advertisement_data + self._operation_lock = asyncio.Lock() + self._connect_lock = asyncio.Lock() + self._client: BleakClientWithServiceCache | None = None + self._expected_disconnect = False + self._connected_callbacks: list[Callable[[], None]] = [] + self._callbacks: list[Callable[[list[TuyaBLEDataPoint]], None]] = [] + self._disconnected_callbacks: list[Callable[[], None]] = [] + self._current_seq_num = 1 + self._seq_num_lock = asyncio.Lock() + + self._is_bound = False + self._flags = 0 + self._protocol_version = 2 + + self._device_version: str = "" + self._protocol_version_str: str = "" + self._hardware_version: str = "" + + self._device_info: TuyaBLEDeviceCredentials | None = None + + self._auth_key: bytes | None = None + self._local_key: bytes | None = None + self._login_key: bytes | None = None + self._session_key: bytes | None = None + + self._is_paired = False + + self._input_buffer: bytearray | None = None + self._input_expected_packet_num = 0 + self._input_expected_length = 0 + self._input_expected_responses: dict[int, + asyncio.Future[int] | None] = {} + # self._input_future: asyncio.Future[int] | None = None + + self._datapoints = TuyaBLEDataPoints(self) + + def set_ble_device_and_advertisement_data( + self, ble_device: BLEDevice, advertisement_data: AdvertisementData + ) -> None: + """Set the ble device.""" + self._ble_device = ble_device + self._advertisement_data = advertisement_data + + async def initialize(self) -> None: + _LOGGER.debug("%s: Initializing", self.address) + if await self._update_device_info(): + self._decode_advertisement_data() + + await self.update() + + async def _start_session(self) -> None: + _LOGGER.debug("%s: Sending device info request", self.address) + await self._send_packet(TuyaBLECode.FUN_SENDER_DEVICE_INFO, bytes(0)) + + await self.pair() + + # gives an ability to receive and handle timestamp request + # and first datapoints + await asyncio.sleep(0.5) + + await self.update() + + def _build_pairing_request(self) -> bytes: + result = bytearray() + + result += self._device_info.uuid.encode() + result += self._local_key + result += self._device_info.device_id.encode() + for _ in range(44 - len(result)): + result += b"\x00" + + return result + + async def pair(self) -> None: + """ + _LOGGER.debug("%s: Sending pairing request: %s", + self.address, data.hex() + ) + """ + await self._send_packet( + TuyaBLECode.FUN_SENDER_PAIR, + self._build_pairing_request() + ) + + async def update(self) -> None: + _LOGGER.debug("%s: Updating", self.address) + # if self._is_paired: + # _LOGGER.debug("%s: Sending device status request", self.address) + await self._send_packet(TuyaBLECode.FUN_SENDER_DEVICE_STATUS, bytes()) + # else: + # await self.initialize() + + async def _update_device_info(self) -> bool: + if self._device_info is None: + if self._device_manager: + self._device_info = await self._device_manager.get_device_credentials( + self._ble_device.address, False + ) + if self._device_info: + self._local_key = self._device_info.local_key[:6].encode() + self._login_key = hashlib.md5(self._local_key).digest() + + return self._device_info is not None + + def _decode_advertisement_data(self) -> None: + raw_product_id: bytes | None = None + # raw_product_key: bytes | None = None + raw_uuid: bytes | None = None + if self._advertisement_data: + if self._advertisement_data.service_data: + service_data = self._advertisement_data.service_data.get( + SERVICE_UUID) + if service_data and len(service_data) > 1: + match service_data[0]: + case 0: + raw_product_id = service_data[1:] + # case 1: + # raw_product_key = service_data[1:] + + if self._advertisement_data.manufacturer_data: + manufacturer_data = self._advertisement_data.manufacturer_data.get( + MANUFACTURER_DATA_ID + ) + if manufacturer_data and len(manufacturer_data) > 6: + self._is_bound = (manufacturer_data[0] & 0x80) != 0 + self._protocol_version = manufacturer_data[1] + raw_uuid = manufacturer_data[6:] + if raw_product_id: + key = hashlib.md5(raw_product_id).digest() + cipher = AES.new(key, AES.MODE_CBC, key) + raw_uuid = cipher.decrypt(raw_uuid) + self._uuid = raw_uuid.decode("utf-8") + + @property + def address(self) -> str: + """Return the address.""" + return self._ble_device.address + + @property + def name(self) -> str: + """Get the name of the device.""" + if self._device_info: + return self._device_info.device_name + else: + return self._ble_device.name or self._ble_device.address + + @property + def rssi(self) -> int | None: + """Get the rssi of the device.""" + if self._advertisement_data: + return self._advertisement_data.rssi + return None + + @property + def uuid(self) -> str: + if self._device_info is not None: + return self._device_info.uuid + else: + return "" + + @property + def local_key(self) -> str: + if self._device_info is not None: + return self._device_info.local_key + else: + return "" + + @property + def category(self) -> str: + if self._device_info is not None: + return self._device_info.category + else: + return "" + + @property + def device_id(self) -> str: + if self._device_info is not None: + return self._device_info.device_id + else: + return "" + + @property + def product_id(self) -> str: + if self._device_info is not None: + return self._device_info.product_id + else: + return "" + + @property + def product_name(self) -> str: + if self._device_info is not None: + return self._device_info.product_name + else: + return "" + + @property + def device_version(self) -> str: + return self._device_version + + @property + def hardware_version(self) -> str: + return self._hardware_version + + @property + def protocol_version(self) -> str: + return self._protocol_version_str + + @property + def datapoints(self) -> TuyaBLEDataPoints: + """Get datapoints exposed by device.""" + return self._datapoints + + def get_or_create_datapoint( + self, + id: int, + type: TuyaBLEDataPointType, + value: bytes | bool | int | str | None = None + ) -> TuyaBLEDataPoint: + """Get datapoints exposed by device.""" + + def _fire_connected_callbacks(self) -> None: + """Fire the callbacks.""" + for callback in self._connected_callbacks: + callback() + + def register_connected_callback( + self, callback: Callable[[], None] + ) -> Callable[[], None]: + """Register a callback to be called when device disconnected.""" + + def unregister_callback() -> None: + self._connected_callbacks.remove(callback) + + self._connected_callbacks.append(callback) + return unregister_callback + + def _fire_callbacks( + self, + datapoints: list[TuyaBLEDataPoint] + ) -> None: + """Fire the callbacks.""" + for callback in self._callbacks: + callback(datapoints) + + def register_callback( + self, + callback: Callable[[list[TuyaBLEDataPoint]], None], + ) -> Callable[[], None]: + """Register a callback to be called when the state changes.""" + + def unregister_callback() -> None: + self._callbacks.remove(callback) + + self._callbacks.append(callback) + return unregister_callback + + def _fire_disconnected_callbacks(self) -> None: + """Fire the callbacks.""" + for callback in self._disconnected_callbacks: + callback() + + def register_disconnected_callback( + self, callback: Callable[[], None] + ) -> Callable[[], None]: + """Register a callback to be called when device disconnected.""" + + def unregister_callback() -> None: + self._disconnected_callbacks.remove(callback) + + self._disconnected_callbacks.append(callback) + return unregister_callback + + async def start(self): + """Start the TuyaBLE.""" + _LOGGER.debug("%s: Starting...", self.address) + # await self._send_packet() + + async def stop(self) -> None: + """Stop the TuyaBLE.""" + _LOGGER.debug("%s: Stop", self.address) + await self._execute_disconnect() + + def _disconnected(self, client: BleakClientWithServiceCache) -> None: + """Disconnected callback.""" + was_paired = self._is_paired + self._is_paired = False + self._fire_disconnected_callbacks() + if self._expected_disconnect: + _LOGGER.debug( + "%s: Disconnected from device; RSSI: %s", + self.address, + self.rssi, + ) + return + _LOGGER.warning( + "%s: Device unexpectedly disconnected; RSSI: %s", + self.address, + self.rssi, + ) + if was_paired: + _LOGGER.debug( + "%s: Scheduling reconnect; RSSI: %s", + self.address, + self.rssi, + ) + asyncio.create_task(self._reconnect()) + + def _disconnect(self) -> None: + """Disconnect from device.""" + asyncio.create_task(self._execute_timed_disconnect()) + + async def _execute_timed_disconnect(self) -> None: + """Execute timed disconnection.""" + _LOGGER.debug( + "%s: Disconnecting", + self.address, + ) + await self._execute_disconnect() + + async def _execute_disconnect(self) -> None: + """Execute disconnection.""" + async with self._connect_lock: + client = self._client + self._expected_disconnect = True + self._client = None + if client and client.is_connected: + await client.stop_notify(CHARACTERISTIC_NOTIFY) + await client.disconnect() + async with self._seq_num_lock: + self._current_seq_num = 1 + + async def _ensure_connected(self) -> None: + """Ensure connection to device is established.""" + if self._connect_lock.locked(): + _LOGGER.debug( + "%s: Connection already in progress," + " waiting for it to complete; RSSI: %s", + self.address, + self.rssi, + ) + if self._client and self._client.is_connected and self._is_paired: + return + async with self._connect_lock: + # Check again while holding the lock + await asyncio.sleep(0.01) + if self._client and self._client.is_connected and self._is_paired: + return + attempts_count = 100 + while attempts_count > 0: + attempts_count -= 1 + if attempts_count == 0: + _LOGGER.error( + "%s: Connecting, all attempts failed; RSSI: %s", + self.address, + self.rssi + ) + raise BleakNotFoundError() + _LOGGER.debug("%s: Connecting; RSSI: %s", + self.address, self.rssi) + try: + client = await establish_connection( + BleakClientWithServiceCache, + self._ble_device, + self.address, + self._disconnected, + use_services_cache=True, + ble_device_callback=lambda: self._ble_device, + ) + except BleakNotFoundError: + _LOGGER.error( + "%s: device not found, not in range, or poor RSSI: %s", + self.address, + self.rssi, + exc_info=True, + ) + continue + except BLEAK_EXCEPTIONS: + _LOGGER.debug( + "%s: communication failed", + self.address, + exc_info=True + ) + continue + + if client and client.is_connected: + _LOGGER.debug("%s: Connected; RSSI: %s", + self.address, self.rssi) + self._client = client + try: + await self._client.start_notify( + CHARACTERISTIC_NOTIFY, self._notification_handler + ) + except [BLEAK_EXCEPTIONS, BleakNotFoundError]: + self._client = None + _LOGGER.error( + "%s: starting notifications failed", + self.address, + exc_info=True + ) + continue + + if self._client and self._client.is_connected: + _LOGGER.debug( + "%s: Sending device info request", self.address) + try: + if not await self._send_packet_while_connected( + TuyaBLECode.FUN_SENDER_DEVICE_INFO, + bytes(0), + 0, + True, + ): + self._client = None + _LOGGER.error( + "%s: Sending device info request failed", + self.address + ) + continue + except [BLEAK_EXCEPTIONS, BleakNotFoundError]: + self._client = None + _LOGGER.error( + "%s: Sending device info request failed", + self.address, + exc_info=True + ) + continue + + if self._client and self._client.is_connected: + _LOGGER.debug( + "%s: Sending pairing request", self.address) + try: + if not await self._send_packet_while_connected( + TuyaBLECode.FUN_SENDER_PAIR, + self._build_pairing_request(), + 0, + True, + ): + self._client = None + _LOGGER.error( + "%s: Sending pairing request failed", + self.address + ) + continue + except [BLEAK_EXCEPTIONS, BleakNotFoundError]: + self._client = None + _LOGGER.error( + "%s: Sending pairing request failed", + self.address, + exc_info=True + ) + continue + + break + + self._fire_connected_callbacks() + + async def _reconnect(self, use_delay: bool = True) -> None: + """Attempt a reconnect""" + _LOGGER.debug("%s: Reconnect, ensuring connection", self.address) + async with self._seq_num_lock: + self._current_seq_num = 1 + try: + await self._ensure_connected() + _LOGGER.debug("%s: Reconnect, connection ensured", self.address) + except BLEAK_EXCEPTIONS: # BleakNotFoundError: + _LOGGER.debug( + "%s: Reconnect, failed to ensure connection - backing off", + self.address, + exc_info=True, + ) + await asyncio.sleep(BLEAK_BACKOFF_TIME) + _LOGGER.debug("%s: Reconnecting again", self.address) + asyncio.create_task(self._reconnect(False)) + + @staticmethod + def _calc_crc16(data: bytes) -> int: + crc = 0xFFFF + for byte in data: + crc ^= byte & 255 + for _ in range(8): + tmp = crc & 1 + crc >>= 1 + if tmp != 0: + crc ^= 0xA001 + return crc + + @staticmethod + def _pack_int(value: int) -> bytearray: + curr_byte: int + result = bytearray() + while True: + curr_byte = value & 0x7F + value >>= 7 + if value != 0: + curr_byte |= 0x80 + result += pack(">B", curr_byte) + if value == 0: + break + return result + + @staticmethod + def _unpack_int(data: bytes, start_pos: int) -> tuple(int, int): + result: int = 0 + offset: int = 0 + while offset < 5: + pos: int = start_pos + offset + if pos >= len(data): + raise TuyaBLEDataFormatError() + curr_byte: int = data[pos] + result |= (curr_byte & 0x7F) << (offset * 7) + offset += 1 + if (curr_byte & 0x80) == 0: + break + if offset > 4: + raise TuyaBLEDataFormatError() + else: + return (result, start_pos + offset) + + def _build_packets( + self, + seq_num: int, + code: TuyaBLECode, + data: bytes, + response_to: int = 0, + ) -> list[bytes]: + key: bytes + iv = secrets.token_bytes(16) + security_flag: bytes + if code == TuyaBLECode.FUN_SENDER_DEVICE_INFO: + key = self._login_key + security_flag = b"\x04" + else: + key = self._session_key + security_flag = b"\x05" + + raw = bytearray() + raw += pack(">IIHH", seq_num, response_to, code.value, len(data)) + raw += data + crc = self._calc_crc16(raw) + raw += pack(">H", crc) + while len(raw) % 16 != 0: + raw += b"\x00" + + cipher = AES.new(key, AES.MODE_CBC, iv) + encrypted = security_flag + iv + cipher.encrypt(raw) + + command = [] + packet_num = 0 + pos = 0 + length = len(encrypted) + while pos < length: + packet = bytearray() + packet += self._pack_int(packet_num) + + if packet_num == 0: + packet += self._pack_int(length) + packet += pack(">B", self._protocol_version << 4) + + data_part = encrypted[ + pos:pos + GATT_MTU - len(packet) # fmt: skip + ] + packet += data_part + command.append(packet) + + pos += len(data_part) + packet_num += 1 + + return command + + # @retry_bluetooth_connection_error(DEFAULT_ATTEMPTS) + async def _send_packets_locked(self, packets: list[bytes]) -> None: + """Send command to device and read response.""" + try: + await self._int_send_packets_locked(packets) + except BleakDBusError as ex: + # Disconnect so we can reset state and try again + await asyncio.sleep(BLEAK_BACKOFF_TIME) + _LOGGER.debug( + "%s: RSSI: %s; Backing off %ss; " "Disconnecting due to error: %s", + self.address, + self.rssi, + BLEAK_BACKOFF_TIME, + ex, + ) + await self._execute_disconnect() + raise + except BleakError as ex: + # Disconnect so we can reset state and try again + _LOGGER.debug( + "%s: RSSI: %s; Disconnecting due to error: %s", + self.address, + self.rssi, + ex, + ) + await self._execute_disconnect() + raise + + async def _get_seq_num(self) -> int: + async with self._seq_num_lock: + result = self._current_seq_num + self._current_seq_num += 1 + return result + + async def _send_packet( + self, + code: TuyaBLECode, + data: bytes, + wait_for_response: bool = True, + # retry: int | None = None, + ) -> None: + """Send packet to device and optional read response.""" + await self._ensure_connected() + await self._send_packet_while_connected(code, data, 0, wait_for_response) + + async def _send_response( + self, + code: TuyaBLECode, + data: bytes, + response_to: int, + ) -> None: + """Send response to received packet.""" + await self._ensure_connected() + await self._send_packet_while_connected(code, data, response_to, False) + + async def _send_packet_while_connected( + self, + code: TuyaBLECode, + data: bytes, + response_to: int, + wait_for_response: bool, + # retry: int | None = None + ) -> bool: + """Send packet to device and optional read response.""" + result = True + if self._operation_lock.locked(): + _LOGGER.debug( + "%s: Operation already in progress, " + "waiting for it to complete; RSSI: %s", + self.address, + self.rssi, + ) + async with self._operation_lock: + try: + future: asyncio.Future | None = None + seq_num = await self._get_seq_num() + if wait_for_response: + future = asyncio.Future() + self._input_expected_responses[seq_num] = future + + _LOGGER.debug( + "%s: Sending packet: #%s %s", + self.address, + seq_num, + code.name, + ) + packets: list[bytes] = self._build_packets( + seq_num, code, data, response_to + ) + await self._send_packets_locked(packets) + if future: + try: + await asyncio.wait_for(future, RESPONSE_WAIT_TIMEOUT) + except asyncio.TimeoutError: + _LOGGER.error( + "%s: timeout receiving response, RSSI: %s", + self.address, + self.rssi, + ) + result = False + self._input_expected_responses.pop(seq_num, None) + except BleakNotFoundError: + _LOGGER.error( + "%s: device not found, no longer in range, " "or poor RSSI: %s", + self.address, + self.rssi, + exc_info=True, + ) + raise + except BLEAK_EXCEPTIONS: + _LOGGER.error( + "%s: communication failed", + self.address, + exc_info=True, + ) + raise + + return result + + async def _int_send_packets_locked(self, packets: list[bytes]) -> None: + """Execute command and read response.""" + if self._client: + for packet in packets: + # _LOGGER.debug("%s: Sending packet: %s", self.address, packet.hex()) + await self._client.write_gatt_char( + CHARACTERISTIC_WRITE, + packet, + False, + ) + else: + raise BleakError() + + def _get_key(self, security_flag: int) -> bytes: + if security_flag == 1: + return self._auth_key + if security_flag == 4: + return self._login_key + elif security_flag == 5: + return self._session_key + else: + pass + + def _parse_timestamp(self, data: bytes, start_pos: int) -> tuple(float, int): + timestamp: float + pos = start_pos + if pos >= len(data): + raise TuyaBLEDataLengthError() + time_type = data[pos] + pos += 1 + end_pos = pos + match time_type: + case 0: + end_pos += 13 + if end_pos > len(data): + raise TuyaBLEDataLengthError() + timestamp = int(data[pos:end_pos].decode()) / 1000 + pass + case 1: + end_pos += 4 + if end_pos > len(data): + raise TuyaBLEDataLengthError() + timestamp = int.from_bytes(data[pos:end_pos], "big") * 1.0 + pass + case _: + raise TuyaBLEDataFormatError() + + _LOGGER.debug( + "%s: Received timestamp: %s", + self.address, + time.ctime(timestamp), + ) + return (timestamp, end_pos) + + def _parse_datapoints_v3( + self, timestamp: float, flags: int, data: bytes, start_pos: int + ) -> int: + datapoints: list[TuyaBLEDataPoint] = [] + + pos = start_pos + while len(data) - pos >= 4: + id: int = data[pos] + pos += 1 + _type: int = data[pos] + if _type > TuyaBLEDataPointType.DT_BITMAP.value: + raise TuyaBLEDataFormatError() + type: TuyaBLEDataPointType = TuyaBLEDataPointType(_type) + pos += 1 + data_len: int = data[pos] + pos += 1 + next_pos = pos + data_len + if next_pos > len(data): + raise TuyaBLEDataLengthError() + raw_value = data[pos:next_pos] + match type: + case (TuyaBLEDataPointType.DT_RAW | TuyaBLEDataPointType.DT_BITMAP): + value = raw_value + case TuyaBLEDataPointType.DT_BOOL: + value = int.from_bytes(raw_value, "big") != 0 + case (TuyaBLEDataPointType.DT_VALUE | TuyaBLEDataPointType.DT_ENUM): + value = int.from_bytes(raw_value, "big", signed=True) + case TuyaBLEDataPointType.DT_STRING: + value = raw_value.decode() + + _LOGGER.debug( + "%s: Received datapoint update, id: %s, type: %s: value: %s", + self.address, + id, + type.name, + value, + ) + self._datapoints._update_from_device( + id, timestamp, flags, type, value) + datapoints.append(self._datapoints[id]) + pos = next_pos + + self._fire_callbacks(datapoints) + + def _handle_command_or_response( + self, seq_num: int, response_to: int, code: TuyaBLECode, data: bytes + ) -> None: + result: int = 0 + + match code: + case TuyaBLECode.FUN_SENDER_DEVICE_INFO: + if len(data) < 46: + raise TuyaBLEDataLengthError() + + self._device_version = ("%s.%s") % (data[0], data[1]) + self._protocol_version_str = ("%s.%s") % (data[2], data[3]) + self._hardware_version = ("%s.%s") % (data[12], data[13]) + + self._protocol_version = data[2] + self._flags = data[4] + self._is_bound = data[5] != 0 + + srand = data[6:12] + self._session_key = hashlib.md5( + self._local_key + srand).digest() + self._auth_key = data[14:46] + + case TuyaBLECode.FUN_SENDER_PAIR: + if len(data) != 1: + raise TuyaBLEDataLengthError() + result = data[0] + if result == 2: + _LOGGER.debug( + "%s: Device is already paired", + self.address, + ) + result = 0 + self._is_paired = result == 0 + + case TuyaBLECode.FUN_SENDER_DEVICE_STATUS: + if len(data) != 1: + raise TuyaBLEDataLengthError() + result = data[0] + + case TuyaBLECode.FUN_RECEIVE_TIME1_REQ: + if len(data) != 0: + raise TuyaBLEDataLengthError() + + timestamp = int(time.time_ns() / 1000000) + timezone = -int(time.timezone / 36) + data = str(timestamp).encode() + pack(">h", timezone) + asyncio.create_task(self._send_response(code, data, seq_num)) + + case TuyaBLECode.FUN_RECEIVE_TIME2_REQ: + if len(data) != 0: + raise TuyaBLEDataLengthError() + + time_str: time.struct_time = time.localtime() + timezone = -int(time.timezone / 36) + data = pack( + ">BBBBBBBh", + time_str.tm_year % 100, + time_str.tm_mon, + time_str.tm_mday, + time_str.tm_hour, + time_str.tm_min, + time_str.tm_sec, + time_str.tm_wday, + timezone, + ) + asyncio.create_task(self._send_response(code, data, seq_num)) + + case TuyaBLECode.FUN_RECEIVE_DP: + self._parse_datapoints_v3(time.time(), 0, data, 0) + asyncio.create_task( + self._send_response(code, bytes(0), seq_num)) + + case TuyaBLECode.FUN_RECEIVE_SIGN_DP: + dp_seq_num = int.from_bytes(data[:2], "big") + flags = data[2] + self._parse_datapoints_v3(time.time(), flags, data, 2) + data = pack(">HBB", dp_seq_num, flags, 0) + asyncio.create_task(self._send_response(code, data, seq_num)) + + case TuyaBLECode.FUN_RECEIVE_TIME_DP: + timestamp: float + pos: int + timestamp, pos = self._parse_timestamp(data, 0) + self._parse_datapoints_v3(timestamp, 0, data, pos) + asyncio.create_task( + self._send_response(code, bytes(0), seq_num)) + + case TuyaBLECode.FUN_RECEIVE_SIGN_TIME_DP: + timestamp: float + pos: int + dp_seq_num = int.from_bytes(data[:2], "big") + flags = data[2] + timestamp, pos = self._parse_timestamp(data, 3) + self._parse_datapoints_v3(time.time(), flags, data, pos) + data = pack(">HBB", dp_seq_num, flags, 0) + asyncio.create_task(self._send_response(code, data, seq_num)) + + if response_to != 0: + future = self._input_expected_responses.pop(response_to, None) + if future: + _LOGGER.debug( + "%s: Received expected response to #%s, result: %s", + self.address, + response_to, + result, + ) + if result == 0: + future.set_result(result) + else: + future.set_exception(TuyaBLEDeviceError(result)) + + def _clean_input(self) -> None: + self._input_buffer = None + self._input_expected_packet_num = 0 + self._input_expected_length = 0 + + def _parse_input(self) -> None: + security_flag = self._input_buffer[0] + key = self._get_key(security_flag) + iv = self._input_buffer[1:17] + encrypted = self._input_buffer[17:] + + self._clean_input() + + cipher = AES.new(key, AES.MODE_CBC, iv) + raw = cipher.decrypt(encrypted) + + seq_num: int + response_to: int + _code: int + length: int + seq_num, response_to, _code, length = unpack(">IIHH", raw[:12]) + + code: TuyaBLECode = TuyaBLECode(_code) + data_end_pos = length + 12 + raw_length = len(raw) + if raw_length < data_end_pos: + raise TuyaBLEDataLengthError() + if raw_length > data_end_pos: + calc_crc = self._calc_crc16(raw[:data_end_pos]) + (data_crc,) = unpack( + ">H", + raw[data_end_pos:data_end_pos + 2] # fmt: skip + ) + if calc_crc != data_crc: + raise TuyaBLEDataCRCError() + data = raw[12:data_end_pos] + + if response_to != 0: + _LOGGER.debug( + "%s: Received: #%s %s, response to #%s", + self.address, + seq_num, + code.name, + response_to, + ) + else: + _LOGGER.debug( + "%s: Received: #%s %s", + self.address, + seq_num, + code.name, + ) + + self._handle_command_or_response(seq_num, response_to, code, data) + + def _notification_handler(self, _sender: int, data: bytearray) -> None: + """Handle notification responses.""" + # _LOGGER.debug("%s: Packet received: %s", self.address, data.hex()) + + pos: int = 0 + packet_num: int + + packet_num, pos = self._unpack_int(data, pos) + + if packet_num < self._input_expected_packet_num: + _LOGGER.error( + "%s: Unexpcted packet (number %s) in notifications, " "expected %s", + self.address, + packet_num, + self._input_expected_packet_num, + ) + self._clean_input() + + if packet_num == self._input_expected_packet_num: + if packet_num == 0: + self._input_buffer = bytearray() + self._input_expected_length, pos = self._unpack_int(data, pos) + pos += 1 + self._input_buffer += data[pos:] + self._input_expected_packet_num += 1 + else: + _LOGGER.error( + "%s: Missing packet (number %s) in notifications, received %s", + self.address, + self._input_expected_packet_num, + packet_num, + ) + self._clean_input() + return + + if len(self._input_buffer) > self._input_expected_length: + _LOGGER.error( + "%s: Unexpcted length of data in notifications, " + "received %s expected %s", + self.address, + len(self._input_buffer), + self._input_expected_length, + ) + self._clean_input() + return + elif len(self._input_buffer) == self._input_expected_length: + self._parse_input() + + async def _send_datapoints_v3(self, datapoint_ids: list[int]) -> None: + """Send new values of datapoints to the device.""" + data = bytearray() + for dp_id in datapoint_ids: + dp = self._datapoints[dp_id] + value = dp._get_value() + data += pack(">BBB", dp.id, dp.type.value, len(value)) + data += value + _LOGGER.debug( + "%s: Sending datapoint update, id: %s, type: %s: value: %s", + self.address, + dp.id, + dp.type.name, + dp.value, + ) + + await self._send_packet(TuyaBLECode.FUN_SENDER_DPS, data) + + async def _send_datapoints(self, datapoint_ids: list[int]) -> None: + """Send new values of datapoints to the device.""" + if self._protocol_version == 3: + await self._send_datapoints_v3(datapoint_ids) + else: + raise TuyaBLEDeviceError(0) diff --git a/hacs.json b/hacs.json new file mode 100644 index 00000000..6b9d5296 --- /dev/null +++ b/hacs.json @@ -0,0 +1,5 @@ +{ + "name": "Tuya BLE", + "content_in_root": false, + "render_readme": true +}