-
Notifications
You must be signed in to change notification settings - Fork 101
/
sg-calibrator.py
155 lines (108 loc) · 5.11 KB
/
sg-calibrator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import sniffer
import json
import os
import cPickle
import time
from argparse import ArgumentParser
from multiprocessing import Process
from configs import *
def channel_hopper():
    """Cycle the wireless interface through channels 1-13, forever.

    For each channel the function prints a status line, runs
    ``iwconfig <iface> channel <n>`` through ``os.system`` and then sleeps
    for six seconds before moving on.  The interface name is read from the
    module-level ``configs`` mapping (key ``'iface'``), so that mapping must
    be populated before this function runs.

    The function never returns; it is intended to be run in a separate
    process that the caller terminates.
    """
    # NOTE(review): `sniffer` is not referenced anywhere in this function;
    # the import is retained only to keep behaviour identical.
    import sniffer

    dwell_seconds = 6
    banner = '[channel hopper] Switching to channel'

    while True:
        for chan in xrange(1, 14):
            print(' '.join((banner, str(chan))))
            command = 'iwconfig %s channel %d' % (configs['iface'], chan)
            os.system(command)
            time.sleep(dwell_seconds)
def set_configs():
    """Parse the command line and return the options as a plain dict.

    A single, mandatory option is recognised:

    ``-i IFACE``
        Name of the network interface to use; stored under the key
        ``'iface'`` as a string.

    Returns:
        dict: the parsed argument namespace's attribute dictionary, e.g.
        ``{'iface': 'wlan0'}``.

    argparse itself prints a usage message and exits the process when the
    option is missing.
    """
    arg_parser = ArgumentParser()

    iface_option = dict(
        dest='iface',
        required=True,
        type=str,
        help='Specify network interface to use',
    )
    arg_parser.add_argument('-i', **iface_option)

    namespace = arg_parser.parse_args()
    return vars(namespace)
def new_bssid_record():
    """Return a fresh, uncalibrated sample record for one BSSID."""
    return {
        'calibrated': False,
        'packets': [],
        'len': 0,
    }


def add_whitelist_entry(whitelist, calibration_table, ssid, bssid):
    """Register one SSID/BSSID pair in the whitelist and calibration table.

    Args:
        whitelist: dict mapping SSID -> set of lower-cased BSSIDs (mutated).
        calibration_table: the calibration table dict (mutated); its
            ``'len'`` counts SSIDs and each SSID's ``'len'`` counts BSSIDs.
        ssid: network name.
        bssid: access point address; stored lower-cased.

    Returns:
        bool: True if a new BSSID was added, False if the pair was already
        present.
    """
    bssid = bssid.lower()
    ssid_records = calibration_table['ssids']

    if ssid not in ssid_records:
        ssid_records[ssid] = {
            'bssids': {},
            'calibrated_count': 0,
            'calibrated': False,
            'len': 0,
        }
        calibration_table['len'] += 1

    ssid_record = ssid_records[ssid]
    if bssid in ssid_record['bssids']:
        # A repeated whitelist line must not bump 'len': otherwise
        # 'calibrated_count' can never reach it and calibration never ends.
        return False

    whitelist.setdefault(ssid, set()).add(bssid)
    ssid_record['bssids'][bssid] = new_bssid_record()
    ssid_record['len'] += 1
    return True


def load_whitelist(path, whitelist, calibration_table):
    """Read whitespace-separated ``ssid bssid`` lines from *path*.

    Lines with fewer than two fields (for example blank lines) are skipped
    instead of raising IndexError.  Fields after the second are ignored.
    Both *whitelist* and *calibration_table* are updated in place.
    """
    with open(path) as fd:
        for line in fd:
            fields = line.split()
            if len(fields) < 2:
                continue
            add_whitelist_entry(whitelist, calibration_table,
                                fields[0], fields[1])


