From 51490032289b8cc100cd32b5d5d84ca7342e66a6 Mon Sep 17 00:00:00 2001 From: Russ Webber Date: Thu, 25 Jul 2024 22:22:08 +1000 Subject: [PATCH] feat: add --script argument to cli --- hypno/__init__.py | 2 +- hypno/__main__.py | 11 ++++++++--- hypno/api.py | 16 ++++++---------- hypno/injection.c | 33 ++++++++++++++++++++------------- tests/test_hypno.py | 31 ++++++++++++++++++++++++++++++- 5 files changed, 65 insertions(+), 28 deletions(-) diff --git a/hypno/__init__.py b/hypno/__init__.py index c6b5234..90896bf 100644 --- a/hypno/__init__.py +++ b/hypno/__init__.py @@ -1 +1 @@ -from .api import inject_py, CodeTooLongException +from .api import inject_py, inject_py_script, CodeTooLongException diff --git a/hypno/__main__.py b/hypno/__main__.py index 98e25e8..82ee847 100644 --- a/hypno/__main__.py +++ b/hypno/__main__.py @@ -1,19 +1,24 @@ from argparse import Namespace, ArgumentParser from typing import Optional, List +from pathlib import Path -from hypno import inject_py +from hypno import inject_py, inject_py_script def parse_args(args: Optional[List[str]]) -> Namespace: parser = ArgumentParser(description='Inject python code into a running python process.') parser.add_argument('pid', type=int, help='pid of the process to inject code into') - parser.add_argument('python_code', type=str.encode, help='python code to inject') + parser.add_argument('--script', action='store_true', help='Inject a script instead of code', default=False) + parser.add_argument('python_code', type=str, help='python code to inject') return parser.parse_args(args) def main(args: Optional[List[str]] = None) -> None: parsed_args = parse_args(args) - inject_py(parsed_args.pid, parsed_args.python_code) + if parsed_args.script: + inject_py_script(parsed_args.pid, Path(parsed_args.python_code)) + else: + inject_py(parsed_args.pid, parsed_args.python_code.encode()) if __name__ == '__main__': diff --git a/hypno/api.py b/hypno/api.py index d103ddf..a06ae73 100644 --- a/hypno/api.py +++ b/hypno/api.py @@ -8,7 +8,7 @@ INJECTION_LIB_PATH = Path(find_spec('.injection', __package__).origin) MAGIC = b'--- hypno code start ---' -MAGIC2 = b'--- hypno script path start ---' +PATH_SENTINEL = b'--- hypno script path start ---' WINDOWS = sys.platform == 'win32' @@ -62,14 +62,8 @@ def inject_py_script(pid: int, python_script: Path, permissions=0o644) -> None: script_path_null_terminated = str(python_script).encode() + b'\0' lib = INJECTION_LIB_PATH.read_bytes() - magic_addr = lib.find(MAGIC2) + magic_addr = lib.find(PATH_SENTINEL) path_str_addr = magic_addr - 1 - # max_size_addr = magic_addr + len(MAGIC2) - # max_size_end_addr = lib.find(b'\0', max_size_addr) - # max_size = int(lib[max_size_addr:max_size_end_addr]) - - # if len(script_path_null_terminated) => max_size: - # raise CodeTooLongException(script_path_null_terminated, max_size) patched_lib = bytearray(lib) patched_lib[path_str_addr:(path_str_addr + len(script_path_null_terminated))] = script_path_null_terminated @@ -80,8 +74,10 @@ def inject_py_script(pid: int, python_script: Path, permissions=0o644) -> None: with NamedTemporaryFile(prefix='hypno', suffix=INJECTION_LIB_PATH.suffix, delete=False) as temp: path = Path(temp.name) temp.write(patched_lib) + print(f"Injecting {path} into {pid}") path.chmod(permissions) inject(pid, str(temp.name), uninject=True) finally: - if path is not None and path.exists(): - path.unlink() + pass + # if path is not None and path.exists(): + # path.unlink() diff --git a/hypno/injection.c b/hypno/injection.c index 72f2f9f..6111d58 100644 --- a/hypno/injection.c +++ b/hypno/injection.c @@ -1,6 +1,16 @@ #include #include + +#if defined(_WIN32) || defined(_WIN64) +#include +#define MAX_FILE_PATH MAX_PATH +#elif defined(__APPLE__) +#include +#define MAX_FILE_PATH PATH_MAX +#elif defined(__linux__) #include +#define MAX_FILE_PATH PATH_MAX +#endif #define MAX_PYTHON_CODE_SIZE 60500 #define STR_EXPAND(tok) #tok @@ -9,28 +19,25 @@ PyMODINIT_FUNC PyInit_injection(void) {return NULL;} volatile char PYTHON_CODE[MAX_PYTHON_CODE_SIZE + 1] = "\0--- hypno code start ---" STR(MAX_PYTHON_CODE_SIZE); -volatile char PYTHON_SCRIPT_PATH[PATH_MAX] = "\0--- hypno script path start ---" STR(PATH_MAX); +volatile char PYTHON_SCRIPT_PATH[MAX_FILE_PATH] = "\0--- hypno script path start ---" STR(MAX_FILE_PATH); void run_python_code() { - if (PYTHON_CODE[0]) { - int saved_errno = errno; - PyGILState_STATE gstate = PyGILState_Ensure(); - if (PYTHON_CODE[0] != '\0') { - PyRun_SimpleString(PYTHON_CODE); - } else if (PYTHON_SCRIPT_PATH[0] != '\0') { - int fp = fopen(PYTHON_SCRIPT_PATH, "r"); + int saved_errno = errno; + PyGILState_STATE gstate = PyGILState_Ensure(); + if (PYTHON_CODE[0] != '\0') { + PyRun_SimpleString(PYTHON_CODE); + } else if (PYTHON_SCRIPT_PATH[0] != '\0') { + int fp = fopen(PYTHON_SCRIPT_PATH, "r"); + if (fp) { PyRun_SimpleFile(fp, PYTHON_SCRIPT_PATH); fclose(fp); } - - PyGILState_Release(gstate); - errno = saved_errno; } + PyGILState_Release(gstate); + errno = saved_errno; } #ifdef _WIN32 - #include - BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpReserved) { switch( fdwReason ) { case DLL_PROCESS_ATTACH: diff --git a/tests/test_hypno.py b/tests/test_hypno.py index fc561e0..55f29fa 100644 --- a/tests/test_hypno.py +++ b/tests/test_hypno.py @@ -3,8 +3,9 @@ from pathlib import Path import pytest from pytest import mark, fixture +from tempfile import NamedTemporaryFile -from hypno import inject_py, CodeTooLongException +from hypno import inject_py, inject_py_script, CodeTooLongException WHILE_TRUE_SCRIPT = Path(__file__).parent.resolve() / 'while_true.py' PROCESS_WAIT_TIMEOUT = 1.5 @@ -32,6 +33,16 @@ def process(process_loop_output, process_end_output): yield process process.kill() +def write_code_to_temp_file(code: bytes, use_thread: bool) -> Path: + with NamedTemporaryFile(delete=False) as temp: + if use_thread: + temp.write(b"from threading import Thread\n") + temp.write(b"def x(): " + code + b"\n") + temp.write(b"\nThread(target=x).start()\n") + else: + temp.write(code) + temp.flush() + return Path(temp.name) def start_in_thread_code(code: bytes, use_thread: bool) -> bytes: if not use_thread: @@ -60,6 +71,24 @@ def test_hypno(process: Popen, times: int, thread: bool, process_loop_output: st assert stdout == data * (times - 1), stdout +@mark.parametrize('times', [0, 1, 2, 3]) +@mark.parametrize('thread', [True, False]) +def test_hypno_script(process: Popen, times: int, thread: bool, process_loop_output: str, process_end_output: str): + data = b'test_data_woohoo' + for _ in range(times - 1): + tmp_file = write_code_to_temp_file(b'print("' + data + b'", end="");', thread) + inject_py_script(process.pid, tmp_file) + # Making sure the process is still working + process.stderr.read(len(process_loop_output)) + inject_py(process.pid, start_in_thread_code(b'__import__("__main__").should_exit = True', thread)) + assert process.wait(PROCESS_WAIT_TIMEOUT) == 0 + + stdout = process.stdout.read() + stderr = process.stderr.read() + assert stderr.endswith(process_end_output.encode()), stderr + assert stdout == data * (times - 1), stdout + + def test_hypno_with_too_long_code(): code = b'^' * 100000 with pytest.raises(CodeTooLongException):