Skip to content

Commit

Permalink
backends:bgapi: add characteristics read/write/notify
Browse files Browse the repository at this point in the history
Added a few user friendly helpers to auto switch to
write/write_without_response where appropriate, rather than simply
failing.  The BGAPI would simply send the other style as requested, and
the remote device would simply ignore it.

For "start_notify", the "force_indicate" kwarg has been used, to support
both notify and indicate, as in the WinRT backend,
see also hbldh#526 for more information

Signed-off-by: Karl Palsson <karlp@etactica.com>
  • Loading branch information
karlp committed Jan 6, 2023
1 parent 81ca788 commit 03570ca
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 8 deletions.
4 changes: 4 additions & 0 deletions bleak/backends/bgapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""
Backend targetting Silicon Labs devices running "NCP" firmware, using the BGAPI via pyBGAPI
See: https://pypi.org/project/pybgapi/
"""
70 changes: 62 additions & 8 deletions bleak/backends/bgapi/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

import bgapi

from ..characteristic import BleakGATTCharacteristic
from ...exc import BleakError
from ..characteristic import BleakGATTCharacteristic, GattCharacteristicsFlags
from ..client import BaseBleakClient, NotifyCallback
from ..device import BLEDevice
from ..service import BleakGATTServiceCollection
Expand Down Expand Up @@ -63,6 +64,8 @@ def __init__(self, address_or_ble_device: Union[BLEDevice, str], **kwargs):
self._ev_gatt_op = asyncio.Event()
self._buffer_characteristics = []
self._buffer_descriptors = []
self._buffer_data = []
self._cbs_notify = {}

async def connect(self, **kwargs) -> bool:
self._lib.open() # this starts a new thread, remember that!
Expand Down Expand Up @@ -136,6 +139,13 @@ def _bgapi_evt_handler(self, evt):
uus = _bgapi_uuid_to_str(evt.uuid)
# Unlike with services, we don't have enough information to directly create the BleakCharacteristic here.
self._loop.call_soon_threadsafe(self._buffer_characteristics.append, PartialCharacteristic(uuid=uus, handle=evt.characteristic, properties=evt.properties))
elif evt == "bt_evt_gatt_characteristic_value":
# This handles reads, long reads, and notifications/indications
if self._cbs_notify.get(evt.characteristic, False):
self._loop.call_soon_threadsafe(self._cbs_notify[evt.characteristic], evt.value)
else:
# because long reads are autohandled, we must keep adding data until the operation completes.
self._loop.call_soon_threadsafe(self._buffer_data.extend, evt.value)
elif evt == "bt_evt_gatt_descriptor":
uus = _bgapi_uuid_to_str(evt.uuid)
# Unlike with services, we don't have enough information to directly create the BleakDescriptor here.
Expand Down Expand Up @@ -200,23 +210,67 @@ async def get_services(self, **kwargs) -> BleakGATTServiceCollection:

async def read_gatt_char(self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID],
**kwargs) -> bytearray:
raise NotImplementedError
if not isinstance(char_specifier, BleakGATTCharacteristic):
characteristic = self.services.get_characteristic(char_specifier)
else:
characteristic = char_specifier
if not characteristic:
raise BleakError("Characteristic {} was not found!".format(char_specifier))

# this will automatically use long reads if needed, so need to make sure that we bunch up data
self._ev_gatt_op.clear()
self._buffer_data.clear()
self._lib.bt.gatt.read_characteristic_value(self._ch, characteristic.handle)
await self._ev_gatt_op.wait()
return self._buffer_data

async def read_gatt_descriptor(self, handle: int, **kwargs) -> bytearray:
raise NotImplementedError

async def write_gatt_char(self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID],
data: Union[bytes, bytearray, memoryview], response: bool = False) -> None:
raise NotImplementedError
if not isinstance(char_specifier, BleakGATTCharacteristic):
characteristic = self.services.get_characteristic(char_specifier)
else:
characteristic = char_specifier
if not characteristic:
raise BleakError("Characteristic {} was not found!".format(char_specifier))

if (GattCharacteristicsFlags.write.name not in characteristic.properties
and GattCharacteristicsFlags.write_without_response.name not in characteristic.properties):
raise BleakError(f"Characteristic {characteristic} does not support write operations!")
if not response and GattCharacteristicsFlags.write_without_response.name not in characteristic.properties:
# Warning seems harsh, this is just magically "fixing" things, but it's what the bluez backend does.
logger.warning(f"Characteristic {characteristic} does not support write without response, auto-trying as write")
response = True
# bgapi needs "bytes" or a string that it will encode as latin1.
# All of the bleak types can be cast to bytes, and that's easier than modifying pybgapi
odata = bytes(data)
if response:
self._ev_gatt_op.clear()
self._lib.bt.gatt.write_characteristic_value(self._ch, characteristic.handle, odata)
await self._ev_gatt_op.wait()
else:
self._lib.bt.gatt.write_characteristic_value_without_response(self._ch, characteristic.handle, odata)

async def write_gatt_descriptor(self, handle: int, data: Union[bytes, bytearray, memoryview]) -> None:
raise NotImplementedError

async def start_notify(self, characteristic: BleakGATTCharacteristic, callback: NotifyCallback, **kwargs) -> None:
raise NotImplementedError
self._cbs_notify[characteristic.handle] = callback
enable = self._lib.bt.gatt.CLIENT_CONFIG_FLAG_NOTIFICATION
force_indic = kwargs.get("force_indicate", False)
if force_indic:
enable = self._lib.bt.gatt.CLIENT_CONFIG_FLAG_INDICATION
self._lib.bt.gatt.set_characteristic_notification(self._ch, characteristic.handle, enable)

async def stop_notify(self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID]) -> None:
raise NotImplementedError



if not isinstance(char_specifier, BleakGATTCharacteristic):
characteristic = self.services.get_characteristic(char_specifier)
else:
characteristic = char_specifier
if not characteristic:
raise BleakError("Characteristic {} was not found!".format(char_specifier))
self._cbs_notify.pop(characteristic.handle, None) # hard remove callback
cancel = self._lib.bt.gatt.CLIENT_CONFIG_FLAG_DISABLE
self._lib.bt.gatt.set_characteristic_notification(self._ch, characteristic.handle, cancel)

0 comments on commit 03570ca

Please sign in to comment.