-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
linux.py
116 lines (102 loc) · 4.44 KB
/
linux.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from __future__ import annotations
import asyncio
import contextlib
import logging
import aiohttp
import async_timeout
from mac_vendor_lookup import AsyncMacLookup
from usb_devices import BluetoothDevice, NotAUSBDeviceError
from ..adapters import BluetoothAdapters
from ..const import UNIX_DEFAULT_BLUETOOTH_ADAPTER
from ..dbus import BlueZDBusObjects
from ..history import AdvertisementHistory
from ..models import AdapterDetails
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
class LinuxAdapters(BluetoothAdapters):
    """Class for getting the bluetooth adapters on a Linux system.

    Adapter information is read from the BlueZ D-Bus objects and enriched
    with USB device details and, as a fallback, a MAC vendor lookup.
    """

    def __init__(self) -> None:
        """Initialize the adapter."""
        self._bluez = BlueZDBusObjects()
        # Cache backing the ``adapters`` property; ``None`` means "rebuild".
        self._adapters: dict[str, AdapterDetails] | None = None
        # Keyed by the adapter names found in ``self._bluez.adapter_details``.
        self._devices: dict[str, BluetoothDevice] = {}
        # Created on the first ``refresh()``.
        self._mac_vendor_lookup: AsyncMacLookup | None = None

    async def refresh(self) -> None:
        """Refresh the adapters.

        Reloads the BlueZ objects, recreates the per-adapter device objects,
        sets up the MAC vendor lookup on first use, and invalidates the
        cached result of the ``adapters`` property.
        """
        await self._bluez.load()
        # Device creation runs in the default executor rather than on the
        # event loop (presumably because device setup does blocking I/O).
        await asyncio.get_running_loop().run_in_executor(
            None, self._create_bluetooth_devices
        )
        if not self._mac_vendor_lookup:
            await self._async_setup()
        self._adapters = None

    async def _async_setup(self) -> None:
        """Create the MAC vendor lookup and try to load its vendor data.

        Loading is best effort: a timeout (3 seconds) or an aiohttp client
        error is suppressed.
        """
        self._mac_vendor_lookup = AsyncMacLookup()
        with contextlib.suppress(asyncio.TimeoutError, aiohttp.ClientError):
            # We don't care if this fails since it only
            # improves the data we get.
            async with async_timeout.timeout(3):
                await self._mac_vendor_lookup.load_vendors()

    def _async_get_vendor(self, mac_address: str) -> str | None:
        """Lookup the vendor.

        Returns the vendor name (truncated to 254 characters) for the prefix
        of ``mac_address``, or ``None`` when the prefix is unknown or the
        vendor lookup has not been created yet.
        """
        lookup = self._mac_vendor_lookup
        if lookup is None:
            # No refresh() has run yet; there is nothing to look up against.
            return None
        oui = lookup.sanitise(mac_address)[:6]
        vendor: bytes | None = lookup.prefixes.get(oui.encode())
        return vendor.decode()[:254] if vendor is not None else None

    def _create_bluetooth_devices(self) -> None:
        """Create the bluetooth devices.

        Builds one ``BluetoothDevice`` per adapter reported by BlueZ. The new
        mapping is assembled locally and assigned in a single step so that
        readers of ``self._devices`` never observe a half-populated dict
        while this runs in the executor.
        """
        devices: dict[str, BluetoothDevice] = {}
        for adapter in self._bluez.adapter_details:
            # The numeric index is whatever follows the first three
            # characters of the adapter name (e.g. "hci0" -> 0).
            try:
                index = int(adapter[3:])
            except ValueError:
                _LOGGER.debug(
                    "Skipping adapter with unexpected name: %s", adapter
                )
                continue
            dev = BluetoothDevice(index)
            # The device is kept even when setup fails below.
            devices[adapter] = dev
            try:
                dev.setup()
            except (NotAUSBDeviceError, FileNotFoundError):
                continue
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    "Unexpected error setting up device hci%s", index
                )
        self._devices = devices

    @property
    def history(self) -> dict[str, AdvertisementHistory]:
        """Get the bluez history."""
        return self._bluez.history

    @property
    def adapters(self) -> dict[str, AdapterDetails]:
        """Get the adapter details.

        The result is built on first access and cached until the next
        ``refresh()``. Entries without an ``org.bluez.Adapter1`` interface
        are skipped.
        """
        if self._adapters is None:
            adapters: dict[str, AdapterDetails] = {}
            for adapter, details in self._bluez.adapter_details.items():
                if not (adapter1 := details.get("org.bluez.Adapter1")):
                    continue
                # An adapter may have no device entry (e.g. it was reported
                # by BlueZ after the last refresh); treat it like an adapter
                # without USB information instead of raising KeyError.
                device = self._devices.get(adapter)
                usb_device = device.usb_device if device is not None else None
                mac_address = adapter1["Address"]
                if (
                    usb_device is None
                    or usb_device.manufacturer is None
                    or usb_device.manufacturer == "Unknown"
                    # Manufacturer equal to the vendor id is treated as
                    # unusable (presumably an unresolved name).
                    or usb_device.vendor_id == usb_device.manufacturer
                ):
                    # Fall back to the vendor derived from the MAC address.
                    manufacturer = self._async_get_vendor(mac_address)
                else:
                    manufacturer = usb_device.manufacturer
                adapters[adapter] = AdapterDetails(
                    address=mac_address,
                    sw_version=adapter1["Name"],  # This is actually the BlueZ version
                    hw_version=adapter1.get("Modalias"),
                    passive_scan="org.bluez.AdvertisementMonitorManager1" in details,
                    manufacturer=manufacturer,
                    product=usb_device.product if usb_device else None,
                    vendor_id=usb_device.vendor_id if usb_device else None,
                    product_id=usb_device.product_id if usb_device else None,
                )
            self._adapters = adapters
        return self._adapters

    @property
    def default_adapter(self) -> str:
        """Get the default adapter."""
        return UNIX_DEFAULT_BLUETOOTH_ADAPTER