Skip to content

Commit

Permalink
Queue up testcase updates in .25s intervals; #408
Browse files Browse the repository at this point in the history
  • Loading branch information
Xyene committed Oct 26, 2019
1 parent dc7b080 commit 2f259c6
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 37 deletions.
3 changes: 0 additions & 3 deletions dmoj/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ def supported_problems_packet(self, problems):
def test_case_status_packet(self, position, result):
pass

def multi_test_case_status_packet(self, updates):
pass

def compile_error_packet(self, log):
pass

Expand Down
87 changes: 57 additions & 30 deletions dmoj/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from dmoj import sysinfo
from dmoj.judgeenv import get_runtime_versions, get_supported_problems
from dmoj.result import Result
from dmoj.utils.unicode import utf8bytes, utf8text

if TYPE_CHECKING:
Expand Down Expand Up @@ -66,6 +67,9 @@ def __init__(self, host: str, port: int, judge: 'Judge', name: str, key: str,

self._lock = threading.RLock()
self._batch = 0
self._testcase_queue_lock = threading.Lock()
self._testcase_queue: List[Tuple[int, Result]] = []

# Exponential backoff: starting at 4 seconds.
# Certainly hope it won't stack overflow, since it will take days if not years.
self.fallback = 4
Expand Down Expand Up @@ -133,7 +137,7 @@ def close(self):
self.conn.shutdown(socket.SHUT_RDWR)
self._closed = True

def _read_async(self):
def _read_forever(self):
try:
while True:
self._receive_packet(self._read_single())
Expand Down Expand Up @@ -162,15 +166,51 @@ def _read_single(self) -> dict:
return json.loads(utf8text(packet))

def run(self):
self._read_async()
threading.Thread(target=self._periodically_flush_testcase_queue).start()
self._read_forever()

def disconnect(self):
self.close()
self.judge.terminate_grading()
sys.exit(0)

def run_async(self):
threading.Thread(target=self._read_async).start()
def _flush_testcase_queue(self):
with self._testcase_queue_lock:
if not self._testcase_queue:
return

# Copy and release the lock quickly so that submission can continue grading.
testcase_queue = self._testcase_queue[:]
self._testcase_queue.clear()

self._send_packet({'name': 'test-case-status',
'submission-id': self.judge.current_submission_id,
'cases': [
{
'position': position,
'status': result.result_flag,
'time': result.execution_time,
'points': result.points,
'total-points': result.total_points,
'memory': result.max_memory,
'output': result.output,
'extended-feedback': result.extended_feedback,
'feedback': result.feedback,
} for position, result in testcase_queue
]})

def _periodically_flush_testcase_queue(self):
try:
while True:
time.sleep(0.25)
# It is okay if we flush the testcase queue even while the connection is not open or there's nothing
# grading, since the only thing that can queue testcases is a currently-grading submission.
self._flush_testcase_queue()
except KeyboardInterrupt:
pass
except Exception:
traceback.print_exc()
raise SystemExit(1)

def _send_packet(self, packet: dict):
for k, v in packet.items():
Expand Down Expand Up @@ -239,32 +279,14 @@ def supported_problems_packet(self, problems: List[Tuple[str, int]]):
self._send_packet({'name': 'supported-problems',
'problems': problems})

def multi_test_case_status_packet(self, updates):
for result, position in updates:
log.info('Test case on %d: #%d, %s [%.3fs | %.2f MB], %.1f/%.0f',
self.judge.current_submission_id, position,
', '.join(result.readable_codes()),
result.execution_time, result.max_memory / 1024.0,
result.points, result.total_points)

self._send_packet({'name': 'test-case-status',
'submission-id': self.judge.current_submission_id,
'cases': [
{
'position': position,
'status': result.result_flag,
'time': result.execution_time,
'points': result.points,
'total-points': result.total_points,
'memory': result.max_memory,
'output': result.output,
'extended-feedback': result.extended_feedback,
'feedback': result.feedback,
} for position, result in updates
]})

def test_case_status_packet(self, position: int, result):
self.multi_test_case_status_packet([(position, result)])
def test_case_status_packet(self, position: int, result: Result):
log.info('Test case on %d: #%d, %s [%.3fs | %.2f MB], %.1f/%.0f',
self.judge.current_submission_id, position,
', '.join(result.readable_codes()),
result.execution_time, result.max_memory / 1024.0,
result.points, result.total_points)
with self._testcase_queue_lock:
self._testcase_queue.append((position, result))

def compile_error_packet(self, message: str):
log.info('Compile error: %d', self.judge.current_submission_id)
Expand All @@ -281,6 +303,7 @@ def compile_message_packet(self, message: str):

def internal_error_packet(self, message: str):
log.info('Internal error: %d', self.judge.current_submission_id)
self._flush_testcase_queue()
self._send_packet({'name': 'internal-error',
'submission-id': self.judge.current_submission_id,
'message': message})
Expand All @@ -294,17 +317,20 @@ def begin_grading_packet(self, is_pretested: bool):
def grading_end_packet(self):
log.info('End grading: %d', self.judge.current_submission_id)
self.fallback = 4
self._flush_testcase_queue()
self._send_packet({'name': 'grading-end',
'submission-id': self.judge.current_submission_id})

def batch_begin_packet(self):
self._batch += 1
log.info('Enter batch number %d: %d', self._batch, self.judge.current_submission_id)
self._flush_testcase_queue()
self._send_packet({'name': 'batch-begin',
'submission-id': self.judge.current_submission_id})

def batch_end_packet(self):
log.info('Exit batch number %d: %d', self._batch, self.judge.current_submission_id)
self._flush_testcase_queue()
self._send_packet({'name': 'batch-end',
'submission-id': self.judge.current_submission_id})

Expand All @@ -315,6 +341,7 @@ def current_submission_packet(self):

def submission_terminated_packet(self):
log.info('Submission aborted: %d', self.judge.current_submission_id)
self._flush_testcase_queue()
self._send_packet({'name': 'submission-terminated',
'submission-id': self.judge.current_submission_id})

Expand Down
4 changes: 0 additions & 4 deletions dmoj/testsuite.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ def test_case_status_packet(self, position, result):
self.fail('Unexpected feedback: "%s", expected: "%s"' %
(result.feedback, '", "'.join(feedback)))

def multi_test_case_status_packet(self, updates):
for position, result in updates:
self.test_case_status_packet(position, result)

def compile_error_packet(self, log):
if 'CE' not in self.codes_all:
self.fail('Unexpected compile error')
Expand Down

0 comments on commit 2f259c6

Please sign in to comment.