diff --git a/dmoj/cli.py b/dmoj/cli.py index a1dceaac7..a704a65c6 100644 --- a/dmoj/cli.py +++ b/dmoj/cli.py @@ -2,8 +2,9 @@ import readline import shlex import sys -from typing import cast +from typing import List, cast +from dmoj.commands.base_command import GradedSubmission from dmoj.error import InvalidCommandException from dmoj.judge import Judge from dmoj.packet import PacketManager @@ -61,6 +62,8 @@ def close(self): class LocalJudge(Judge): + graded_submissions: List[GradedSubmission] + def __init__(self): super().__init__(cast(PacketManager, LocalPacketManager(self))) self.submission_id_counter = 0 diff --git a/dmoj/commands/base_command.py b/dmoj/commands/base_command.py index 2504b6365..68ba9f81e 100644 --- a/dmoj/commands/base_command.py +++ b/dmoj/commands/base_command.py @@ -4,19 +4,24 @@ import sys import tempfile from collections import OrderedDict -from typing import Dict +from typing import Dict, NoReturn, Optional, TYPE_CHECKING, Tuple from dmoj.error import InvalidCommandException from dmoj.executors import executors from dmoj.utils.ansi import print_ansi +if TYPE_CHECKING: + from dmoj.cli import LocalJudge + +GradedSubmission = Tuple[str, str, str, float, int] + class CommandArgumentParser(argparse.ArgumentParser): - def error(self, message): + def error(self, message: str) -> NoReturn: self.print_usage(sys.stderr) raise InvalidCommandException - def exit(self, status=0, message=None): + def exit(self, status: int = 0, message: Optional[str] = None) -> NoReturn: if message: self._print_message(message, sys.stderr) raise InvalidCommandException @@ -26,19 +31,19 @@ class Command: name = 'command' help = '' - def __init__(self, judge): + def __init__(self, judge: 'LocalJudge') -> None: self.judge = judge self.arg_parser = CommandArgumentParser(prog=self.name, description=self.help) self._populate_parser() - def get_source(self, source_file): + def get_source(self, source_file: str) -> str: try: with open(os.path.realpath(source_file)) as f: return f.read() except Exception as io: raise InvalidCommandException(str(io)) - def get_submission_data(self, submission_id): + def get_submission_data(self, submission_id: int) -> GradedSubmission: # don't wrap around if submission_id > 0: try: @@ -46,13 +51,13 @@ def get_submission_data(self, submission_id): except IndexError: pass - raise InvalidCommandException("invalid submission '%d'" % submission_id) + raise InvalidCommandException(f"invalid submission '{submission_id}'") - def open_editor(self, lang, src=b''): + def open_editor(self, lang: str, src: str = '') -> str: file_suffix = '.' + executors[lang].Executor.ext editor = os.environ.get('EDITOR') if editor: - with tempfile.NamedTemporaryFile(suffix=file_suffix) as temp: + with tempfile.NamedTemporaryFile(mode='w+', suffix=file_suffix) as temp: temp.write(src) temp.flush() subprocess.call([editor, temp.name]) @@ -60,28 +65,28 @@ def open_editor(self, lang, src=b''): src = temp.read() else: print_ansi('#ansi[$EDITOR not set, falling back to stdin](yellow)\n') - src = [] + lines = [] try: while True: s = input() if s.strip() == ':q': raise EOFError - src.append(s) + lines.append(s) except EOFError: # Ctrl+D - src = '\n'.join(src) + src = '\n'.join(lines) except Exception as io: raise InvalidCommandException(str(io)) return src - def _populate_parser(self): + def _populate_parser(self) -> None: pass - def execute(self, line): + def execute(self, line: str) -> Optional[int]: raise NotImplementedError commands: Dict[str, Command] = OrderedDict() -def register_command(command): +def register_command(command: Command) -> None: commands[command.name] = command diff --git a/dmoj/commands/diff.py b/dmoj/commands/diff.py index f848771a7..09cfe9880 100644 --- a/dmoj/commands/diff.py +++ b/dmoj/commands/diff.py @@ -1,4 +1,5 @@ import difflib +from typing import List import pygments.formatters import pygments.lexers @@ -10,11 +11,11 @@ class DifferenceCommand(Command): name = 'diff' help = 'Shows difference between two files.' - def _populate_parser(self): + def _populate_parser(self) -> None: self.arg_parser.add_argument('id_or_source_1', help='id or path of first source', metavar='') self.arg_parser.add_argument('id_or_source_2', help='id or path of second source', metavar='') - def get_data(self, id_or_source): + def get_data(self, id_or_source: str) -> List[str]: try: _, _, src, _, _ = self.get_submission_data(int(id_or_source)) except ValueError: @@ -22,11 +23,11 @@ def get_data(self, id_or_source): return src.splitlines() - def execute(self, line): + def execute(self, line: str) -> None: args = self.arg_parser.parse_args(line) - file1 = args.id_or_source_1 - file2 = args.id_or_source_2 + file1: str = args.id_or_source_1 + file2: str = args.id_or_source_2 data1 = self.get_data(file1) data2 = self.get_data(file2) diff --git a/dmoj/commands/help.py b/dmoj/commands/help.py index 7487a3fee..215c435b4 100644 --- a/dmoj/commands/help.py +++ b/dmoj/commands/help.py @@ -5,10 +5,10 @@ class HelpCommand(Command): name = 'help' help = 'Prints listing of commands.' - def execute(self, line): + def execute(self, line: str) -> None: print('Run `command -h/--help` for individual command usage.') for name, command in commands.items(): if command == self: continue - print(' %s: %s' % (name, command.help)) + print(f' {name}: {command.help}') print() diff --git a/dmoj/commands/problems.py b/dmoj/commands/problems.py index 102e8ed6e..676380fe8 100644 --- a/dmoj/commands/problems.py +++ b/dmoj/commands/problems.py @@ -10,11 +10,11 @@ class ListProblemsCommand(Command): name = 'problems' help = 'Lists the problems available to be graded on this judge.' - def _populate_parser(self): + def _populate_parser(self) -> None: self.arg_parser.add_argument('filter', nargs='?', help='regex filter for problem names (optional)') self.arg_parser.add_argument('-l', '--limit', type=int, help='limit number of results', metavar='') - def execute(self, line): + def execute(self, line: str) -> None: _args = self.arg_parser.parse_args(line) if _args.limit is not None and _args.limit <= 0: @@ -32,7 +32,7 @@ def execute(self, line): if len(all_problems): max_len = max(len(p) for p in all_problems) for row in zip_longest(*[iter(all_problems)] * 4, fillvalue=''): - print(' '.join(('%*s' % (-max_len, row[i])) for i in range(4))) + print(' '.join(f'{row[i]:<{max_len}}' for i in range(4))) print() else: raise InvalidCommandException('No problems matching filter found.') diff --git a/dmoj/commands/quit.py b/dmoj/commands/quit.py index 800e46eca..fb978e31b 100644 --- a/dmoj/commands/quit.py +++ b/dmoj/commands/quit.py @@ -7,5 +7,5 @@ class QuitCommand(Command): name = 'quit' help = 'Exits the DMOJ command-line interface.' - def execute(self, line): + def execute(self, line: str) -> None: sys.exit(0) diff --git a/dmoj/commands/rejudge.py b/dmoj/commands/rejudge.py index 1be269a43..7e47c7710 100644 --- a/dmoj/commands/rejudge.py +++ b/dmoj/commands/rejudge.py @@ -6,10 +6,10 @@ class RejudgeCommand(Command): name = 'rejudge' help = 'Rejudge a submission.' - def _populate_parser(self): + def _populate_parser(self) -> None: self.arg_parser.add_argument('submission_id', type=int, help='id of submission to rejudge') - def execute(self, line): + def execute(self, line: str) -> None: args = self.arg_parser.parse_args(line) problem_id, lang, src, tl, ml = self.get_submission_data(args.submission_id) diff --git a/dmoj/commands/resubmit.py b/dmoj/commands/resubmit.py index 279f21a9c..32e6d97f7 100644 --- a/dmoj/commands/resubmit.py +++ b/dmoj/commands/resubmit.py @@ -9,7 +9,7 @@ class ResubmitCommand(Command): name = 'resubmit' help = 'Resubmit a submission with different parameters.' - def _populate_parser(self): + def _populate_parser(self) -> None: self.arg_parser.add_argument('submission_id', type=int, help='id of submission to resubmit') self.arg_parser.add_argument('-p', '--problem', help='id of problem to grade', metavar='') self.arg_parser.add_argument( @@ -22,7 +22,7 @@ def _populate_parser(self): '-ml', '--memory-limit', type=int, help='memory limit for grading, in kilobytes', metavar='' ) - def execute(self, line): + def execute(self, line: str) -> None: args = self.arg_parser.parse_args(line) problem_id, lang, src, tl, ml = self.get_submission_data(args.submission_id) @@ -33,9 +33,9 @@ def execute(self, line): ml = args.memory_limit or ml if id not in judgeenv.get_supported_problems(): - raise InvalidCommandException("unknown problem '%s'" % problem_id) + raise InvalidCommandException(f"unknown problem '{problem_id}'") elif lang not in executors: - raise InvalidCommandException("unknown language '%s'" % lang) + raise InvalidCommandException(f"unknown language '{lang}'") elif tl <= 0: raise InvalidCommandException('--time-limit must be >= 0') elif ml <= 0: diff --git a/dmoj/commands/show.py b/dmoj/commands/show.py index 4177ae8f1..551f80a4c 100644 --- a/dmoj/commands/show.py +++ b/dmoj/commands/show.py @@ -1,5 +1,8 @@ +from typing import Tuple + import pygments.formatters import pygments.lexers +from pygments.lexer import Lexer from dmoj.commands.base_command import Command @@ -8,17 +11,17 @@ class ShowCommand(Command): name = 'show' help = 'Shows file based on submission ID or filename.' - def _populate_parser(self): + def _populate_parser(self) -> None: self.arg_parser.add_argument('id_or_source', help='id or path of submission to show', metavar='') - def get_data(self, id_or_source): + def get_data(self, id_or_source: str) -> Tuple[str, Lexer]: try: - id = int(id_or_source) + sub_id = int(id_or_source) except ValueError: src = self.get_source(id_or_source) lexer = pygments.lexers.get_lexer_for_filename(id_or_source) else: - _, lang, src, _, _ = self.get_submission_data(id) + _, lang, src, _, _ = self.get_submission_data(sub_id) # TODO: after executor->extension mapping is built-in to the judge, redo this if lang in ['PY2', 'PYPY2']: @@ -30,7 +33,7 @@ def get_data(self, id_or_source): return src, lexer - def execute(self, line): + def execute(self, line: str) -> None: args = self.arg_parser.parse_args(line) data, lexer = self.get_data(args.id_or_source) diff --git a/dmoj/commands/submissions.py b/dmoj/commands/submissions.py index e677f773d..d1129b1dc 100644 --- a/dmoj/commands/submissions.py +++ b/dmoj/commands/submissions.py @@ -7,12 +7,12 @@ class ListSubmissionsCommand(Command): name = 'submissions' help = 'List past submissions.' - def _populate_parser(self): + def _populate_parser(self) -> None: self.arg_parser.add_argument( '-l', '--limit', type=int, help='limit number of results by most recent', metavar='' ) - def execute(self, line): + def execute(self, line: str) -> None: args = self.arg_parser.parse_args(line) if args.limit is not None and args.limit <= 0: @@ -21,5 +21,5 @@ def execute(self, line): submissions = self.judge.graded_submissions if not args.limit else self.judge.graded_submissions[: args.limit] for i, (problem, lang, src, tl, ml) in enumerate(submissions): - print_ansi('#ansi[%s](yellow)/#ansi[%s](green) in %s' % (problem, i + 1, lang)) + print_ansi(f'#ansi[{problem}](yellow)/#ansi[{i + 1}](green) in {lang}') print() diff --git a/dmoj/commands/submit.py b/dmoj/commands/submit.py index f8460db1a..105abc58e 100644 --- a/dmoj/commands/submit.py +++ b/dmoj/commands/submit.py @@ -1,3 +1,5 @@ +from typing import Optional + from dmoj import judgeenv from dmoj.commands.base_command import Command from dmoj.error import InvalidCommandException @@ -9,7 +11,7 @@ class SubmitCommand(Command): name = 'submit' help = 'Grades a submission.' - def _populate_parser(self): + def _populate_parser(self) -> None: self.arg_parser.add_argument('problem_id', help='id of problem to grade') self.arg_parser.add_argument( 'language_id', nargs='?', default=None, help='id of the language to grade in (e.g., PY2)' @@ -34,21 +36,21 @@ def _populate_parser(self): metavar='', ) - def execute(self, line): + def execute(self, line: str) -> None: args = self.arg_parser.parse_args(line) - problem_id = args.problem_id - language_id = args.language_id - time_limit = args.time_limit - memory_limit = args.memory_limit - source_file = args.source_file + problem_id: str = args.problem_id + language_id: Optional[str] = args.language_id + time_limit: float = args.time_limit + memory_limit: int = args.memory_limit + source_file: Optional[str] = args.source_file if language_id not in executors: source_file = language_id language_id = None # source file / language id optional if problem_id not in judgeenv.get_supported_problems(): - raise InvalidCommandException("unknown problem '%s'" % problem_id) + raise InvalidCommandException(f"unknown problem '{problem_id}'") elif not language_id: if source_file: filename, dot, ext = source_file.partition('.') @@ -61,12 +63,13 @@ def execute(self, line): else: raise InvalidCommandException('no language is selected') elif language_id not in executors: - raise InvalidCommandException("unknown language '%s'" % language_id) + raise InvalidCommandException(f"unknown language '{language_id}'") elif time_limit <= 0: raise InvalidCommandException('--time-limit must be >= 0') elif memory_limit <= 0: raise InvalidCommandException('--memory-limit must be >= 0') + assert language_id is not None src = self.get_source(source_file) if source_file else self.open_editor(language_id) self.judge.submission_id_counter += 1 diff --git a/dmoj/commands/test.py b/dmoj/commands/test.py index 85f12067e..d5b5d0eca 100644 --- a/dmoj/commands/test.py +++ b/dmoj/commands/test.py @@ -1,5 +1,6 @@ import sys import traceback +from typing import Any, Dict, Iterable from dmoj.commands.base_command import Command from dmoj.error import InvalidCommandException @@ -10,8 +11,8 @@ class ProblemTester(Tester): - def test_problem(self, problem_id): - self.output(ansi_style('Testing problem #ansi[%s](cyan|bold)...') % problem_id) + def run_problem_tests(self, problem_id: str) -> int: + self.output(ansi_style(f'Testing problem #ansi[{problem_id}](cyan|bold)...')) config = ProblemConfig(ProblemDataManager(get_problem_root(problem_id))) @@ -27,7 +28,7 @@ def test_problem(self, problem_id): continue test_name = test.get('label', test['source']) - self.output(ansi_style('\tRunning test #ansi[%s](yellow|bold)') % test_name) + self.output(ansi_style(f'\tRunning test #ansi[{test_name}](yellow|bold)')) try: test_fails = self.run_test(problem_id, test) except Exception: @@ -36,14 +37,15 @@ def test_problem(self, problem_id): self.output(traceback.format_exc()) else: self.output( - ansi_style('\tResult of test #ansi[%s](yellow|bold): ') % test_name + ansi_style(f'\tResult of test #ansi[{test_name}](yellow|bold): ') + ansi_style(['#ansi[Failed](red|bold)', '#ansi[Success](green|bold)'][not test_fails]) ) fails += test_fails return fails - def _check_targets(targets): + @staticmethod + def _check_targets(targets: Iterable[str]) -> bool: if 'posix' in targets: return True if 'freebsd' in sys.platform: @@ -55,7 +57,7 @@ def _check_targets(targets): return True return False - def run_test(self, problem_id, config): + def run_test(self, problem_id: str, config: Dict[str, Any]) -> int: if 'targets' in config and not self._check_targets(config['targets']): return 0 @@ -66,37 +68,35 @@ class TestCommand(Command): name = 'test' help = 'Runs tests on problems.' - def _populate_parser(self): + def _populate_parser(self) -> None: self.arg_parser.add_argument('problem_ids', nargs='+', help='ids of problems to test') - def execute(self, line): + def execute(self, line: str) -> int: args = self.arg_parser.parse_args(line) problem_ids = args.problem_ids supported_problems = set(get_supported_problems()) - unknown_problems = ', '.join( - map(lambda x: "'%s'" % x, filter(lambda problem_id: problem_id not in supported_problems, problem_ids)) - ) + unknown_problems = ', '.join(f"'{i}'" for i in problem_ids if i not in supported_problems) if unknown_problems: - raise InvalidCommandException('unknown problem(s) %s' % unknown_problems) + raise InvalidCommandException(f'unknown problem(s) {unknown_problems}') tester = ProblemTester() total_fails = 0 for problem_id in problem_ids: - fails = tester.test_problem(problem_id) + fails = tester.run_problem_tests(problem_id) if fails: - print_ansi('Problem #ansi[%s](cyan|bold) #ansi[failed %d case(s)](red|bold).' % (problem_id, fails)) + print_ansi(f'Problem #ansi[{problem_id}](cyan|bold) #ansi[failed {fails} case(s)](red|bold).') else: - print_ansi('Problem #ansi[%s](cyan|bold) passed with flying colours.' % problem_id) + print_ansi(f'Problem #ansi[{problem_id}](cyan|bold) passed with flying colours.') print() total_fails += fails print() print('Test complete.') - if fails: - print_ansi('#ansi[A total of %d test(s) failed](red|bold)' % fails) + if total_fails: + print_ansi(f'#ansi[A total of {total_fails} test(s) failed](red|bold)') else: print_ansi('#ansi[All tests passed.](green|bold)') - return fails + return total_fails