diff --git a/ptrlib/binary/encoding/__init__.py b/ptrlib/binary/encoding/__init__.py index 2fb0c06..d77b38f 100644 --- a/ptrlib/binary/encoding/__init__.py +++ b/ptrlib/binary/encoding/__init__.py @@ -1,6 +1,6 @@ from .ansi import * from .bitconv import * from .byteconv import * +from .char import * from .dump import * -from .locale import * from .table import * diff --git a/ptrlib/binary/encoding/ansi.py b/ptrlib/binary/encoding/ansi.py index f74eb6b..41179fd 100644 --- a/ptrlib/binary/encoding/ansi.py +++ b/ptrlib/binary/encoding/ansi.py @@ -1,8 +1,11 @@ import enum -from typing import Generator, List, Optional +from logging import getLogger +from typing import Callable, Generator, List, Optional, Tuple, Union + +logger = getLogger(__name__) -# Based on https://bjh21.me.uk/all-escapes/all-escapes.txt +# Based on https://bjh21.me.uk/all-escapes/all-escapes.txt class AnsiOp(enum.Enum): UNKNOWN = 0 @@ -42,6 +45,9 @@ class AnsiOp(enum.Enum): SCI = enum.auto() # Single character introducer CSI = enum.auto() # Control sequence + # Fp Private Control Functions + DECKPAM = 0x80 + # CSI Sequence ICH = 0x100 # Insert character SBC = enum.auto() # Set border color @@ -87,7 +93,43 @@ class AnsiOp(enum.Enum): PTX = enum.auto() # Parallel texts SDS = enum.auto() # Start directed string SIMD = enum.auto() # Select implicit movement direction - + HPA = enum.auto() # Character position absolute + HPR = enum.auto() # Character position forward + REP = enum.auto() # Repeat + DA = enum.auto() # Device attributes + HSC = enum.auto() # Hide or show cursor + VPA = enum.auto() # Line position absolute + VPR = enum.auto() # Line position forward + HVP = enum.auto() # Character and line position + TBC = enum.auto() # Tabulation clear + PRC = enum.auto() # Print ROM character + SM = enum.auto() # Set mode + MC = enum.auto() # Media copy + HPB = enum.auto() # Character position backward + VPB = enum.auto() # Line position backward + RM = enum.auto() # Reset mode + CHC = enum.auto() # Clear and home cursor + SGR = enum.auto() # Select graphic rendition + SSM = enum.auto() # Set specific margin + DSR = enum.auto() # Device status report + DAQ = enum.auto() # Device area qualification + DECSSL = enum.auto() # Select set-up language + DECLL = enum.auto() # Load LEDs + DECSTBM = enum.auto() # Set top and bottom margins + RSM = enum.auto() # Reset margins + SCP = enum.auto() # Save cursor position + DECSLPP = enum.auto() # Set lines per physical page + RCP = enum.auto() # Reset cursor position + DECSVTS = enum.auto() # Set vertical tab stops + DECSHORP = enum.auto() # Set horizontal pitch + DGRTC = enum.auto() # Request terminal configuration + DECTST = enum.auto() # Invoke confidence test + SSW = enum.auto() # Screen switch + CAT = enum.auto() # Clear all tabs + + # SCS: Select character set + SCS_B = 0x200 # Default charset + SCS_0 = enum.auto() # DEC special charset class AnsiInstruction(object): def __init__(self, @@ -99,8 +141,40 @@ def __init__(self, self._args = args @property - def args(self): - return self._args + def is_skip(self): + """Check if instruction can be skipped + + Returns: + bool: True if this instruction is not important for drawing screen + """ + return self._code in [ + AnsiOp.DECKPAM, + AnsiOp.DECSLPP, + AnsiOp.DECSTBM, + AnsiOp.SGR, + ] + + def __getitem__(self, i: int): + assert isinstance(i, int), "Slice must be integer" + if i < 0 or i >= len(self._args): + return None + else: + return self._args[i] + + def __eq__(self, other): + if isinstance(other, AnsiInstruction): + return self._c0 == other._c0 and \ + self._code == other._code and \ + self._args == other._args + + elif isinstance(other, AnsiOp): + return self._c0 == other or self._code == other + + else: + raise TypeError(f"Cannot compare AnsiInstruction and {type(other)}") + + def __neq__(self, other): + return not self.__eq__(other) def __str__(self): return f'' @@ -110,13 +184,32 @@ class AnsiParser(object): ESC, BEL, BS, HT, LF, FF, CR = CTRL def __init__(self, - generator: Generator[bytes, None, None]): + generator: Generator[bytes, None, None], + size: Tuple[int, int]=(0, 0), + pos: Tuple[int, int]=(0, 0)): """ Args: generator: A generator which yields byte stream + size: Initial screen size (width, height) + pos: Initial cursor position (x, y) """ self._g = generator self._buffer = b'' + self._width, self._height = size + self._x, self._y = pos + self._last_size = 0 + + @property + def buffer(self) -> bytes: + """Return contents of current buffering + """ + return self._buffer + + def _experimantal_warning(self, message: str): + logger.error(message) + logger.error("This feature is experimental and does not support some ANSI codes.\n" \ + "If you encounter this error, please create an issue here:\n" \ + "https://github.com/ptr-yudai/ptrlib/issues") def _decode_csi(self) -> Optional[AnsiInstruction]: """Decode a CSI sequence @@ -124,12 +217,19 @@ def _decode_csi(self) -> Optional[AnsiInstruction]: c0, code = AnsiOp.ESC, AnsiOp.CSI # Parse parameters - mode_set = 0 + mode_set, mode_q, mode_private = 0, 0, 0 cur = 2 args = [] - if cur < len(self._buffer) and self._buffer[cur] == ord('='): - mode_set = 1 + while cur < len(self._buffer) and self._buffer[cur] in [ord('='), ord('?'), ord('>')]: + if self._buffer[cur] == ord('='): + mode_set = 1 + elif self._buffer[cur] == ord('?'): + mode_q = 1 + elif self._buffer[cur] == ord('>'): # TODO: Is this correct? + mode_private = 1 + else: + raise NotImplementedError("BUG: Unreachable path") cur += 1 while True: @@ -138,6 +238,7 @@ def _decode_csi(self) -> Optional[AnsiInstruction]: cur += 1 if cur >= len(self._buffer): + self._last_size = len(self._buffer) return None # NOTE: Common implementation seems to skip successive delimiters @@ -235,7 +336,78 @@ def _decode_csi(self) -> Optional[AnsiInstruction]: code, default = AnsiOp.SDS, (0,) elif self._buffer[cur] == ord('^'): code, default = AnsiOp.SIMD, (0,) - + elif self._buffer[cur] == ord('`'): + code, default = AnsiOp.HPA, (1,) + elif self._buffer[cur] == ord('a'): + code, default = AnsiOp.HPR, (1,) + elif self._buffer[cur] == ord('b'): + code, default = AnsiOp.REP, (1,) + elif self._buffer[cur] == ord('c'): + # NOTE: This operation has a lot of meanings + code = [AnsiOp.DA, AnsiOp.HSC][mode_set] + default = [(0,), ()][mode_set] + elif self._buffer[cur] == ord('d'): + code, default = AnsiOp.VPA, (1,) + elif self._buffer[cur] == ord('e'): + code, default = AnsiOp.VPR, (1,) + elif self._buffer[cur] == ord('f'): + code, default = AnsiOp.HVP, (1, 1) + elif self._buffer[cur] == ord('g'): + # TODO: Support reset tabs + code = [AnsiOp.TBC, AnsiOp.PRC][mode_set] + default = [(0,), ()][mode_set] + elif self._buffer[cur] == ord('h'): + code, default = AnsiOp.SM, () + elif self._buffer[cur] == ord('i'): + code, default = AnsiOp.MC, () + elif self._buffer[cur] == ord('j'): + code, default = AnsiOp.HPB, (1,) + elif self._buffer[cur] == ord('k'): + code, default = AnsiOp.VPB, (1,) + elif self._buffer[cur] == ord('l'): + # TODO: Support insert line up + code = [AnsiOp.RM, AnsiOp.CHC][mode_set] + default = [(1,), ()][mode_set] + elif self._buffer[cur] == ord('m'): + # TODO: Support delete line down + code = [AnsiOp.SGR, AnsiOp.SSM][mode_set] + default = [(0,), ()][mode_set] + elif self._buffer[cur] == ord('n'): + code, default = AnsiOp.DSR, (0,) + elif self._buffer[cur] == ord('o'): + code, default = AnsiOp.DAQ, (0,) + elif self._buffer[cur] == ord('p'): + code, default = AnsiOp.DECSSL, () + elif self._buffer[cur] == ord('q'): + code, default = AnsiOp.DECLL, () + elif self._buffer[cur] == ord('r'): + # TODO: Support CSR and SUNSCRL + code = [AnsiOp.DECSTBM, AnsiOp.RSM][mode_set] + default = [(), ()][mode_set] + elif self._buffer[cur] == ord('s'): + code, default = AnsiOp.SCP, () + elif self._buffer[cur] == ord('t'): + code, default = AnsiOp.DECSLPP, () + elif self._buffer[cur] == ord('u'): + code, default = AnsiOp.RCP, () + elif self._buffer[cur] == ord('v'): + code, default = AnsiOp.DECSVTS, () + elif self._buffer[cur] == ord('w'): + code, default = AnsiOp.DECSHORP, () + elif self._buffer[cur] == ord('x'): + code, default = AnsiOp.DGRTC, () + elif self._buffer[cur] == ord('y'): + code, default = AnsiOp.DECTST, () + elif self._buffer[cur] == ord('z'): + # TODO: Support + code = [AnsiOp.SSW, AnsiOp.CAT][mode_set] + default = [(), ()][mode_set] + else: + self._experimantal_warning(f"CSI not implemented: {self._buffer[cur-2:cur+0x10]}") + raise NotImplementedError("Unknown CSI") + + if len(args) < len(default): + args = tuple(args + list(default[len(args):])) self._buffer = self._buffer[cur+1:] return AnsiInstruction(c0, code, args) @@ -243,76 +415,92 @@ def _decode_csi(self) -> Optional[AnsiInstruction]: def _decode_esc(self) -> Optional[AnsiInstruction]: """Decode an ESC sequence """ - if len(self._buffer) < 2: - return None - c0 = AnsiOp.ESC code = AnsiOp.UNKNOWN - if self._buffer[1] == ord('B'): - code = AnsiOp.BPH - elif self._buffer[1] == ord('C'): - code = AnsiOp.NBH - elif self._buffer[1] == ord('D'): - code = AnsiOp.IND - elif self._buffer[1] == ord('E'): - code = AnsiOp.NEL - elif self._buffer[1] == ord('F'): - code = AnsiOp.SSA - elif self._buffer[1] == ord('G'): - code = AnsiOp.ESA - elif self._buffer[1] == ord('H'): - code = AnsiOp.HTS - elif self._buffer[1] == ord('I'): - code = AnsiOp.HTJ - elif self._buffer[1] == ord('J'): - code = AnsiOp.VTS - elif self._buffer[1] == ord('K'): - code = AnsiOp.PLD - elif self._buffer[1] == ord('L'): - code = AnsiOp.PLU - elif self._buffer[1] == ord('M'): - code = AnsiOp.RI - elif self._buffer[1] == ord('N'): - code = AnsiOp.SS2 - elif self._buffer[1] == ord('O'): - code = AnsiOp.SS3 - elif self._buffer[1] == ord('P'): - code = AnsiOp.DCS - elif self._buffer[1] == ord('Q'): - code = AnsiOp.PU1 - elif self._buffer[1] == ord('R'): - code = AnsiOp.PU2 - elif self._buffer[1] == ord('S'): - code = AnsiOp.STS - elif self._buffer[1] == ord('T'): - code = AnsiOp.CCH - elif self._buffer[1] == ord('U'): - code = AnsiOp.MW - elif self._buffer[1] == ord('V'): - code = AnsiOp.SPA - elif self._buffer[1] == ord('W'): - code = AnsiOp.EPA - elif self._buffer[1] == ord('X'): - code = AnsiOp.SOS - elif self._buffer[1] == ord('Z'): - code = AnsiOp.SCI - elif self._buffer[1] == ord('['): - return self._decode_csi() - return AnsiInstruction(c0, code) + cur = 1 + if len(self._buffer) <= cur: + self._last_size = len(self._buffer) + return None - """ - elif self._buffer[1] == 0x5c: - code = AnsiOp.ST - elif self._buffer[1] == 0x5d: - code = AnsiOp.OSC - elif self._buffer[1] == 0x5e: - code = AnsiOp.PM - elif self._buffer[1] == 0x5f: - code = AnsiOp.APC - """ + if self._buffer[cur] == ord('['): + cur += 1 + if self._buffer[cur] == ord('B'): + code = AnsiOp.BPH + elif self._buffer[cur] == ord('C'): + code = AnsiOp.NBH + elif self._buffer[cur] == ord('D'): + code = AnsiOp.IND + elif self._buffer[cur] == ord('E'): + code = AnsiOp.NEL + elif self._buffer[cur] == ord('F'): + code = AnsiOp.SSA + elif self._buffer[cur] == ord('G'): + code = AnsiOp.ESA + elif self._buffer[cur] == ord('H'): + code = AnsiOp.HTS + elif self._buffer[cur] == ord('I'): + code = AnsiOp.HTJ + elif self._buffer[cur] == ord('J'): + code = AnsiOp.VTS + elif self._buffer[cur] == ord('K'): + code = AnsiOp.PLD + elif self._buffer[cur] == ord('L'): + code = AnsiOp.PLU + elif self._buffer[cur] == ord('M'): + code = AnsiOp.RI + elif self._buffer[cur] == ord('N'): + code = AnsiOp.SS2 + elif self._buffer[cur] == ord('O'): + code = AnsiOp.SS3 + elif self._buffer[cur] == ord('P'): + code = AnsiOp.DCS + elif self._buffer[cur] == ord('Q'): + code = AnsiOp.PU1 + elif self._buffer[cur] == ord('R'): + code = AnsiOp.PU2 + elif self._buffer[cur] == ord('S'): + code = AnsiOp.STS + elif self._buffer[cur] == ord('T'): + code = AnsiOp.CCH + elif self._buffer[cur] == ord('U'): + code = AnsiOp.MW + elif self._buffer[cur] == ord('V'): + code = AnsiOp.SPA + elif self._buffer[cur] == ord('W'): + code = AnsiOp.EPA + elif self._buffer[cur] == ord('X'): + code = AnsiOp.SOS + elif self._buffer[cur] == ord('Z'): + code = AnsiOp.SCI + else: + return self._decode_csi() + + elif self._buffer[cur] == ord('('): + cur += 1 + if len(self._buffer) <= cur: + self._last_size = len(self._buffer) + return None + + if self._buffer[cur] == ord('B'): + code = AnsiOp.SCS_B + elif self._buffer[cur] == ord('0'): + code = AnsiOp.SCS_0 + else: + self._experimantal_warning(f"ESC not implemented: {self._buffer[cur-2:cur+0x10]}") + raise NotImplementedError(f"Unknown ESC") + + elif self._buffer[cur] == ord('='): + code = AnsiOp.DECKPAM + + else: + self._experimantal_warning(f"ESC not implemented: {self._buffer[cur-2:cur+0x10]}") + raise NotImplementedError(f"Unknown ESC") + + self._buffer = self._buffer[cur+1:] + return AnsiInstruction(c0, code) - def parse_block(self) -> Optional[AnsiInstruction]: + def parse_block(self) -> Optional[Union[bytes, AnsiInstruction]]: """Parse a block of ANSI escape sequence Returns: @@ -321,12 +509,15 @@ def parse_block(self) -> Optional[AnsiInstruction]: Raises: StopIteration: No more data to receive """ - try: - self._buffer += next(self._g) - except StopIteration: - pass - while len(self._buffer) == 0: - self._buffer += next(self._g) + if len(self._buffer) <= self._last_size: + try: + self._buffer += next(self._g) + except StopIteration as e: + if len(self._buffer) == 0: + # All processed, end of input + raise e from None + + self._last_size = 0 # TODO: Support C1 control code if self._buffer[0] not in AnsiParser.CTRL: @@ -358,18 +549,149 @@ def parse_block(self) -> Optional[AnsiInstruction]: self._buffer = self._buffer[1:] return instr -if __name__ == '__main__': - def test(): - yield b"ABC\n\x1b[12;23H\x08\x1b[30" - yield b"m\x1b[47mHello" - - ansi = AnsiParser(test()) - print(ansi.parse_block()) - print(ansi.parse_block()) - print(ansi.parse_block()) - print(ansi.parse_block()) - print(ansi.parse_block()) - print(ansi.parse_block()) - print(ansi.parse_block()) - print(ansi.parse_block()) - + def _update_screen_size(self, screen): + if len(screen) == 0: + return + self._width = max(map(lambda pos: pos[0], screen.keys())) + 1 + self._height = max(map(lambda pos: pos[1], screen.keys())) + 1 + + def _special_char(self, charset: AnsiOp, c: int): + if charset == AnsiOp.SCS_B: + return c + + elif charset == AnsiOp.SCS_0: + if 0x5f <= c <= 0x7e: + return [0x20, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x3f, 0x6f, + 0x2b, 0x3f, 0x3f, 0x2b, 0x2b, 0x2b, 0x2b, 0x2b, + 0x2d, 0x2d, 0x2d, 0x2d, 0x2d, 0x2b, 0x2b, 0x2b, + 0x2b, 0x7c, 0x3c, 0x3e, 0x6e, 0x3d, 0x66, 0x2e][c - 0x5f] + else: + return c + + else: + self._experimantal_warning(f"Character set not implemented: {charset}") + raise NotImplementedError("Unknown character set") + + def draw_screen(self, + returns: type=list, + stop: Optional[Callable[[AnsiInstruction], bool]]=None) -> list: + """Receive a screen + + Args: + returns: Either str or list + stop: Function to determine when to stop emulating instructions + """ + if stop is None: + # Default stop checker designed for ncurses games + stop = lambda instr: instr == AnsiOp.HTS + + # NOTE: These variables are global so that we can support + # successive draws in the future + self._width = self._height = 0 + + screen = {} + charset = AnsiOp.SCS_B + DEL = 0x20 # Empty + last_char = DEL + stop_recv = False + while not stop_recv: + instr = None + try: + while instr is None: + instr = self.parse_block() + except StopIteration: + break + + stop_recv = stop(instr) + + if isinstance(instr, bytes): + # TODO: Reverse order? + for c in instr: + screen[(self._x, self._y)] = self._special_char(charset, c) + self._x += 1 + last_char = c + + else: + if instr.is_skip: + continue + + elif instr == AnsiOp.SCS_B: # English mode + charset = AnsiOp.SCS_B + + elif instr == AnsiOp.BS: # Back space + self._x = max(0, self._x - 1) + stop_recv = True + + elif instr == AnsiOp.CHA: # Cursor character absolute + self._x = instr[0] - 1 + + elif instr == AnsiOp.SCS_0: # DEC special graphic + charset = AnsiOp.SCS_0 + + elif instr == AnsiOp.CR: # Carriage return + self._x, self._y = 0, self._y + 1 + + elif instr == AnsiOp.CUP: # Cursor position + self._x, self._y = instr[1] - 1, instr[0] - 1 + + elif instr == AnsiOp.ECH: # Erase character + for x in range(self._x, self._x + instr[0]): + screen[(x, self._y)] = DEL + + elif instr == AnsiOp.ED: # Erase in page + self._update_screen_size(screen) + if instr[0] == 0: + for y in range(self._y, self._height): + screen[(self._x, y)] = DEL + elif instr[0] == 1: + for y in range(self._y + 1): + screen[(self._x, y)] = DEL + elif instr[0] == 2: + for y in range(self._height): + screen[(self._x, y)] = DEL + + elif instr == AnsiOp.EL: # Erase in line + self._update_screen_size(screen) + if instr[0] == 0: + for x in range(self._x, self._width): + screen[(x, self._y)] = DEL + elif instr[0] == 1: + for x in range(self._x + 1): + screen[(x, self._y)] = DEL + elif instr[0] == 2: + for x in range(self._width): + screen[(x, self._y)] = DEL + + elif instr == AnsiOp.HTS: + self._x, self._y = 0, 0 + + elif instr == AnsiOp.LF: + self._x, self._y = 0, self._y + 1 + + elif instr == AnsiOp.REP: # Repeat + for x in range(self._x, self._x + instr[0]): + screen[(x, self._y)] = self._special_char(charset, last_char) + self._x += instr[0] + + elif instr == AnsiOp.RM: # Reset mode + pass # TODO: ? + + elif instr == AnsiOp.SM: # Set mode + pass # TODO: ? + + elif instr == AnsiOp.VPA: # Line position absolute + self._y = instr[0] - 1 + + else: + raise ValueError(f"Emulation not supported for instruction {instr}") + + self._update_screen_size(screen) + field = [[' ' for x in range(self._width)] + for y in range(self._height)] + for (x, y) in screen: + field[y][x] = chr(screen[(x, y)]) + + if returns == list: + return field + else: + return '\n'.join(map(lambda line: ''.join(line), field)) diff --git a/ptrlib/binary/encoding/locale.py b/ptrlib/binary/encoding/char.py similarity index 100% rename from ptrlib/binary/encoding/locale.py rename to ptrlib/binary/encoding/char.py diff --git a/ptrlib/connection/tube.py b/ptrlib/connection/tube.py index f64ad7e..36804f9 100644 --- a/ptrlib/connection/tube.py +++ b/ptrlib/connection/tube.py @@ -3,8 +3,8 @@ import sys import threading from logging import getLogger -from typing import List, Literal, Optional, Tuple, Union -from ptrlib.binary.encoding import bytes2str, str2bytes, bytes2hex, bytes2utf8, hexdump, draw_ansi +from typing import Callable, List, Literal, Optional, Tuple, Union +from ptrlib.binary.encoding import bytes2str, str2bytes, bytes2hex, bytes2utf8, hexdump, AnsiParser, AnsiInstruction from ptrlib.console.color import Color logger = getLogger(__name__) @@ -404,52 +404,38 @@ def recvregex(self, return match.group() def recvscreen(self, - delim: Optional[Union[str, bytes]]=b'\x1b[H', - returns: Optional[type]=str, - prev: Optional[Union[str, bytes, list]]=None, - timeout: Optional[Union[int, float]]=None): + returns: type=str, + stop: Optional[Callable[[AnsiInstruction], bool]]=None, + timeout: Union[int, float]=1.0): """Receive a screen Receive a screen drawn by ncurses (ANSI escape sequence) Args: - delim : Refresh sequence - returns : Return value as string or list - prev : Previous screen (Use when screen is partially updated) - timeout : Timeout until receiving the delimiter + returns: Either str or list + stop: Function to determine when to stop emulating instructions + timeout: Timeout until stopping recv Returns: str: Rectangle string drawing the screen - - Raises: - ConnectionAbortedError: Connection is aborted by process - ConnectionResetError: Connection is closed by peer - TimeoutError: Timeout exceeded - OSError: System error """ assert returns in [list, str, bytes], \ - "`returns` must be either list, str, or bytes" - assert prev is None or isinstance(prev, (str, bytes, list)), \ - "`prev` must be either list, str, or bytes" - - try: - self.recvuntil(delim, timeout=timeout) - except TimeoutError as err: - # NOTE: We do not set received value here - raise TimeoutError("Timeout (recvscreen)", b'') + "`returns` must be either list or str" - try: - buf = self.recvuntil(delim, drop=True, lookahead=True, timeout=timeout2) - except TimeoutError as err: - buf = err.args[1] + def _ansi_stream(self): + """Generator for recvscreen + """ + while True: + try: + yield self.recv(timeout=timeout) + except TimeoutError as e: + self.unget(e.args[1]) + break - screen = draw_ansi(buf) - if returns == str: - return '\n'.join(map(lambda row: ''.join(row), screen)) - elif returns == bytes: - return b'\n'.join(map(lambda row: bytes(row), screen)) - else: - return screen + ansi = AnsiParser(_ansi_stream(self)) + scr = ansi.draw_screen(returns, stop) + self.unget(ansi.buffer) + return scr def send(self, data: Union[str, bytes]) -> int: """Send raw data @@ -669,7 +655,7 @@ def thread_recv(flag: threading.Event): flag.set() except TimeoutError: - pass + pass # NOTE: We can ignore args since recv will never buffer except EOFError: logger.error("Receiver EOF") break