From e3d3d92bb06595f1a669f97be72caa906fe50cf5 Mon Sep 17 00:00:00 2001 From: Blueve <672454911@qq.com> Date: Wed, 4 Nov 2020 14:32:16 +0800 Subject: [PATCH] [consutil] Store console port state in STATE_DB (#1208) * [consutil] Store console port state in STATE_DB * Fix LGTM issues * Update a naming Signed-off-by: Jing Kan jika@microsoft.com --- consutil/lib.py | 425 +++++++++++++++++++++++++++++++----------- consutil/main.py | 122 +++++------- tests/console_test.py | 249 +++++++++++++++++++++++++ 3 files changed, 618 insertions(+), 178 deletions(-) diff --git a/consutil/lib.py b/consutil/lib.py index f80d1d6cab08..2afdac5f4825 100644 --- a/consutil/lib.py +++ b/consutil/lib.py @@ -9,127 +9,338 @@ import click import re import subprocess + import pexpect import sys import os - from swsssdk import ConfigDBConnector from sonic_py_common import device_info except ImportError as e: raise ImportError("%s - required module not found" % str(e)) -DEVICE_PREFIX = "/dev/ttyUSB" - -ERR_CMD = 1 -ERR_DEV = 2 -ERR_CFG = 3 +ERR_DISABLE = 1 +ERR_CMD = 2 +ERR_DEV = 3 +ERR_CFG = 4 +ERR_BUSY = 5 CONSOLE_PORT_TABLE = "CONSOLE_PORT" LINE_KEY = "LINE" +CUR_STATE_KEY = "CUR_STATE" + +# CONFIG_DB Keys BAUD_KEY = "baud_rate" DEVICE_KEY = "remote_device" FLOW_KEY = "flow_control" -DEFAULT_BAUD = "9600" + +# STATE_DB Keys +STATE_KEY = "state" +PID_KEY = "pid" +START_TIME_KEY = "state_time" + +BUSY_FLAG = "busy" +IDLE_FLAG = "idle" + +# picocom Constants +PICOCOM_READY = "Terminal ready" +PICOCOM_BUSY = "Resource temporarily unavailable" FILENAME = "udevprefix.conf" -# QUIET == True => picocom will not output any messages, and pexpect will wait for console -# switch login or command line to let user interact with shell -# Downside: if console switch output ever does not match DEV_READY_MSG, program will think connection failed -# QUIET == False => picocom will output messages - welcome message is caught by pexpect, so successful -# connection will always lead to user interacting with shell -# Downside: at end of session, picocom will print exit message, exposing picocom to user -QUIET = False -DEV_READY_MSG = r"([Ll]ogin:|[Pp]assword:|[$>#])" # login prompt or command line prompt TIMEOUT_SEC = 0.2 -platform_path, _ = device_info.get_paths_to_platform_and_hwsku_dirs() -PLUGIN_PATH = "/".join([platform_path, "plugins", FILENAME]) - -if os.path.exists(PLUGIN_PATH): - fp = open(PLUGIN_PATH, 'r') - line = fp.readlines() - DEVICE_PREFIX = "/dev/" + line[0] - -# runs command, exit if stderr is written to and abort argument is ture, returns stdout, stderr otherwise -# input: cmd (str, bool), output: output of cmd (str) and error of cmd (str) if abort is not true -def run_command(cmd, abort=True): - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - output = proc.stdout.read() - error = proc.stderr.read() - if abort and error != "": - click.echo("Command resulted in error: {}".format(error)) - sys.exit(ERR_CMD) - return output if abort else (output, error) - -# returns a list of all lines -def getAllLines(brief=False): - config_db = ConfigDBConnector() - config_db.connect() - - # Querying CONFIG_DB to get configured console ports - keys = config_db.get_keys(CONSOLE_PORT_TABLE) - lines = [] - for k in keys: - line = config_db.get_entry(CONSOLE_PORT_TABLE, k) - line[LINE_KEY] = k - lines.append(line) - - # Querying device directory to get all available console ports - if not brief: - cmd = "ls " + DEVICE_PREFIX + "*" - output, _ = run_command(cmd, abort=False) - availableTtys = output.split('\n') - availableTtys = list(filter(lambda dev: re.match(DEVICE_PREFIX + r"\d+", dev) != None, availableTtys)) - for tty in availableTtys: - k = tty[len(DEVICE_PREFIX):] - if k not in keys: - line = { LINE_KEY: k } - lines.append(line) - return lines - -# returns a dictionary of busy lines and their info -# maps line number to (pid, process start time) -def getBusyLines(): - cmd = 'ps -eo pid,lstart,cmd | grep -E "(mini|pico)com"' - output = run_command(cmd) - processes = output.split('\n') - - # matches any number of spaces then any number of digits - regexPid = r" *(\d+)" - # matches anything of form: Xxx Xxx ( 0)or(00) 00:00:00 0000 - regexDate = r"([A-Z][a-z]{2} [A-Z][a-z]{2} [\d ]\d \d{2}:\d{2}:\d{2} \d{4})" - # matches any non-whitespace characters ending in minicom or picocom, - # then a space and any chars followed by /dev/ttyUSB, - # then a space and any chars - regexCmd = r"\S*(?:(?:mini)|(?:pico))com .*" + DEVICE_PREFIX + r"(\d+)(?: .*)?" - regexProcess = re.compile(r"^"+regexPid+r" "+regexDate+r" "+regexCmd+r"$") - - busyLines = {} - for process in processes: - match = regexProcess.match(process) - if match != None: - pid = match.group(1) - date = match.group(2) - linenum_key = match.group(3) - busyLines[linenum_key] = (pid, date) - return busyLines - -# returns the target device corresponding to target, or None if line number connot be found -# if deviceBool, interprets target as device name -# otherwise interprets target as line number -# input: target (str), deviceBool (bool), output: device (dict) -def getLine(target, deviceBool=False): - lines = getAllLines() - - # figure out the search key - searchKey = LINE_KEY - if deviceBool: - searchKey = DEVICE_KEY - - # identify the line number by searching configuration - lineNumber = None - for line in lines: - if searchKey in line and line[searchKey] == target: - lineNumber = line[LINE_KEY] - targetLine = line - - return targetLine if lineNumber else None +class ConsolePortProvider(object): + """ + The console ports' provider. + The provider can let user to get console ports information. + """ + + def __init__(self, db, configured_only): + self._db = db + self._configured_only = configured_only + self._ports = [] + self._init_all() + + def get_all(self): + """Gets all console ports information""" + for port in self._ports: + yield ConsolePortInfo(self._db, port) + + def get(self, target, use_device=False): + """Gets information of a ports, the target is the line number by default""" + # figure out the search key + search_key = LINE_KEY + if use_device: + search_key = DEVICE_KEY + + # identify the line number by searching configuration + for port in self._ports: + if search_key in port and port[search_key] == target: + return ConsolePortInfo(self._db, port) + + raise LineNotFoundError + + def _init_all(self): + config_db = self._db.cfgdb + state_db = self._db.db + + # Querying CONFIG_DB to get configured console ports + keys = config_db.get_keys(CONSOLE_PORT_TABLE) + ports = [] + for k in keys: + port = config_db.get_entry(CONSOLE_PORT_TABLE, k) + port[LINE_KEY] = k + port[CUR_STATE_KEY] = state_db.get_all(state_db.STATE_DB, "{}|{}".format(CONSOLE_PORT_TABLE, k)) + ports.append(port) + + # Querying device directory to get all available console ports + if not self._configured_only: + available_ttys = SysInfoProvider.list_console_ttys() + for tty in available_ttys: + k = tty[len(SysInfoProvider.DEVICE_PREFIX):] + if k not in keys: + port = { LINE_KEY: k } + ports.append(port) + self._ports = ports + +class ConsolePortInfo(object): + def __init__(self, db, info): + self._db = db + self._info = info + self._session = None + + def __str__(self): + return "({}, {}, {})".format(self.line_num, self.baud, self.remote_device) + + @property + def line_num(self): + return self._info[LINE_KEY] + + @property + def baud(self): + return self._info[BAUD_KEY] if BAUD_KEY in self._info else None + + @property + def flow_control(self): + return FLOW_KEY in self._info and self._info[FLOW_KEY] == "1" + + @property + def remote_device(self): + return self._info[DEVICE_KEY] if DEVICE_KEY in self._info else None + + @property + def busy(self): + return STATE_KEY in self.cur_state and self.cur_state[STATE_KEY] == BUSY_FLAG + + @property + def session_pid(self): + return self.cur_state[PID_KEY] if PID_KEY in self.cur_state else None + + @property + def session_start_date(self): + return self.cur_state[START_TIME_KEY] if START_TIME_KEY in self.cur_state else None + + @property + def cur_state(self): + if CUR_STATE_KEY not in self._info or self._info[CUR_STATE_KEY] is None: + self._info[CUR_STATE_KEY] = {} + return self._info[CUR_STATE_KEY] + + def connect(self): + """Connect to current line""" + self.refresh() + + # check if line is busy + if self.busy: + raise LineBusyError + + # check required configuration + if self.baud is None: + raise InvalidConfigurationError("baud", "line [{}] has no baud rate".format(self.line_num)) + + # build and start picocom command + flow_cmd = "h" if self.flow_control else "n" + cmd = "sudo picocom -b {} -f {} {}{}".format(self.baud, flow_cmd, SysInfoProvider.DEVICE_PREFIX, self.line_num) + + # start connection + try: + proc = pexpect.spawn(cmd) + proc.send("\n") + self._session = ConsoleSession(self, proc) + finally: + self.refresh() + + # check if connection succeed + index = proc.expect([PICOCOM_READY, PICOCOM_BUSY, pexpect.EOF, pexpect.TIMEOUT], timeout=TIMEOUT_SEC) + if index == 0: + return self._session + elif index == 1: + self._session = None + raise LineBusyError + else: + self._session = None + raise ConnectionFailedError + + def clear_session(self): + """Clear existing session on current line, returns True if the line has been clear""" + self.refresh() + if not self.busy: + return False + + try: + if not self._session: + pid = self.session_pid + cmd = "sudo kill -SIGTERM " + pid + SysInfoProvider.run_command(cmd) + else: + self._session.close() + finally: + self.refresh() + self._session = None + + return True + + def refresh(self): + """Refresh state for current console port""" + if self._session is not None: + proc_info = SysInfoProvider.get_active_console_process_info(self._session.proc.pid) + if proc_info is not None: + line_num, pid, date = proc_info + if line_num != self.line_num: + # line mismatch which means the session is stale and shouldn't be use anymore + self._update_state(BUSY_FLAG, pid, date, line_num) + self._update_state(IDLE_FLAG, "", "") + raise ConnectionFailedError + else: + self._update_state(BUSY_FLAG, pid, date) + else: + self._update_state(IDLE_FLAG, "", "") + else: + # refresh all active ports' state because we already got newest state for all ports + busy_lines = SysInfoProvider.list_active_console_processes() + for line_num, proc_info in busy_lines.items(): + pid, date = proc_info + self._update_state(BUSY_FLAG, pid, date, line_num) + if self.line_num not in busy_lines: + self._update_state(IDLE_FLAG, "", "") + + def _update_state(self, state, pid, date, line_num=None): + state_db = self._db.db + line_key = "{}|{}".format(CONSOLE_PORT_TABLE, self.line_num if line_num is None else line_num) + state_db.set(state_db.STATE_DB, line_key, STATE_KEY, state) + state_db.set(state_db.STATE_DB, line_key, PID_KEY, pid) + state_db.set(state_db.STATE_DB, line_key, START_TIME_KEY, date) + self._info[CUR_STATE_KEY] = {} if CUR_STATE_KEY not in self._info else self._info[CUR_STATE_KEY] + self._info[CUR_STATE_KEY][STATE_KEY] = state + self._info[CUR_STATE_KEY][PID_KEY] = pid + self._info[CUR_STATE_KEY][START_TIME_KEY] = date + +class ConsoleSession(object): + """ + The Console connection session. + """ + + def __init__(self, port, proc): + self.port = port + self.proc = proc + + def interact(self): + """Interact with picocom""" + try: + self.proc.interact() + finally: + self.port.refresh() + + def close(self): + """Close picocom session""" + self.proc.close(force=True) + +class SysInfoProvider(object): + """ + The system level information provider. + """ + DEVICE_PREFIX = "/dev/ttyUSB" + + @staticmethod + def init_device_prefix(): + platform_path, _ = device_info.get_paths_to_platform_and_hwsku_dirs() + PLUGIN_PATH = "/".join([platform_path, "plugins", FILENAME]) + + if os.path.exists(PLUGIN_PATH): + fp = open(PLUGIN_PATH, 'r') + line = fp.readlines() + SysInfoProvider.DEVICE_PREFIX = "/dev/" + line[0] + + @staticmethod + def list_console_ttys(): + """Lists all console tty devices""" + cmd = "ls " + SysInfoProvider.DEVICE_PREFIX + "*" + output, _ = SysInfoProvider.run_command(cmd, abort=False) + ttys = output.split('\n') + ttys = list(filter(lambda dev: re.match(SysInfoProvider.DEVICE_PREFIX + r"\d+", dev) != None, ttys)) + return ttys + + @staticmethod + def list_active_console_processes(): + """Lists all active console session processes""" + cmd = 'ps -eo pid,lstart,cmd | grep -E "(mini|pico)com"' + output = SysInfoProvider.run_command(cmd) + return SysInfoProvider._parse_processes_info(output) + + @staticmethod + def get_active_console_process_info(pid): + """Gets active console process information by PID""" + cmd = 'ps -p {} -o pid,lstart,cmd | grep -E "(mini|pico)com"'.format(pid) + output = SysInfoProvider.run_command(cmd) + processes = SysInfoProvider._parse_processes_info(output) + if len(processes.keys()) == 1: + return (processes.keys()[0],) + list(processes.values())[0] + else: + return None + + @staticmethod + def _parse_processes_info(output): + processes = output.split('\n') + + # matches any number of spaces then any number of digits + regex_pid = r" *(\d+)" + # matches anything of form: Xxx Xxx ( 0)or(00) 00:00:00 0000 + regex_date = r"([A-Z][a-z]{2} [A-Z][a-z]{2} [\d ]\d \d{2}:\d{2}:\d{2} \d{4})" + # matches any characters ending in minicom or picocom, + # then a space and any chars followed by /dev/ttyUSB, + # then a space and any chars + regex_cmd = r".*(?:(?:mini)|(?:pico))com .*" + SysInfoProvider.DEVICE_PREFIX + r"(\d+)(?: .*)?" + regex_process = re.compile(r"^" + regex_pid + r" " + regex_date + r" " + regex_cmd + r"$") + + console_processes = {} + for process in processes: + match = regex_process.match(process) + if match != None: + pid = match.group(1) + date = match.group(2) + line_num = match.group(3) + console_processes[line_num] = (pid, date) + return console_processes + + @staticmethod + def run_command(cmd, abort=True): + """runs command, exit if stderr is written to and abort argument is ture, returns stdout, stderr otherwise""" + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + output = proc.stdout.read() + error = proc.stderr.read() + if abort and error != "": + click.echo("Command resulted in error: {}".format(error)) + sys.exit(ERR_CMD) + return output if abort else (output, error) + +class InvalidConfigurationError(Exception): + def __init__(self, config_key, message): + self.config_key = config_key + self.message = message + +class LineBusyError(Exception): + pass + +class LineNotFoundError(Exception): + pass + +class ConnectionFailedError(Exception): + pass diff --git a/consutil/main.py b/consutil/main.py index 089330c92582..04753e00e08a 100644 --- a/consutil/main.py +++ b/consutil/main.py @@ -8,8 +8,9 @@ try: import click import os - import pexpect import sys + import utilities_common.cli as clicommon + from tabulate import tabulate from lib import * except ImportError as e: @@ -18,107 +19,86 @@ @click.group() def consutil(): """consutil - Command-line utility for interacting with switches via console device""" - if os.geteuid() != 0: click.echo("Root privileges are required for this operation") sys.exit(ERR_CMD) + SysInfoProvider.init_device_prefix() # 'show' subcommand @consutil.command() +@clicommon.pass_db @click.option('--brief', '-b', metavar='', required=False, is_flag=True) -def show(brief): - """Show all lines and their info include available ttyUSB devices unless specified brief mode""" - lines = getAllLines(brief) - busyLines = getBusyLines() +def show(db, brief): + """Show all ports and their info include available ttyUSB devices unless specified brief mode""" + port_provider = ConsolePortProvider(db, brief) + ports = list(port_provider.get_all()) - # sort lines for table rendering - lines.sort(key=lambda dev: int(dev[LINE_KEY])) + # sort ports for table rendering + ports.sort(key=lambda p: int(p.line_num)) # set table header style - header = ["Line", "Baud", "PID", "Start Time", "Device"] + header = ["Line", "Baud", "PID", "Start Time", "Device"] body = [] - for line in lines: - # configured information - lineNum = line[LINE_KEY] - baud = '-' if BAUD_KEY not in line else line[BAUD_KEY] - remoteDevice = '-' if DEVICE_KEY not in line else line[DEVICE_KEY] - + for port in ports: # runtime information - busy = " " - pid = "" - date = "" - if lineNum in busyLines: - pid, date = busyLines[lineNum] - busy = "*" - body.append([busy+lineNum, baud, pid, date, remoteDevice]) - click.echo(tabulate(body, header, stralign='right')) + busy = "*" if port.busy else " " + pid = port.session_pid if port.session_pid else "-" + date = port.session_start_date if port.session_start_date else "-" + baud = port.baud + body.append([busy+port.line_num, baud if baud else "-", pid if pid else "-", date if date else "-", port.remote_device]) + click.echo(tabulate(body, header, stralign='right')) # 'clear' subcommand @consutil.command() +@clicommon.pass_db @click.argument('target') -def clear(target): +@click.option('--devicename', '-d', is_flag=True, help="clear by name - if flag is set, interpret linenum as device name instead") +def clear(db, target, devicename): """Clear preexisting connection to line""" - targetLine = getLine(target) - if not targetLine: + # identify the target line + port_provider = ConsolePortProvider(db, configured_only=False) + try: + target_port = port_provider.get(target, use_device=devicename) + except LineNotFoundError: click.echo("Target [{}] does not exist".format(target)) sys.exit(ERR_DEV) - lineNumber = targetLine[LINE_KEY] - busyLines = getBusyLines() - if lineNumber in busyLines: - pid, _ = busyLines[lineNumber] - cmd = "sudo kill -SIGTERM " + pid - click.echo("Sending SIGTERM to process " + pid) - run_command(cmd) - else: - click.echo("No process is connected to line " + lineNumber) + if not target_port.clear_session(): + click.echo("No process is connected to line " + target_port.line_num) # 'connect' subcommand @consutil.command() +@clicommon.pass_db @click.argument('target') @click.option('--devicename', '-d', is_flag=True, help="connect by name - if flag is set, interpret linenum as device name instead") -def connect(target, devicename): +def connect(db, target, devicename): """Connect to switch via console device - TARGET is line number or device name of switch""" # identify the target line - targetLine = getLine(target, devicename) - if not targetLine: + port_provider = ConsolePortProvider(db, configured_only=False) + try: + target_port = port_provider.get(target, use_device=devicename) + except LineNotFoundError: click.echo("Cannot connect: target [{}] does not exist".format(target)) sys.exit(ERR_DEV) - lineNumber = targetLine[LINE_KEY] - # build and start picocom command - if BAUD_KEY in targetLine: - baud = targetLine[BAUD_KEY] - else: - click.echo("Cannot connect: line [{}] has no baud rate".format(lineNumber)) - sys.exit(ERR_CFG) - flowBool = True if FLOW_KEY in targetLine and targetLine[FLOW_KEY] == "1" else False - flowCmd = "h" if flowBool else "n" - quietCmd = "-q" if QUIET else "" - cmd = "sudo picocom -b {} -f {} {} {}{}".format(baud, flowCmd, quietCmd, DEVICE_PREFIX, lineNumber) - proc = pexpect.spawn(cmd) - proc.send("\n") + line_num = target_port.line_num - if QUIET: - readyMsg = DEV_READY_MSG - else: - readyMsg = "Terminal ready" # picocom ready message - busyMsg = "Resource temporarily unavailable" # picocom busy message - - # interact with picocom or print error message, depending on pexpect output - index = proc.expect([readyMsg, busyMsg, pexpect.EOF, pexpect.TIMEOUT], timeout=TIMEOUT_SEC) - if index == 0: # terminal ready - click.echo("Successful connection to line {}\nPress ^A ^X to disconnect".format(lineNumber)) - if QUIET: - # prints picocom output up to and including readyMsg - click.echo(proc.before + proc.match.group(0), nl=False) - proc.interact() - if QUIET: - click.echo("\nTerminating...") - elif index == 1: # resource is busy - click.echo("Cannot connect: line {} is busy".format(lineNumber)) - else: # process reached EOF or timed out + # connect + try: + session = target_port.connect() + except LineBusyError: + click.echo("Cannot connect: line {} is busy".format(line_num)) + sys.exit(ERR_BUSY) + except InvalidConfigurationError as cfg_err: + click.echo("Cannot connect: {}".format(cfg_err.message)) + sys.exit(ERR_CFG) + except ConnectionFailedError: click.echo("Cannot connect: unable to open picocom process") + sys.exit(ERR_DEV) + + # interact + click.echo("Successful connection to line {}\nPress ^A ^X to disconnect".format(line_num)) + session.interact() if __name__ == '__main__': - consutil() \ No newline at end of file + consutil() diff --git a/tests/console_test.py b/tests/console_test.py index e8e088c6dd35..30093ce37f93 100644 --- a/tests/console_test.py +++ b/tests/console_test.py @@ -1,5 +1,9 @@ import os import sys +import subprocess +import pexpect + +import mock import pytest import config.main as config @@ -7,6 +11,8 @@ from click.testing import CliRunner from utilities_common.db import Db +from consutil.lib import * +from sonic_py_common import device_info class TestConfigConsoleCommands(object): @classmethod @@ -214,3 +220,246 @@ def test_update_console_flow_control_success(self): print(result.exit_code) print(sys.stderr, result.output) assert result.exit_code == 0 + +class TestConsutilLib(object): + @classmethod + def setup_class(cls): + print("SETUP") + + def test_console_port_provider_get_all_configured_only_empty(self): + db = Db() + provider = ConsolePortProvider(db, configured_only=True) + assert len(list(provider.get_all())) == 0 + + def test_console_port_provider_get_all_configured_only_nonempty(self): + db = Db() + db.cfgdb.set_entry("CONSOLE_PORT", "1", { "baud_rate" : "9600" }) + + provider = ConsolePortProvider(db, configured_only=True) + assert len(list(provider.get_all())) == 1 + + @mock.patch('consutil.lib.SysInfoProvider.list_console_ttys', mock.MagicMock(return_value=["/dev/ttyUSB0", "/dev/ttyUSB1"])) + def test_console_port_provider_get_all_with_ttys(self): + db = Db() + db.cfgdb.set_entry("CONSOLE_PORT", "1", { "baud_rate" : "9600" }) + + provider = ConsolePortProvider(db, configured_only=False) + ports = list(provider.get_all()) + print('[{}]'.format(', '.join(map(str, ports)))) + assert len(ports) == 2 + + def test_console_port_provider_get_line_success(self): + db = Db() + db.cfgdb.set_entry("CONSOLE_PORT", "1", { "baud_rate" : "9600" }) + + provider = ConsolePortProvider(db, configured_only=True) + port = provider.get("1") + assert port is not None + assert port.line_num == "1" + + def test_console_port_provider_get_line_not_found(self): + with pytest.raises(LineNotFoundError): + db = Db() + provider = ConsolePortProvider(db, configured_only=True) + provider.get("1") + + def test_console_port_provider_get_line_by_device_success(self): + db = Db() + db.cfgdb.set_entry("CONSOLE_PORT", 2, { "remote_device" : "switch2" }) + + provider = ConsolePortProvider(db, configured_only=True) + port = provider.get("switch2", use_device=True) + assert port is not None + assert port.line_num == "2" + + def test_console_port_provider_get_line_by_device_not_found(self): + with pytest.raises(LineNotFoundError): + db = Db() + db.cfgdb.set_entry("CONSOLE_PORT", 2, { "remote_device" : "switch2" }) + + provider = ConsolePortProvider(db, configured_only=True) + provider.get("switch1") + + @mock.patch('consutil.lib.SysInfoProvider.list_active_console_processes', mock.MagicMock(return_value={ "1" : ("223", "2020/11/2")})) + def test_console_port_info_refresh_without_session(self): + db = Db() + + port = ConsolePortInfo(db, { "LINE" : "1" }) + port.refresh() + assert port.busy + assert port.session_pid == "223" + assert port.session_start_date == "2020/11/2" + + @mock.patch('consutil.lib.SysInfoProvider.list_active_console_processes', mock.MagicMock(return_value={ "2" : ("223", "2020/11/2")})) + def test_console_port_info_refresh_without_session_idle(self): + db = Db() + + port = ConsolePortInfo(db, { "LINE" : "1" }) + port.refresh() + assert port.busy == False + + @mock.patch('consutil.lib.SysInfoProvider.get_active_console_process_info', mock.MagicMock(return_value=("1", "223", "2020/11/2"))) + def test_console_port_info_refresh_with_session(self): + db = Db() + + port = ConsolePortInfo(db, { "LINE" : "1" }) + port._session = ConsoleSession(port, mock.MagicMock(pid="223")) + print(port) + + port.refresh() + assert port.busy == True + assert port.session_pid == "223" + assert port.session_start_date == "2020/11/2" + + @mock.patch('consutil.lib.SysInfoProvider.get_active_console_process_info', mock.MagicMock(return_value=("2", "223", "2020/11/2"))) + def test_console_port_info_refresh_with_session_line_mismatch(self): + db = Db() + + port = ConsolePortInfo(db, { "LINE" : "1" }) + port._session = ConsoleSession(port, mock.MagicMock(pid="223")) + print(port) + + with pytest.raises(ConnectionFailedError): + port.refresh() + + assert port.busy == False + + @mock.patch('consutil.lib.SysInfoProvider.get_active_console_process_info', mock.MagicMock(return_value=None)) + def test_console_port_info_refresh_with_session_process_ended(self): + db = Db() + + port = ConsolePortInfo(db, { "LINE" : "1" }) + port._session = ConsoleSession(port, mock.MagicMock(pid="223")) + print(port) + + port.refresh() + assert port.busy == False + + def test_console_port_info_connect_state_busy(self): + db = Db() + port = ConsolePortInfo(db, { "LINE" : "1", "CUR_STATE" : { "state" : "busy" } }) + + port.refresh = mock.MagicMock(return_value=None) + with pytest.raises(LineBusyError): + port.connect() + + def test_console_port_info_connect_invalid_config(self): + db = Db() + port = ConsolePortInfo(db, { "LINE" : "1", "CUR_STATE" : { "state" : "idle" } }) + + port.refresh = mock.MagicMock(return_value=None) + with pytest.raises(InvalidConfigurationError): + port.connect() + + def test_console_port_info_connect_device_busy(self): + db = Db() + port = ConsolePortInfo(db, { "LINE" : "1", "baud_rate" : "9600", "CUR_STATE" : { "state" : "idle" } }) + + port.refresh = mock.MagicMock(return_value=None) + mock_proc = mock.MagicMock(spec=subprocess.Popen) + mock_proc.send = mock.MagicMock(return_value=None) + mock_proc.expect = mock.MagicMock(return_value=1) + with mock.patch('pexpect.spawn', mock.MagicMock(return_value=mock_proc)): + with pytest.raises(LineBusyError): + port.connect() + + def test_console_port_info_connect_connection_fail(self): + db = Db() + port = ConsolePortInfo(db, { "LINE" : "1", "baud_rate" : "9600", "CUR_STATE" : { "state" : "idle" } }) + + port.refresh = mock.MagicMock(return_value=None) + mock_proc = mock.MagicMock(spec=subprocess.Popen) + mock_proc.send = mock.MagicMock(return_value=None) + mock_proc.expect = mock.MagicMock(return_value=2) + with mock.patch('pexpect.spawn', mock.MagicMock(return_value=mock_proc)): + with pytest.raises(ConnectionFailedError): + port.connect() + + def test_console_port_info_connect_success(self): + db = Db() + port = ConsolePortInfo(db, { "LINE" : "1", "baud_rate" : "9600", "CUR_STATE" : { "state" : "idle" } }) + + port.refresh = mock.MagicMock(return_value=None) + mock_proc = mock.MagicMock(spec=subprocess.Popen, pid="223") + mock_proc.send = mock.MagicMock(return_value=None) + mock_proc.expect = mock.MagicMock(return_value=0) + with mock.patch('pexpect.spawn', mock.MagicMock(return_value=mock_proc)): + session = port.connect() + assert session.proc.pid == "223" + assert session.port.line_num == "1" + + def test_console_port_info_clear_session_line_not_busy(self): + db = Db() + port = ConsolePortInfo(db, { "LINE" : "1", "baud_rate" : "9600", "CUR_STATE" : { "state" : "idle" } }) + + port.refresh = mock.MagicMock(return_value=None) + assert not port.clear_session() + + @mock.patch('consutil.lib.SysInfoProvider.run_command', mock.MagicMock(return_value=None)) + def test_console_port_info_clear_session_with_state_db(self): + db = Db() + port = ConsolePortInfo(db, { "LINE" : "1", "baud_rate" : "9600", "CUR_STATE" : { "state" : "busy", "pid" : "223" } }) + + port.refresh = mock.MagicMock(return_value=None) + assert port.clear_session() + + def test_console_port_info_clear_session_with_existing_session(self): + db = Db() + port = ConsolePortInfo(db, { "LINE" : "1", "baud_rate" : "9600", "CUR_STATE" : { "state" : "busy" } }) + port._session = ConsoleSession(port, None) + port._session.close = mock.MagicMock(return_value=None) + port.refresh = mock.MagicMock(return_value=None) + assert port.clear_session() + + @mock.patch('sonic_py_common.device_info.get_paths_to_platform_and_hwsku_dirs', mock.MagicMock(return_value=("dummy_path", None))) + @mock.patch('os.path.exists', mock.MagicMock(return_value=False)) + def test_sys_info_provider_init_device_prefix_plugin_nonexists(self): + SysInfoProvider.init_device_prefix() + assert SysInfoProvider.DEVICE_PREFIX == "/dev/ttyUSB" + + @mock.patch('sonic_py_common.device_info.get_paths_to_platform_and_hwsku_dirs', mock.MagicMock(return_value=("dummy_path", None))) + @mock.patch('os.path.exists', mock.MagicMock(return_value=True)) + def test_sys_info_provider_init_device_prefix_plugin(self): + with mock.patch("__builtin__.open", mock.mock_open(read_data="C0-")): + SysInfoProvider.init_device_prefix() + assert SysInfoProvider.DEVICE_PREFIX == "/dev/C0-" + SysInfoProvider.DEVICE_PREFIX = "/dev/ttyUSB" + + @mock.patch('consutil.lib.SysInfoProvider.run_command', mock.MagicMock(return_value=("/dev/ttyUSB0\n/dev/ttyACM1", ""))) + def test_sys_info_provider_list_console_ttys(self): + SysInfoProvider.DEVICE_PREFIX == "/dev/ttyUSB" + ttys = SysInfoProvider.list_console_ttys() + print(SysInfoProvider.DEVICE_PREFIX) + assert len(ttys) == 1 + + @mock.patch('consutil.lib.SysInfoProvider.run_command', mock.MagicMock(return_value=("", "ls: cannot access '/dev/ttyUSB*': No such file or directory"))) + def test_sys_info_provider_list_console_ttys_device_not_exists(self): + ttys = SysInfoProvider.list_console_ttys() + assert len(ttys) == 0 + + all_active_processes_output = ''+ \ + """ PID STARTED CMD + 8 Mon Nov 2 04:29:41 2020 picocom /dev/ttyUSB0 + """ + @mock.patch('consutil.lib.SysInfoProvider.run_command', mock.MagicMock(return_value=all_active_processes_output)) + def test_sys_info_provider_list_active_console_processes(self): + SysInfoProvider.DEVICE_PREFIX == "/dev/ttyUSB" + procs = SysInfoProvider.list_active_console_processes() + assert len(procs.keys()) == 1 + assert "0" in procs.keys() + assert procs["0"] == ("8", "Mon Nov 2 04:29:41 2020") + + active_process_output = "13751 Wed Mar 6 08:31:35 2019 /usr/bin/sudo picocom -b 9600 -f n /dev/ttyUSB1" + @mock.patch('consutil.lib.SysInfoProvider.run_command', mock.MagicMock(return_value=active_process_output)) + def test_sys_info_provider_get_active_console_process_info_exists(self): + SysInfoProvider.DEVICE_PREFIX == "/dev/ttyUSB" + proc = SysInfoProvider.get_active_console_process_info("13751") + assert proc is not None + assert proc == ("1", "13751", "Wed Mar 6 08:31:35 2019") + + active_process_empty_output = "" + @mock.patch('consutil.lib.SysInfoProvider.run_command', mock.MagicMock(return_value=active_process_empty_output)) + def test_sys_info_provider_get_active_console_process_info_nonexists(self): + SysInfoProvider.DEVICE_PREFIX == "/dev/ttyUSB" + proc = SysInfoProvider.get_active_console_process_info("2") + assert proc is None