def record_response(calibration_table, ssid, bssid, tx, tx_count):
    """Store one sample and propagate the calibration state upwards.

    The sample is ignored unless the SSID/BSSID pair is in the table and
    neither the SSID nor the BSSID is already calibrated.  A BSSID becomes
    calibrated once it holds *tx_count* samples, an SSID once all of its
    BSSIDs are calibrated, and the table once all of its SSIDs are.

    Returns:
        bool: True only when this sample completed calibration of the whole
        table, False otherwise.
    """
    ssid_record = calibration_table['ssids'].get(ssid)
    if ssid_record is None or ssid_record['calibrated']:
        return False

    bssid_record = ssid_record['bssids'].get(bssid)
    if bssid_record is None or bssid_record['calibrated']:
        return False

    bssid_record['packets'].append(tx)
    bssid_record['len'] += 1
    if bssid_record['len'] < tx_count:
        return False

    bssid_record['calibrated'] = True
    ssid_record['calibrated_count'] += 1
    if ssid_record['calibrated_count'] < ssid_record['len']:
        return False

    ssid_record['calibrated'] = True
    calibration_table['calibrated_count'] += 1
    if calibration_table['calibrated_count'] < calibration_table['len']:
        return False

    calibration_table['calibrated'] = True
    return True


def compute_bounds(calibration_table, n_times_max_dev):
    """Add mean, max deviation and accept bounds to every sampled BSSID.

    For each BSSID record with at least one sample this stores ``'mean'``,
    ``'max_dev'`` (largest absolute distance from the mean),
    ``'upper_bound'`` and ``'lower_bound'`` (mean +/- max_dev *
    *n_times_max_dev*).  Records without samples are left untouched rather
    than raising ZeroDivisionError.
    """
    for ssid_record in calibration_table['ssids'].values():
        for bssid_record in ssid_record['bssids'].values():
            samples = bssid_record['packets']
            if not samples:
                continue

            # float() avoids Python 2 integer division truncating the mean
            # when the samples are ints.
            mean = float(sum(samples)) / len(samples)
            max_dev = max(abs(sample - mean) for sample in samples)
            margin = max_dev * n_times_max_dev

            bssid_record['mean'] = mean
            bssid_record['max_dev'] = max_dev
            bssid_record['upper_bound'] = mean + margin
            bssid_record['lower_bound'] = mean - margin


if __name__ == '__main__':

    # `configs` has to stay a module-level name: channel_hopper() reads
    # configs['iface'] as a global from the child process.
    configs = set_configs()
    interface = configs['iface']

    whitelist = {}
    calibration_table = {
        'len': 0,
        'calibrated_count': 0,
        'calibrated': False,
        'ssids': {},
    }

    load_whitelist('whitelist.txt', whitelist, calibration_table)
    if not calibration_table['len']:
        # With nothing to calibrate the sniff loop below could never finish.
        raise SystemExit('whitelist.txt contains no "ssid bssid" entries; '
                         'nothing to calibrate')

    hopper = Process(target=channel_hopper, args=())
    hopper.daemon = True
    hopper.start()

    try:
        probe_responses = sniffer.response_sniffer(interface)
        print(json.dumps(calibration_table, indent=4, sort_keys=True))

        for response in probe_responses:
            ssid = response['essid']
            bssid = response['addr3'].lower()
            tx = response['tx']

            print('[probe_response] %s %s %s' % (ssid, bssid, tx))

            if record_response(calibration_table, ssid, bssid, tx,
                               CALIBRATION_TX_COUNT):
                break

        print(json.dumps(calibration_table, indent=4, sort_keys=True))

        compute_bounds(calibration_table, N_TIMES_MAX_DEV)

        with open(r'calibration_table.pickle', 'wb') as fd:
            cPickle.dump(calibration_table, fd)

        with open(r'whitelist.pickle', 'wb') as fd:
            cPickle.dump(whitelist, fd)

        print(json.dumps(calibration_table, indent=4, sort_keys=True))
    finally:
        # Stop the channel hopper even when sniffing or writing fails.
        hopper.terminate()