-
Notifications
You must be signed in to change notification settings - Fork 304
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
35a0784
commit 57a68f9
Showing
1 changed file
with
119 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
""" | ||
Scanner using passive scanning mode | ||
-------------- | ||
Example similar to detection_callback.py, but using passive scanning mode and showing how to iterate devices | ||
Updated on 2022-11-24 by bojanpotocnik <info@bojanpotocnik.com> | ||
""" | ||
import argparse | ||
import asyncio | ||
import logging | ||
from typing import Optional, List, Dict, Any, AsyncIterator, Tuple, Iterable | ||
|
||
import bleak | ||
from bleak import BleakScanner, BLEDevice, AdvertisementData | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def _get_os_specific_scanning_params( | ||
uuids: Optional[List[str]], rssi: Optional[int], macos_use_bdaddr: bool = False | ||
) -> Dict[str, Any]: | ||
def get_bluez_dbus_scanning_params() -> Dict[str, Any]: | ||
from bleak.assigned_numbers import AdvertisementDataType | ||
from bleak.backends.bluezdbus.advertisement_monitor import OrPattern | ||
from bleak.backends.bluezdbus.scanner import ( | ||
BlueZScannerArgs, | ||
BlueZDiscoveryFilters, | ||
) | ||
|
||
filters = BlueZDiscoveryFilters( | ||
# UUIDs= Added below, because it cannot be None | ||
# RSSI= Added below, because it cannot be None | ||
Transport="le", | ||
DuplicateData=True, | ||
) | ||
if uuids: | ||
filters["UUIDs"] = uuids | ||
if rssi: | ||
filters["RSSI"] = rssi | ||
|
||
# or_patterns ar required for BlueZ passive scanning | ||
or_patterns = [ | ||
# General Discoverable (peripherals) | ||
OrPattern(0, AdvertisementDataType.FLAGS, b"\x02"), | ||
# BR/EDR Not Supported (BLE peripherals) | ||
OrPattern(0, AdvertisementDataType.FLAGS, b"\x04"), | ||
# General Discoverable, BR/EDR Not Supported (BLE peripherals) | ||
OrPattern(0, AdvertisementDataType.FLAGS, b"\x06"), | ||
# General Discoverable, LE and BR/EDR Capable (Controller), LE and BR/EDR Capable (Host) (computers, phones) | ||
OrPattern(0, AdvertisementDataType.FLAGS, b"\x1A"), | ||
] | ||
|
||
return {"bluez": BlueZScannerArgs(filters=filters, or_patterns=or_patterns)} | ||
|
||
def get_core_bluetooth_scanning_params() -> Dict[str, Any]: | ||
from bleak.backends.corebluetooth.scanner import CBScannerArgs | ||
|
||
return {"cb": CBScannerArgs(use_bdaddr=macos_use_bdaddr)} | ||
|
||
return { | ||
"BleakScannerBlueZDBus": get_bluez_dbus_scanning_params, | ||
"BleakScannerCoreBluetooth": get_core_bluetooth_scanning_params, | ||
# "BleakScannerP4Android": get_p4android_scanning_params, | ||
# "BleakScannerWinRT": get_winrt_scanning_params, | ||
}.get(bleak.get_platform_scanner_backend_type().__name__, lambda: {})() | ||
|
||
|
||
async def scan( | ||
uuids: Optional[Iterable[str]] = None, | ||
rssi: Optional[int] = None, | ||
macos_use_bdaddr: bool = False, | ||
) -> AsyncIterator[Tuple[BLEDevice, AdvertisementData]]: | ||
devices = asyncio.Queue() | ||
|
||
def scan_callback(bd: BLEDevice, ad: AdvertisementData): | ||
devices.put_nowait((bd, ad)) | ||
|
||
async with BleakScanner( | ||
detection_callback=scan_callback, | ||
**_get_os_specific_scanning_params(uuids=uuids, rssi=rssi), | ||
scanning_mode="passive" | ||
): | ||
while True: | ||
try: | ||
yield await devices.get() | ||
except GeneratorExit: | ||
break | ||
|
||
|
||
async def main(): | ||
parser = argparse.ArgumentParser() | ||
|
||
parser.add_argument( | ||
"--services", | ||
metavar="<uuid>", | ||
nargs="*", | ||
help="UUIDs of one or more services to filter for", | ||
) | ||
|
||
args = parser.parse_args() | ||
|
||
async for device, adv_data in scan( | ||
uuids=args.services | ||
): # type: BLEDevice, AdvertisementData | ||
logger.info("%s: %r", device.address, adv_data) | ||
|
||
|
||
if __name__ == "__main__": | ||
logging.basicConfig( | ||
level=logging.INFO, | ||
format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s", | ||
) | ||
|
||
try: | ||
asyncio.run(main()) | ||
except KeyboardInterrupt: | ||
pass |