forked from faglo/AirStatus
-
Notifications
You must be signed in to change notification settings - Fork 23
/
main.py
149 lines (120 loc) · 4.54 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from bleak import discover
from asyncio import new_event_loop, set_event_loop, get_event_loop
from time import sleep, time_ns
from binascii import hexlify
from json import dumps
from sys import argv
from datetime import datetime
# Configure update duration (update after n seconds)
UPDATE_DURATION = 1
MIN_RSSI = -60
AIRPODS_MANUFACTURER = 76
AIRPODS_DATA_LENGTH = 54
RECENT_BEACONS_MAX_T_NS = 10000000000 # 10 Seconds
recent_beacons = []
def get_best_result(device):
recent_beacons.append({
"time": time_ns(),
"device": device
})
strongest_beacon = None
i = 0
while i < len(recent_beacons):
if(time_ns() - recent_beacons[i]["time"] > RECENT_BEACONS_MAX_T_NS):
recent_beacons.pop(i)
continue
if (strongest_beacon == None or strongest_beacon.rssi < recent_beacons[i]["device"].rssi):
strongest_beacon = recent_beacons[i]["device"]
i += 1
if (strongest_beacon != None and strongest_beacon.address == device.address):
strongest_beacon = device
return strongest_beacon
# Getting data with hex format
async def get_device():
# Scanning for devices
devices = await discover()
for d in devices:
# Checking for AirPods
d = get_best_result(d)
if d.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in d.metadata['manufacturer_data']:
data_hex = hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER]))
data_length = len(hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER])))
if data_length == AIRPODS_DATA_LENGTH:
return data_hex
return False
# Same as get_device() but it's standalone method instead of async
def get_data_hex():
new_loop = new_event_loop()
set_event_loop(new_loop)
loop = get_event_loop()
a = loop.run_until_complete(get_device())
loop.close()
return a
# Getting data from hex string and converting it to dict(json)
# Getting data from hex string and converting it to dict(json)
def get_data():
raw = get_data_hex()
# Return blank data if airpods not found
if not raw:
return dict(status=0, model="AirPods not found")
flip: bool = is_flipped(raw)
# On 7th position we can get AirPods model, gen1, gen2, Pro or Max
if chr(raw[7]) == 'e':
model = "AirPodsPro"
elif chr(raw[7]) == '3':
model = "AirPods3"
elif chr(raw[7]) == 'f':
model = "AirPods2"
elif chr(raw[7]) == '2':
model = "AirPods1"
elif chr(raw[7]) == 'a':
model = "AirPodsMax"
else:
model = "unknown"
# Checking left AirPod for availability and storing charge in variable
status_tmp = int("" + chr(raw[12 if flip else 13]), 16)
left_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1))
# Checking right AirPod for availability and storing charge in variable
status_tmp = int("" + chr(raw[13 if flip else 12]), 16)
right_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1))
# Checking AirPods case for availability and storing charge in variable
status_tmp = int("" + chr(raw[15]), 16)
case_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1))
# On 14th position we can get charge status of AirPods
charging_status = int("" + chr(raw[14]), 16)
charging_left:bool = (charging_status & (0b00000010 if flip else 0b00000001)) != 0
charging_right:bool = (charging_status & (0b00000001 if flip else 0b00000010)) != 0
charging_case:bool = (charging_status & 0b00000100) != 0
# Return result info in dict format
return dict(
status=1,
charge=dict(
left=left_status,
right=right_status,
case=case_status
),
charging_left=charging_left,
charging_right=charging_right,
charging_case=charging_case,
model=model,
date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
raw=raw.decode("utf-8")
)
# Return if left and right is flipped in the data
def is_flipped(raw):
return (int("" + chr(raw[10]), 16) & 0x02) == 0
def run():
output_file = argv[-1]
while True:
data = get_data()
if data["status"] == 1:
json_data = dumps(data)
if len(argv) > 1:
f = open(output_file, "a")
f.write(json_data+"\n")
f.close()
else:
print(json_data)
sleep(UPDATE_DURATION)
if __name__ == '__main__':
run()