-
Notifications
You must be signed in to change notification settings - Fork 156
/
debug_webserver.py
175 lines (132 loc) · 5.38 KB
/
debug_webserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Simple HTTP listener to be used with debugging various EDMC sends."""
from __future__ import annotations
import gzip
import json
import pathlib
import threading
import zlib
from http import server
from typing import Any, Callable, Literal
from urllib.parse import parse_qs
import config
from EDMCLogging import get_main_logger
logger = get_main_logger()
output_lock = threading.Lock()
output_data_path = pathlib.Path(config.app_dir_path / 'logs' / 'http_debug')
SAFE_TRANSLATE = str.maketrans(dict.fromkeys("!@#$%^&*()./\\\r\n[]-+='\";:?<>,~`", '_'))
class LoggingHandler(server.BaseHTTPRequestHandler):
"""HTTP Handler implementation that logs to EDMCs logger and writes data to files on disk."""
def log_message(self, format: str, *args: Any) -> None:
"""Override default handler logger with EDMC logger."""
logger.info(format % args)
def do_POST(self) -> None: # noqa: N802 # I cant change it
"""Handle POST."""
logger.info(f"Received a POST for {self.path!r}!")
data_raw: bytes = self.rfile.read(int(self.headers['Content-Length']))
encoding = self.headers.get('Content-Encoding')
to_save = data = self.get_printable(data_raw, encoding)
target_path = self.path
if len(target_path) > 1 and target_path[0] == '/':
target_path = target_path[1:]
elif len(target_path) == 1 and target_path[0] == '/':
target_path = 'WEB_ROOT'
response: Callable[[str], str] | str | None = DEFAULT_RESPONSES.get(target_path)
if callable(response):
response = response(to_save)
self.send_response_only(200, "OK")
if response is not None:
self.send_header('Content-Length', str(len(response)))
self.end_headers() # This is needed because send_response_only DOESN'T ACTUALLY SEND THE RESPONSE </rant>
if response is not None:
self.wfile.write(response.encode())
self.wfile.flush()
if target_path == 'edsm':
# attempt to extract data from urlencoded stream
try:
edsm_data = extract_edsm_data(data)
data = data + "\n" + json.dumps(edsm_data)
except Exception:
pass
target_file = output_data_path / (safe_file_name(target_path) + '.log')
if target_file.parent != output_data_path:
logger.warning(f"REFUSING TO WRITE FILE THAT ISN'T IN THE RIGHT PLACE! {target_file=}")
logger.warning(f'DATA FOLLOWS\n{data}')
return
with output_lock, target_file.open('a') as file:
file.write(to_save + "\n\n")
@staticmethod
def get_printable(data: bytes, compression: Literal['deflate'] | Literal['gzip'] | str | None = None) -> str:
"""
Convert an incoming data stream into a string.
:param data: The data to convert
:param compression: The compression to remove, defaults to None
:raises ValueError: If compression is unknown
:return: printable strings
"""
ret: bytes = b''
if compression is None:
ret = data
elif compression == 'deflate':
ret = zlib.decompress(data)
elif compression == 'gzip':
ret = gzip.decompress(data)
else:
raise ValueError(f'Unknown encoding for data {compression!r}')
return ret.decode('utf-8', errors='replace')
def safe_file_name(name: str):
"""
Escape special characters out of a file name.
This is a nicety. Don't rely on it to be ultra secure.
"""
return name.translate(SAFE_TRANSLATE)
def generate_inara_response(raw_data: str) -> str:
"""Generate nonstatic data for inara plugin."""
try:
data = json.loads(raw_data)
except json.JSONDecodeError:
return "UNKNOWN REQUEST"
out = {
'header': {
'eventStatus': 200
},
'events': [
{
'eventName': e['eventName'], 'eventStatus': 200, 'eventStatusText': "DEBUG STUFF"
} for e in data.get('events')
]
}
return json.dumps(out)
def extract_edsm_data(data: str) -> dict[str, Any]:
"""Extract relevant data from edsm data."""
res = parse_qs(data)
return {name: data[0] for name, data in res.items()}
def generate_edsm_response(raw_data: str) -> str:
"""Generate nonstatic data for edsm plugin."""
try:
data = extract_edsm_data(raw_data)
events = json.loads(data['message'])
except (json.JSONDecodeError, Exception):
logger.exception("????")
return "UNKNOWN REQUEST"
out = {
'msgnum': 100, # Ok
'msg': 'debug stuff',
'events': [
{'event': e['event'], 'msgnum': 100, 'msg': 'debug stuff'} for e in events
]
}
return json.dumps(out)
DEFAULT_RESPONSES = {
'inara': generate_inara_response,
'edsm': generate_edsm_response
}
def run_listener(host: str = "127.0.0.1", port: int = 9090) -> None:
"""Run a listener thread."""
output_data_path.mkdir(exist_ok=True)
logger.info(f'Starting HTTP listener on {host=} {port=}!')
listener = server.HTTPServer((host, port), LoggingHandler)
logger.info(listener)
threading.Thread(target=listener.serve_forever, daemon=True).start()
if __name__ == "__main__":
output_data_path.mkdir(exist_ok=True)
server.HTTPServer(("127.0.0.1", 9090), LoggingHandler).serve_forever()