-
Notifications
You must be signed in to change notification settings - Fork 0
/
simple_analyzer.py
137 lines (114 loc) · 4.93 KB
/
simple_analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import os
import re
import sys
import collections
class SimpleAnalyzer:
    """Rule-based classifier that sorts log lines into errors and warnings.

    A line is matched against three rule sets, in this order, on a lowercased
    copy of the line:

    1. ``error_keys``       - plain substrings; any hit marks an error.
    2. ``error_keys_regex`` - patterns applied with ``re.match`` (anchored at
       the start of the line); any hit marks an error.
    3. ``warning_keys``     - plain substrings; any hit marks a warning.

    The first rule set that hits wins, so a line is never stored as both an
    error and a warning.  Every processed line, classified or not, is also
    appended (in cleaned form) to ``all_data_list``.
    """

    # Leading run of digits, spaces and the characters ( ) | : . - which the
    # return_sorted_* methods strip off as the "timestamp" prefix.
    _TIMESTAMP_PREFIX = re.compile(r"^[(0-9)|( :.\-)]+")
    # One or more consecutive spaces, collapsed to a single space.
    _SPACE_RUN = re.compile(r"[ ]+")
    # "(<number>):" with optional spaces before "(" and before the number;
    # replaced by a bare ":" so otherwise identical messages compare equal.
    _PAREN_NUMBER = re.compile(r"[ ]*\([ ]*[0-9]+\):")

    def __init__(self):
        # rule-based tokens
        # First pattern: "error" either at the very start of the line or
        # preceded by a character that is not a lowercase letter (so words
        # such as "terror" do not count).  The patterns are applied with
        # re.match to the lowercased line.
        self.error_keys_regex = [r"(?:.*[^a-z])?error", r"^e/"]
        self.error_keys = ["fail", "crash", "exception", " kill", " fault", " die"]
        self.warning_keys = ["warn", "bad", "refuse", " miss", " not ", " no ", "invalid", "ignore",
                             "undefined", "unsuccess", "unauthor", "unable", "skip", "cannot", "unknown"]
        # with timestamp
        self.error_list = []
        self.warning_list = []
        self.all_data_list = []
        # de-timestamp
        self.formatted_error_list = []
        self.formatted_warning_list = []

    # Clean up lists
    def clean_lists(self):
        """Reset the error and warning lists.

        ``all_data_list`` and the formatted lists are left untouched.
        """
        self.error_list = []
        self.warning_list = []

    # TODO: need to adjust to JSON format later
    # Return current de-timestamp error log list
    def return_sorted_error(self):
        """Return the distinct error lines with their leading timestamp removed.

        Also refreshes ``formatted_error_list``.  Duplicates are collapsed via
        a ``Counter``; no explicit sorting is performed.
        """
        self.formatted_error_list = [self._TIMESTAMP_PREFIX.sub("", s) for s in self.error_list]
        error_counter = collections.Counter(self.formatted_error_list)
        return error_counter.keys()

    # Return current de-timestamp warning log list
    def return_sorted_warning(self):
        """Return the distinct warning lines with their leading timestamp removed.

        Also refreshes ``formatted_warning_list``.  Duplicates are collapsed
        via a ``Counter``; no explicit sorting is performed.
        """
        self.formatted_warning_list = [self._TIMESTAMP_PREFIX.sub("", s) for s in self.warning_list]
        warning_counter = collections.Counter(self.formatted_warning_list)
        return warning_counter.keys()

    # Return current error log list
    def return_error(self):
        """Return the distinct error lines (timestamps kept)."""
        error_counter = collections.Counter(self.error_list)
        return error_counter.keys()

    # Return current warning log list
    def return_warning(self):
        """Return the distinct warning lines (timestamps kept)."""
        warning_counter = collections.Counter(self.warning_list)
        return warning_counter.keys()

    # Return all the log
    def return_list(self):
        """Return every cleaned line seen so far, classified or not."""
        return self.all_data_list

    def _classify(self, lowercase_line):
        """Return "error", "warning" or None for an already-lowercased line."""
        if any(key in lowercase_line for key in self.error_keys):
            return "error"
        if any(re.match(pattern, lowercase_line) for pattern in self.error_keys_regex):
            return "error"
        if any(key in lowercase_line for key in self.warning_keys):
            return "warning"
        return None

    def _record(self, line):
        """Clean one raw line, store it in the matching lists, return its level."""
        # minimize continuous spaces to get consistency
        cleaned_line = self._SPACE_RUN.sub(" ", line).strip()
        # remove (number) before starting of message
        cleaned_line = self._PAREN_NUMBER.sub(":", cleaned_line).strip()
        self.all_data_list.append(cleaned_line)

        # Classification runs on the raw line lowercased, not the cleaned one.
        level = self._classify(line.lower())
        if level == "error":
            self.error_list.append(cleaned_line)
        elif level == "warning":
            self.warning_list.append(cleaned_line)
        return level

    # Parse the whole file and store analyzed log in lists
    def parse_file(self, filename):
        """Classify every line of ``filename``.

        The error and warning lists are reset first, so afterwards they
        describe this file only; ``all_data_list`` keeps accumulating.
        Errors from ``open`` are not caught here.
        """
        self.error_list = []
        self.warning_list = []
        with open(filename) as f:
            # Stream the file instead of loading every line into memory.
            for raw_line in f:
                self._record(raw_line)

    # Parse a single line and store the analyzed log in lists
    # Return Warning or Error immediately
    # TODO: colored output
    def parse_line(self, line):
        """Classify one line and report the verdict.

        Returns ``"Error: " + line`` or ``"Warning: " + line`` (the original,
        uncleaned line), or an empty string when no rule matches.  Unlike
        ``parse_file`` this does not reset the error and warning lists.
        """
        level = self._record(line)
        if level == "error":
            return "Error: " + line
        if level == "warning":
            return "Warning: " + line
        return ""
# This is a basic function test.
# It scans the current directory for files named cleaned_*.txt, parses each
# one, and prints the de-timestamped, de-duplicated error lines found in it.
if __name__ == '__main__':
    current_dir = os.getcwd()
    lp = SimpleAnalyzer()
    for filename in os.listdir(current_dir):
        if filename.endswith(".txt") and filename.startswith("cleaned_") and not os.path.isdir(filename):
            lp.parse_file(filename)
            # parse_file resets the error list per file, so report right after
            # each parse.  The parenthesised single-argument form works on
            # both Python 2 and Python 3; list() keeps the output a plain list.
            print(list(lp.return_sorted_error()))