Skip to content

Commit

Permalink
🐛Fixed for WSL2(Ubuntu)
Browse files Browse the repository at this point in the history
  • Loading branch information
7rikazhexde committed Jul 31, 2024
1 parent fca0193 commit 37689da
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 15 deletions.
49 changes: 42 additions & 7 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import os
import subprocess
import sys
import termios
import time
from concurrent.futures import Future
from typing import Any, Generator, List, Optional, Tuple
Expand All @@ -15,19 +14,55 @@
from video_grid_merge import __main__ as main


def test_reset_input_buffer(mocker: Any) -> None:
mock_tcflush = mocker.patch("termios.tcflush")
mock_system = mocker.patch("os.system")
def test_reset_input_buffer_success(mocker: Any) -> None:
mock_fcntl = mocker.patch("fcntl.fcntl")
mock_select = mocker.patch("select.select")
mock_stdin = mocker.patch("sys.stdin")

# Simulate input being available once, then no more input
mock_select.side_effect = [([mock_stdin], [], []), ([], [], [])]
mock_stdin.read.return_value = "test input"

main.reset_input_buffer()

assert (
mock_fcntl.call_count == 3
) # fcntl is called 3 times in the actual implementation
assert mock_select.call_count == 2
mock_stdin.read.assert_called_once_with(1024)


def test_reset_input_buffer_no_input(mocker: Any) -> None:
mock_fcntl = mocker.patch("fcntl.fcntl")
mock_select = mocker.patch("select.select")
mock_stdin = mocker.patch("sys.stdin")

# Simulate no input being available
mock_select.return_value = ([], [], [])

main.reset_input_buffer()

mock_tcflush.assert_called_once_with(sys.stdin, termios.TCIFLUSH)
mock_system.assert_called_once_with("stty sane")
assert (
mock_fcntl.call_count == 3
) # fcntl is called 3 times in the actual implementation
mock_select.assert_called_once()
mock_stdin.read.assert_not_called()


def test_reset_input_buffer_exception(mocker: Any) -> None:
mock_fcntl = mocker.patch("fcntl.fcntl")
mock_fcntl.side_effect = Exception("Test exception")
mock_sleep = mocker.patch("time.sleep")

main.reset_input_buffer()

mock_fcntl.assert_called_once()
mock_sleep.assert_called_once_with(0.1)


def test_safe_input(mocker: Any) -> None:
mock_input = mocker.patch("builtins.input", return_value="test input")
mock_reset = mocker.patch("video_grid_merge.__main__.reset_input_buffer")
mock_input = mocker.patch("builtins.input", return_value="test input")

result = main.safe_input("Enter something: ")

Expand Down
34 changes: 26 additions & 8 deletions video_grid_merge/__main__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import fcntl
import math
import os
import select
import subprocess
import sys
import termios
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
Expand All @@ -22,13 +23,26 @@

def reset_input_buffer() -> None:
"""
Reset the input buffer and terminal settings.
Reset the input buffer.
This function flushes the input buffer using termios and resets the terminal
settings to their default state using the 'stty sane' command.
This function attempts to clear the input buffer in a way that's compatible
with Unix-like systems (including WSL2). It uses a non-blocking read approach
which is more reliable across different environments.
"""
termios.tcflush(sys.stdin, termios.TCIFLUSH)
os.system("stty sane") # Reset terminal settings
try:
# Set stdin to non-blocking mode
old_flags = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, old_flags | os.O_NONBLOCK)

# Read and discard any available input
while len(select.select([sys.stdin], [], [], 0)[0]) > 0:
sys.stdin.read(1024)

# Restore original flags
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, old_flags)
except Exception:
# If anything goes wrong, fall back to a simple time delay
time.sleep(0.1)


def safe_input(prompt: str) -> str:
Expand Down Expand Up @@ -307,8 +321,12 @@ def main(
input_folder (Optional[str]): The path to the input folder. If None, a default path is used.
output_folder (Optional[str]): The path to the output folder. If None, a default path is used.
"""
input_folder = input_folder or "./video_grid_merge/media/input"
output_folder = output_folder or "./video_grid_merge/media/output"
input_folder = input_folder or os.path.join(
".", "video_grid_merge", "media", "input"
)
output_folder = output_folder or os.path.join(
".", "video_grid_merge", "media", "output"
)

start = time.perf_counter()
rnf.rename_files_with_spaces(input_folder)
Expand Down

0 comments on commit 37689da

Please sign in to comment.