import re
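class Vulnerability:
    """Base class for a detected vulnerability identified by program counters.

    ``pcs`` are the program counters flagged by the analysis. When a
    ``source_map`` is supplied they are filtered (pcs with no source code,
    pcs sharing one source position) and rendered into warning strings.

    Subclasses are expected to assign ``self.name`` (the text printed in the
    warning header) before this ``__init__`` runs; the base class never sets
    it itself.
    """

    def __init__(self, source_map, pcs):
        self.source_map = source_map
        self.pcs = self._rm_general_false_positives(pcs)
        # Always defined, so get_warnings()/__str__ do not raise
        # AttributeError when no source map is available.
        self.warnings = []
        if source_map:
            self.warnings = self._warnings()

    def is_vulnerable(self):
        """Return True when at least one pc survived the filters."""
        return bool(self.pcs)

    def get_warnings(self):
        """Return the rendered warning strings (empty without a source map)."""
        return self.warnings

    def _rm_general_false_positives(self, pcs):
        """Drop pcs with no source code, then collapse pcs sharing a position.

        Without a source map the input is returned unchanged. The filter on
        arithmetic opcodes that used to be listed here lives in
        AssertionFailure._reduce_pcs_without_math_opcodes_in_overflow.
        """
        if not self.source_map:
            return pcs
        with_source = self._rm_pcs_having_no_source_code(pcs)
        return self._reduce_pcs_having_the_same_pos(with_source)

    def _rm_pcs_having_no_source_code(self, pcs):
        """Keep only the pcs the source map resolves to source code."""
        return [pc for pc in pcs if self.source_map.get_source_code(pc)]

    def _reduce_pcs_having_the_same_pos(self, pcs):
        """Keep the first pc seen for each distinct instruction position.

        Returns a list in first-seen order (previously a dict view).
        """
        first_pc_by_pos = {}
        for pc in pcs:
            pos = str(self.source_map.instr_positions[pc])
            first_pc_by_pos.setdefault(pos, pc)
        return list(first_pc_by_pos.values())

    def _warnings(self):
        """Render one warning per pc that resolves to source code."""
        warnings = []
        for pc in self.pcs:
            if not self.source_map.get_source_code(pc):
                continue
            buggy_line = self.source_map.get_buggy_line(pc)
            s = self._warning_content(pc, buggy_line)
            if s:
                warnings.append(s)
        return warnings

    def _warning_content(self, pc, source_code):
        """Format ``file:line:col: Warning: <name>.`` plus the buggy line.

        Only the first line of ``source_code`` is printed; if it spanned more
        lines, a caret under the first non-blank column and a note are added.
        """
        first_line, newline, _rest = source_code.partition('\n')
        location = self.source_map.get_location(pc)
        # root_path is a literal path prefix: escape it, otherwise it is
        # interpreted as a regex and characters such as '.' or '+' would
        # match more than intended or fail to compile.
        source = re.sub(re.escape(self.source_map.root_path), '',
                        self.source_map.get_filename())
        # The +1 suggests the location indices are 0-based.
        line = location['begin']['line'] + 1
        column = location['begin']['column'] + 1
        s = '%s:%s:%s: Warning: %s.\n' % (source, line, column, self.name)
        s += first_line
        if newline:
            s += '\n' + self._leading_spaces(first_line) + '^\n'
            s += 'Spanning multiple lines.'
        return s

    def _leading_spaces(self, s):
        """Return the run of spaces and tabs at the start of ``s``."""
        # lstrip takes a set of characters, not a regex: the former
        # '[ \t]' argument also stripped '[' and ']' and misplaced the caret.
        stripped = s.lstrip(' \t')
        return s[:len(s) - len(stripped)]

    def __str__(self):
        """Warnings separated by newlines, with no leading newline."""
        return '\n'.join(self.warnings).lstrip('\n')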
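class CallStack(Vulnerability):
    """Call-stack depth attack: call sites whose outcome affects state.

    ``calls_affect_state`` maps a pc to a flag; a pc with a falsy flag is
    dropped, a pc with a truthy flag or no entry at all is kept.
    """

    def __init__(self, source_map, pcs, calls_affect_state):
        # Does not chain to Vulnerability.__init__: the extra filter needs
        # calls_affect_state before the warnings are rendered.
        self.name = 'Callstack Depth Attack Vulnerability'
        self.source_map = source_map
        self.pcs = self._rm_false_positives(pcs, calls_affect_state)
        # Always defined, so get_warnings()/__str__ do not raise
        # AttributeError when no source map is available.
        self.warnings = []
        if source_map:
            self.warnings = Vulnerability._warnings(self)

    def _rm_false_positives(self, pcs, calls_affect_state):
        """Apply the generic filters, then drop calls not affecting state."""
        filtered = Vulnerability._rm_general_false_positives(self, pcs)
        return self._rm_pcs_not_affect_state(filtered, calls_affect_state)

    def _rm_pcs_not_affect_state(self, pcs, calls_affect_state):
        """Keep pcs flagged as affecting state and pcs with no entry.

        A missing entry counts as "unknown" and is kept; only an explicit
        falsy flag removes a pc.
        """
        return [pc for pc in pcs if calls_affect_state.get(pc, True)]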
class TimeDependency(Vulnerability):
    """Vulnerability reported under the name 'Timestamp Dependency'."""

    def __init__(self, source_map, pcs):
        # The name must exist before the base __init__ renders warnings.
        self.name = 'Timestamp Dependency'
        super(TimeDependency, self).__init__(source_map, pcs)
class Reentrancy(Vulnerability):
    """Vulnerability reported under the name 'Re-Entrancy Vulnerability'."""

    def __init__(self, source_map, pcs):
        # The name must exist before the base __init__ renders warnings.
        self.name = 'Re-Entrancy Vulnerability'
        super(Reentrancy, self).__init__(source_map, pcs)
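class MoneyConcurrency(Vulnerability):
    """Transaction-ordering dependency, reported per flow.

    ``flows`` is a sequence of pc lists; each flow is filtered and rendered
    into its own list of warnings.
    """

    def __init__(self, source_map, flows):
        self.name = 'Transaction-Ordering Dependency'
        self.source_map = source_map
        self.flows = flows
        # Always defined, so __str__/get_warnings_of_flows do not raise
        # AttributeError when no source map is available.
        self.warnings_of_flows = []
        if source_map:
            self.warnings_of_flows = self._warnings_of_flows()

    def is_vulnerable(self):
        """Return True when at least one flow was reported."""
        return bool(self.flows)

    def get_warnings_of_flows(self):
        """Return one list of warning strings per flow."""
        return self.warnings_of_flows

    def _warnings_of_flows(self):
        """Render every flow into its own list of warnings."""
        return [self._warnings_of_flow(pcs) for pcs in self.flows]

    def _warnings_of_flow(self, pcs):
        """Filter one flow's pcs and render a warning for each remaining pc."""
        warnings = []
        for pc in Vulnerability._rm_general_false_positives(self, pcs):
            if not self.source_map.get_source_code(pc):
                continue
            buggy_line = self.source_map.get_buggy_line(pc)
            s = Vulnerability._warning_content(self, pc, buggy_line)
            if s:
                warnings.append(s)
        return warnings

    def __str__(self):
        """'Flow<i>' header per flow, each followed by its warnings."""
        blocks = []
        for i, warnings in enumerate(self.warnings_of_flows):
            blocks.append('\n'.join(['Flow' + str(i + 1)] + warnings))
        return '\n'.join(blocks)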
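class AssertionFailure(Vulnerability):
    """Assertion failures found by the solver, one per model-backed assertion.

    ``assertions`` are objects exposing ``pc`` and a solver ``model`` (its
    ``decls()`` and item access are used when rendering). The warning name
    is the class name split on capitals ("Assertion Failure",
    "Integer Overflow", ...), so subclasses only need to inherit.

    Raises:
        Exception: if ``source_map`` is falsy; every filter below needs it.
    """

    def __init__(self, source_map, assertions):
        # Fail early and explicitly: the original guard ran after the first
        # source-map access (which already raised AttributeError on None) and
        # compared self.name with 'AssertionFailure', which never matches the
        # space-separated name computed below.
        if not source_map:
            raise Exception("source_map attribute can't be None")
        self.source_map = source_map
        self.name = ' '.join(re.findall('[A-Z][a-z]+', self.__class__.__name__))
        # Deduplicate first, then apply the operator filter to the reduced
        # list; the original passed the un-deduplicated input to the second
        # filter, discarding the first pass.
        unique = self._reduce_pcs_having_the_same_pos(assertions)
        self.assertions = self._reduce_pcs_without_math_opcodes_in_overflow(unique)
        self.warnings = self._warnings()

    def is_vulnerable(self):
        """Return True when at least one assertion survived the filters."""
        return bool(self.assertions)

    @staticmethod
    def _op_not_in_comment(source_code, op):
        """True if no '//' precedes the first ``op`` in ``source_code``.

        ``op`` must occur in ``source_code`` (callers check this first).
        """
        return "//" not in source_code[:source_code.index(op)]

    def _reduce_pcs_without_math_opcodes_in_overflow(self, assertions):
        """Keep assertions whose buggy line shows the arithmetic the name implies.

        'Integer Underflow' needs a '-' (but no '--'); 'Integer Overflow'
        needs a '+' (but no '++') or else a '*'; in each case the operator
        must not be preceded by '//' (presumably a line comment). Any other
        name keeps every assertion.
        """
        kept = []
        name = str(self.name)
        for asrt in assertions:
            source_code = self.source_map.get_buggy_line(asrt.pc)
            if "Integer Underflow" in name:
                if ("-" in source_code and "--" not in source_code
                        and self._op_not_in_comment(source_code, '-')):
                    kept.append(asrt)
            elif "Integer Overflow" in name:
                if "+" in source_code and "++" not in source_code:
                    if self._op_not_in_comment(source_code, '+'):
                        kept.append(asrt)
                elif "*" in source_code and self._op_not_in_comment(source_code, '*'):
                    kept.append(asrt)
            else:
                kept.append(asrt)
        return kept

    def _reduce_pcs_having_the_same_pos(self, assertions):
        """Keep the first assertion seen per distinct instruction position.

        Returns a list in first-seen order (previously a dict view).
        """
        first_by_pos = {}
        for asrt in assertions:
            pos = str(self.source_map.instr_positions[asrt.pc])
            first_by_pos.setdefault(pos, asrt)
        return list(first_by_pos.values())

    def _warnings(self):
        """Render each assertion plus the model values under which it fails."""
        warnings = []
        for asrt in self.assertions:
            source_code = self.source_map.get_buggy_line(asrt.pc)
            s = Vulnerability._warning_content(self, asrt.pc, source_code)
            model = self._model_text(asrt)
            if model:
                s += "\n%s occurs if:%s" % (self.name, model)
            if s:
                warnings.append(s)
        return warnings

    def _model_text(self, asrt):
        """Format ' name = value' lines for model variables the source map knows.

        Only variables that resolve via get_parameter_or_state_var are
        listed; returns '' when none do.
        """
        model = ''
        for variable in asrt.model.decls():
            var_name = str(variable)
            parts = var_name.split('-')
            # Names with more than two '-'-separated parts keep the third
            # part, presumably the variable's own name.
            if len(parts) > 2:
                var_name = parts[2]
            if self.source_map.get_parameter_or_state_var(var_name):
                model += '\n ' + var_name + ' = ' + str(asrt.model[variable])
        return model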
class IntegerUnderflow(AssertionFailure):
    """Assertion failure reported as 'Integer Underflow' (name derived from the class name)."""
class IntegerOverflow(AssertionFailure):
    """Assertion failure reported as 'Integer Overflow' (name derived from the class name)."""
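class ParityMultisigBug2(Vulnerability):
    """Parity multisig bug 2: callees whose disassembly contains SELFDESTRUCT.

    ``source_map.callee_src_pairs`` yields pairs where ``pair[0]`` is a path
    stem (``"<stem>.evm.disasm"`` is read) and ``pair[1]`` a source reference
    the source map resolves via its ``*_from_src`` helpers.

    Requires a non-None ``source_map``; it is dereferenced unconditionally.
    """

    # Compiled once for all callees; SUICIDE is the older name of the opcode.
    _SELFDESTRUCT_RE = re.compile("SELFDESTRUCT|SUICIDE")

    def __init__(self, source_map):
        self.name = 'Parity Multisig Bug 2'
        self.source_map = source_map
        self.pairs = self._get_contracts_containing_selfdestruct_opcode()
        self.warnings = self._warnings()

    def is_vulnerable(self):
        """Return True when at least one callee contains the opcode."""
        return bool(self.pairs)

    def _warnings(self):
        """Render one warning per flagged callee pair."""
        return [self._warning_for_pair(pair) for pair in self.pairs]

    def _warning_for_pair(self, pair):
        """Format ``file:line:col: Warning: <name>.`` plus the buggy line.

        Mirrors Vulnerability._warning_content but resolves the line and the
        location from ``pair[1]`` via the ``*_from_src`` helpers.
        """
        source_code = self.source_map.get_buggy_line_from_src(pair[1])
        first_line, newline, _rest = source_code.partition('\n')
        location = self.source_map.get_location_from_src(pair[1])
        # root_path is a literal path prefix: escape it so regex
        # metacharacters in the path cannot alter the match.
        source = re.sub(re.escape(self.source_map.root_path), '',
                        self.source_map.get_filename())
        # The +1 suggests the location indices are 0-based.
        line = location['begin']['line'] + 1
        column = location['begin']['column'] + 1
        s = '%s:%s:%s: Warning: %s.\n' % (source, line, column, self.name)
        s += first_line
        if newline:
            s += '\n' + Vulnerability._leading_spaces(self, first_line) + '^\n'
            s += 'Spanning multiple lines.'
        return s

    def _get_contracts_containing_selfdestruct_opcode(self):
        """Return the callee pairs whose disassembly mentions SELFDESTRUCT/SUICIDE."""
        matching = []
        for pair in self.source_map.callee_src_pairs:
            # Context manager: the original left the file handle open.
            with open(pair[0] + ".evm.disasm") as disasm_file:
                disasm_data = disasm_file.read()
            if self._SELFDESTRUCT_RE.search(disasm_data):
                matching.append(pair)
        return matching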