diff --git a/examples/passive_scan.py b/examples/passive_scan.py new file mode 100644 index 00000000..87a2750b --- /dev/null +++ b/examples/passive_scan.py @@ -0,0 +1,119 @@ +""" +Scanner using passive scanning mode +-------------- + +Example similar to detection_callback.py, but using passive scanning mode and showing how to iterate devices + +Updated on 2022-11-24 by bojanpotocnik + +""" +import argparse +import asyncio +import logging +from typing import Optional, List, Dict, Any, AsyncIterator, Tuple, Iterable + +import bleak +from bleak import BleakScanner, BLEDevice, AdvertisementData + +logger = logging.getLogger(__name__) + + +def _get_os_specific_scanning_params( + uuids: Optional[List[str]], rssi: Optional[int], macos_use_bdaddr: bool = False +) -> Dict[str, Any]: + def get_bluez_dbus_scanning_params() -> Dict[str, Any]: + from bleak.assigned_numbers import AdvertisementDataType + from bleak.backends.bluezdbus.advertisement_monitor import OrPattern + from bleak.backends.bluezdbus.scanner import ( + BlueZScannerArgs, + BlueZDiscoveryFilters, + ) + + filters = BlueZDiscoveryFilters( + # UUIDs= Added below, because it cannot be None + # RSSI= Added below, because it cannot be None + Transport="le", + DuplicateData=True, + ) + if uuids: + filters["UUIDs"] = uuids + if rssi: + filters["RSSI"] = rssi + + # or_patterns ar required for BlueZ passive scanning + or_patterns = [ + # General Discoverable (peripherals) + OrPattern(0, AdvertisementDataType.FLAGS, b"\x02"), + # BR/EDR Not Supported (BLE peripherals) + OrPattern(0, AdvertisementDataType.FLAGS, b"\x04"), + # General Discoverable, BR/EDR Not Supported (BLE peripherals) + OrPattern(0, AdvertisementDataType.FLAGS, b"\x06"), + # General Discoverable, LE and BR/EDR Capable (Controller), LE and BR/EDR Capable (Host) (computers, phones) + OrPattern(0, AdvertisementDataType.FLAGS, b"\x1A"), + ] + + return {"bluez": BlueZScannerArgs(filters=filters, or_patterns=or_patterns)} + + def get_core_bluetooth_scanning_params() -> Dict[str, Any]: + from bleak.backends.corebluetooth.scanner import CBScannerArgs + + return {"cb": CBScannerArgs(use_bdaddr=macos_use_bdaddr)} + + return { + "BleakScannerBlueZDBus": get_bluez_dbus_scanning_params, + "BleakScannerCoreBluetooth": get_core_bluetooth_scanning_params, + # "BleakScannerP4Android": get_p4android_scanning_params, + # "BleakScannerWinRT": get_winrt_scanning_params, + }.get(bleak.get_platform_scanner_backend_type().__name__, lambda: {})() + + +async def scan( + uuids: Optional[Iterable[str]] = None, + rssi: Optional[int] = None, + macos_use_bdaddr: bool = False, +) -> AsyncIterator[Tuple[BLEDevice, AdvertisementData]]: + devices = asyncio.Queue() + + def scan_callback(bd: BLEDevice, ad: AdvertisementData): + devices.put_nowait((bd, ad)) + + async with BleakScanner( + detection_callback=scan_callback, + **_get_os_specific_scanning_params(uuids=uuids, rssi=rssi), + scanning_mode="passive" + ): + while True: + try: + yield await devices.get() + except GeneratorExit: + break + + +async def main(): + parser = argparse.ArgumentParser() + + parser.add_argument( + "--services", + metavar="", + nargs="*", + help="UUIDs of one or more services to filter for", + ) + + args = parser.parse_args() + + async for device, adv_data in scan( + uuids=args.services + ): # type: BLEDevice, AdvertisementData + logger.info("%s: %r", device.address, adv_data) + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s", + ) + + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass