This repository has been archived by the owner on Dec 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
lambda_function.py
executable file
·102 lines (81 loc) · 2.92 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import json
import os
import time
import duo_client
import requests
i_key = os.environ.get('I_KEY')
s_key = os.environ.get('S_KEY')
host = os.environ.get('HOST')
collector_url = os.environ.get('COLL_ENDPOINT')
scan_interval_in_sec = int(os.environ.get('SCAN_INTERVAL_IN_SEC', 0))
admin_api = duo_client.Admin(
ikey=i_key,
skey=s_key,
host=host
)
def fetch_logs(min_time=None, max_time=None):
auth_logs = admin_api.get_authentication_log(api_version=2, mintime=min_time, maxtime=max_time)
#this isn't considering iteration in event too many messages returned.
return auth_logs['authlogs']
def fetch_admin_logs(min_time=None):
admin_logs = admin_api.get_administrator_log(mintime=min_time)
print('Retrieved admin Logs.')
return admin_logs
def fetch_telephony_logs(min_time=None):
telephony_logs = admin_api.get_telephony_log(mintime=min_time)
print('Retrieved Telephony Logs::')
return telephony_logs
def format_auth_logs(data):
out = []
for i in data:
i['auth_device']['name'] = '*****'
out.append(i)
data = '\n'.join([json.dumps(i) for i in out])
return data
def format_telephony_logs(data):
out = []
for i in data:
i['phone']= '*****'
out.append(i)
data = '\n'.join([json.dumps(i) for i in out])
return data
def format_admin_logs(data):
out= []
for i in data:
if(i.get('description')):
i['description'] = json.loads(i['description'])
if('device' in i['description']):
i['description']['device'] = "*****"
elif ('phones' in i['description'] ):
for ph in (i['description']['phones']):
i['description']['phones'][ph]['number'] = "*****"
elif ('phone' in i['description'] ):
i['description']['phone'] ="*****"
out.append(i)
else:
print("admin logs without description")
data = '\n'.join([json.dumps(i) for i in out])
return data
def dump_logs(data):
print('dumping logs')
r = requests.post(url=collector_url, data=data)
tries = 10
while r.status_code != 200:
if tries <= 0:
raise Exception('Excessive retries. Issue in posting log data.')
tries-=1
time.sleep(2)
r = requests.post(collector_url, data=data)
print('dumped successfully.')
def lambda_handler(req, context):
logs = fetch_logs(min_time=(time.time()-scan_interval_in_sec)*1000, max_time=time.time()*1000)
logs = format_auth_logs(logs)
dump_logs(logs)
#fetch admin logs
logs_admin = fetch_admin_logs(min_time=(time.time()-scan_interval_in_sec))
logs_admin = format_admin_logs(logs_admin)
dump_logs(logs_admin)
#fetch telephony logs
logs_telephony = fetch_telephony_logs(min_time=(time.time()-scan_interval_in_sec))
logs_telephony = format_telephony_logs(logs_telephony)
dump_logs(logs_telephony)