-
Notifications
You must be signed in to change notification settings - Fork 0
/
import_asa.py
executable file
·57 lines (48 loc) · 2.54 KB
/
import_asa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import sys
import re
import common
from import_base import BaseImporter
import time
# This implementation is incomplete:
# TODO: validate implementation with test data
# TODO: parse timestamp into dictionary['Timestamp']
class ASAImporter(BaseImporter):
    """Importer that translates Cisco ASA "Built ... connection" syslog lines."""

    # Compiled once at class creation instead of being looked up on every
    # translate() call. The pattern text itself is unchanged.
    _BUILT_CONNECTION = re.compile(
        r"^.* Built (?P<asa_in_out>in|out)bound (?P<asa_protocol>.*)"
        r" connection (?P<asa_conn_id>\d+)"
        r" for (?P<asa_src_zone>.*):"
        r"(?P<asa_src_ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})/(?P<asa_src_port>\d+)"
        r" \(.*/\d+\)"
        r" to (?P<asa_dst_zone>.*):"
        r"(?P<asa_dst_ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})/(?P<asa_dst_port>\d+)"
        r" .*"
    )

    def translate(self, line, line_num, dictionary):
        """
        Converts a given syslog line into a dictionary of (ip, port, ip, port)

        On success the keys 'SourceIP', 'SourcePort', 'DestinationIP',
        'DestinationPort' and 'Timestamp' are written into `dictionary`.
        IPs are passed through common.IPtoInt; ports are stored as the
        matched digit strings.

        Args:
            line: The syslog line to parse
            line_num: The line number, for error printouts
            dictionary: The dictionary to write key/values pairs into

        Returns:
            0 on success and non-zero on error.
            1 => The protocol wasn't TCP and was ignored.
            2 => error in parsing the line. It did not match the expected
                 "Built inbound/outbound ... connection" format.
        """
        m = self._BUILT_CONNECTION.match(line)
        if not m:
            print("error parsing line {0}: {1}".format(line_num, line))
            return 2

        # Only TCP connections are imported. Other protocols (e.g. UDP) match
        # the same message layout, so they must be filtered explicitly.
        # Nothing is printed here: non-TCP lines are expected, not errors.
        if m.group('asa_protocol').strip().upper() != 'TCP':
            return 1

        # (ip, port) pairs in the order they appear in the log message.
        first = (common.IPtoInt(*(m.group('asa_src_ip').split("."))),
                 m.group('asa_src_port'))
        second = (common.IPtoInt(*(m.group('asa_dst_ip').split("."))),
                  m.group('asa_dst_port'))

        # The order of the source and destination depends on the direction:
        # for inbound the first endpoint is the source, for outbound it is
        # the destination.
        if m.group('asa_in_out') == 'in':
            source, destination = first, second
        else:
            source, destination = second, first

        dictionary['SourceIP'], dictionary['SourcePort'] = source
        dictionary['DestinationIP'], dictionary['DestinationPort'] = destination

        # ASA logs don't always have a timestamp. If your logs do, you may
        # want to edit the line below to parse it. Until then the import time
        # (local clock) is recorded, formatted with self.mysql_time_format
        # (not defined in this class; presumably provided by BaseImporter).
        dictionary['Timestamp'] = time.strftime(self.mysql_time_format, time.localtime())
        return 0
# Script entry point: when this file is executed directly (not imported),
# build an ASAImporter and hand it the raw command-line arguments.
if __name__ == "__main__":
    ASAImporter().main(sys.argv)