-
Notifications
You must be signed in to change notification settings - Fork 304
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add hack for getting MTU on BlueZ backend.
BlueZ doesn't have a good way to get the negotiated MTU, but we can get it by abusing existing D-Bus methods. Since the workaround is async and could cause unwanted side effects for some devices, it opt-in as seen in the example code.
- Loading branch information
Showing
2 changed files
with
101 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
""" | ||
Example showing how to use BleakClient.mtu_size | ||
""" | ||
|
||
import asyncio | ||
|
||
from bleak import BleakScanner, BleakClient | ||
from bleak.backends.scanner import BLEDevice, AdvertisementData | ||
|
||
# replace with real characteristic UUID | ||
CHAR_UUID = "00000000-0000-0000-0000-000000000000" | ||
|
||
|
||
async def start(): | ||
queue = asyncio.Queue() | ||
|
||
def callback(device: BLEDevice, adv: AdvertisementData) -> None: | ||
# can use advertising data to filter here | ||
queue.put_nowait(device) | ||
|
||
async with BleakScanner(detection_callback=callback): | ||
# get the first matching device | ||
device = await queue.get() | ||
|
||
async with BleakClient(device) as client: | ||
# BlueZ doesn't have a proper way to get the MTU, so we have this hack. | ||
# If this doesn't work for you, you can set the client._mtu_size attribute | ||
# to override the value instead. | ||
if client.__class__.__name__ == "BleakClientBlueZDBus": | ||
await client._acquire_mtu() | ||
|
||
print("MTU:", client.mtu_size) | ||
|
||
# Write without response is limited to MTU - 3 bytes | ||
|
||
data = bytes(1000) # replace with real data | ||
chunk_size = client.mtu_size - 3 | ||
for chunk in ( | ||
data[i : i + chunk_size] for i in range(0, len(data), chunk_size) | ||
): | ||
await client.write_gatt_char(CHAR_UUID, chunk) | ||
|
||
|
||
# It is important to use asyncio.run() to get proper cleanup on KeyboardInterrupt. | ||
# This was introduced in Python 3.7. If you need it in Python 3.6, you can copy | ||
# it from https://github.com/python/cpython/blob/3.7/Lib/asyncio/runners.py | ||
asyncio.run(start()) |