Skip to content

Commit

Permalink
posix: wrap stdin in a cutom buffer
Browse files Browse the repository at this point in the history
select() is not reliable to as an alternative to kbhit(), it only works
one time an will than returne false untill the next key-press by the
user.
Now the content of stdin is read completly and put in a custom buffer,
which allows for peeking of data.
  • Loading branch information
Cube707 committed Nov 6, 2022
1 parent 73230f1 commit 6caa67c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 11 deletions.
2 changes: 1 addition & 1 deletion readchar/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Library to easily read single chars and key strokes"""

__version__ = "4.0.4-dev1"
__version__ = "4.0.4-dev2"
__all__ = ["readchar", "readkey", "key", "config"]

from sys import platform
Expand Down
39 changes: 29 additions & 10 deletions readchar/_posix_read.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
import termios
from copy import copy
from io import StringIO
from select import select

from ._config import config
Expand All @@ -12,53 +13,71 @@ class ReadChar:

def __init__(self, cfg: config = None) -> None:
self.config = cfg if cfg is not None else config
self._buffer = StringIO()

def __enter__(self) -> "ReadChar":
self.fd = sys.stdin.fileno()
term = termios.tcgetattr(self.fd)
self.old_settings = copy(term)
term[3] &= ~(termios.ICANON | termios.ECHO | termios.IGNBRK | termios.BRKINT)

term[3] &= ~(
termios.ICANON # don't require ENTER
| termios.ECHO # don't echo
| termios.IGNBRK
| termios.BRKINT
)
term[6][termios.VMIN] = 0 # imideatly process every input
term[6][termios.VTIME] = 0
termios.tcsetattr(self.fd, termios.TCSAFLUSH, term)
return self

def __exit__(self, type, value, traceback) -> None:
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_settings)
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old_settings)

def __update(self) -> None:
"""check stdin and update the interal buffer if it holds data"""
if sys.stdin in select([sys.stdin], [], [], 0)[0]:
pos = self._buffer.tell()
data = sys.stdin.read()
self._buffer.write(data)
self._buffer.seek(pos)

@property
def key_waiting(self) -> bool:
"""True if a key has been pressed and is waiting to be read. False if not."""
return sys.stdin in select([sys.stdin], [], [], 0)[0]
self.__update()
pos = self._buffer.tell()
next_byte = self._buffer.read(1)
self._buffer.seek(pos)
return bool(next_byte)

def char(self) -> str:
"""Reads a singel char from the input stream and returns it as a string of
length one. Does not require the user to press ENTER."""
return sys.stdin.read(1)
self.__update()
return self._buffer.read(1)

def key(self) -> str:
"""Reads a keypress from the input stream and returns it as a string. Keypressed
consisting of multiple characterrs will be read completly and be returned as a
string matching the definitions in `key.py`.
Does not require the user to press ENTER."""
c1 = self.char()
self.__update()

c1 = self.char()
if c1 in self.config.INTERRUPT_KEYS:
raise KeyboardInterrupt

if c1 != "\x1B":
return c1

c2 = self.char()
if c2 not in "\x4F\x5B":
return c1 + c2

c3 = self.char()
if c3 not in "\x31\x32\x33\x35\x36":
return c1 + c2 + c3

c4 = self.char()
if c4 not in "\x30\x31\x33\x34\x35\x37\x38\x39":
return c1 + c2 + c3 + c4

c5 = self.char()
return c1 + c2 + c3 + c4 + c5

Expand Down

0 comments on commit 6caa67c

Please sign in to comment.