Update executor.py to have proper asyncio support #595

Merged · 1 commit · Jan 29, 2020
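Every hunk below is the same mechanical change: the call-style parentheses after `yield from` (and after `return`) are dropped or set off with a space. The parenthesized form already behaved correctly, because `yield from` is syntax rather than a function and the parentheses only group its operand, but `yield from(lock)` reads like a call to a function named `from`. A quick standard-library check that the two spellings parse identically (a sketch, not part of the PR):

import ast

# "yield from" is a keyword pair, not a callable, so parenthesizing its
# operand produces a byte-for-byte identical parse tree.
old = ast.dump(ast.parse("def f():\n    yield from(asyncio.sleep(0.05))"))
new = ast.dump(ast.parse("def f():\n    yield from asyncio.sleep(0.05)"))
assert old == new

The file predates async/await, so its coroutines are generators driven by `yield from`; where the sketches below need to run on current Python, they use the equivalent `async def`/`await` spelling.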
24 changes: 12 additions & 12 deletions catkin_tools/execution/executor.py
@@ -70,7 +70,7 @@ def async_job(verb, job, threadpool, locks, event_queue, log_path):
         # Check for stage synchronization lock
         if stage.locked_resource is not None:
             lock = locks.setdefault(stage.locked_resource, asyncio.Lock())
-            yield from(lock)
+            yield from lock
         else:
             lock = FakeLock()
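In this pre-async/await style, `yield from lock` suspends the job until the shared `asyncio.Lock` is acquired (older asyncio let a lock be yielded from directly; that shortcut was later deprecated and removed in favor of `await lock.acquire()` or `async with`). A minimal sketch of the same per-resource serialization in modern syntax, with hypothetical resource and job names:

import asyncio

locks = {}

async def run_stage(resource, jid):
    # One shared Lock per resource name, as in the PR's locks.setdefault(...)
    lock = locks.setdefault(resource, asyncio.Lock())
    await lock.acquire()        # modern spelling of "yield from lock"
    try:
        print(jid, 'holds', resource)
        await asyncio.sleep(0.05)
    finally:
        lock.release()          # the real code also releases explicitly later

async def main():
    await asyncio.gather(run_stage('rosdep', 'pkg_a'),
                         run_stage('rosdep', 'pkg_b'))

asyncio.run(main())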

@@ -79,7 +79,7 @@ def async_job(verb, job, threadpool, locks, event_queue, log_path):
         if stage.occupy_job:
             if not occupying_job:
                 while job_server.try_acquire() is None:
-                    yield from(asyncio.sleep(0.05))
+                    yield from asyncio.sleep(0.05)
                 occupying_job = True
         else:
             if occupying_job:
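The loop above polls the jobserver's non-blocking `try_acquire()` and sleeps 50 ms between attempts, so a stage waits for a token without ever blocking the event loop. A self-contained sketch of that pattern, with a hypothetical TokenPool standing in for `job_server`:

import asyncio

class TokenPool:
    # Hypothetical stand-in for job_server: non-blocking acquire only.
    def __init__(self, tokens):
        self.tokens = tokens

    def try_acquire(self):
        if self.tokens > 0:
            self.tokens -= 1
            return True
        return None

async def worker(pool, jid):
    while pool.try_acquire() is None:   # the PR's polling loop
        await asyncio.sleep(0.05)       # yield so other tasks keep running
    try:
        print(jid, 'acquired a token')
        await asyncio.sleep(0.1)
    finally:
        pool.tokens += 1                # release the token

async def main():
    pool = TokenPool(2)
    await asyncio.gather(*(worker(pool, j) for j in 'abc'))

asyncio.run(main())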
@@ -103,7 +103,7 @@ def async_job(verb, job, threadpool, locks, event_queue, log_path):
                         # Get the logger
                         protocol_type = stage.logger_factory(verb, job.jid, stage.label, event_queue, log_path)
                         # Start asynchronous execution
-                        transport, logger = yield from(
+                        transport, logger = yield from (
                             async_execute_process(
                                 protocol_type,
                                 **stage.async_execute_process_kwargs))
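`async_execute_process` comes from osrf_pycommon's process_utils. A rough usage sketch, assuming the documented `AsyncSubprocessProtocol` callbacks and its `complete` future (the same future that `logger.complete` below waits on), on a Python version osrf_pycommon supports:

import asyncio
from osrf_pycommon.process_utils import (
    async_execute_process, AsyncSubprocessProtocol, get_loop)

class EchoProtocol(AsyncSubprocessProtocol):
    # Assumed callback per osrf_pycommon's docs; receives subprocess output.
    def on_stdout_received(self, data):
        print('stdout:', data)

async def run():
    transport, protocol = await async_execute_process(
        EchoProtocol, ['echo', 'hello'])
    return await protocol.complete   # resolves with the return code

print('retcode:', get_loop().run_until_complete(run()))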
@@ -112,7 +112,7 @@ def async_job(verb, job, threadpool, locks, event_queue, log_path):
                     if 'Text file busy' in str(exc):
                         # This is a transient error, try again shortly
                         # TODO: report the file causing the problem (exc.filename)
-                        yield from(asyncio.sleep(0.01))
+                        yield from asyncio.sleep(0.01)
                         continue
                     raise
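ETXTBSY ('Text file busy') can occur transiently when spawning an executable that another process still holds open for writing, so the code retries the spawn after 10 ms instead of failing the stage. The same pattern in isolation (a sketch; the hypothetical `spawn` callable and the bounded retry count are not from the PR, which retries indefinitely and matches on the message string):

import asyncio
import errno

async def start_with_retry(spawn, attempts=100):
    for _ in range(attempts):
        try:
            return await spawn()
        except OSError as exc:
            if exc.errno == errno.ETXTBSY:   # 'Text file busy'
                await asyncio.sleep(0.01)    # transient: back off and retry
                continue
            raise
    raise RuntimeError('spawn kept failing with ETXTBSY')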

@@ -125,7 +125,7 @@ def async_job(verb, job, threadpool, locks, event_queue, log_path):
                                 **stage.async_execute_process_kwargs))

                 # Asynchronously yield until this command is completed
-                retcode = yield from(logger.complete)
+                retcode = yield from logger.complete
             except:  # noqa: E722
                 # Bare except is permissible here because the set of errors which the CommandState might raise
                 # is unbounded. We capture the traceback here and save it to the build's log files.
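`logger.complete` is a future that the subprocess protocol resolves with the return code when the process exits, so `yield from logger.complete` simply suspends the job until then. The mechanism in miniature (hypothetical names; a timer stands in for process exit):

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    complete = loop.create_future()            # analogous to logger.complete

    # Elsewhere, process exit would resolve it; here a timer does.
    loop.call_later(0.1, complete.set_result, 0)

    retcode = await complete                   # "yield from logger.complete"
    print('retcode:', retcode)

asyncio.run(main())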
@@ -137,11 +137,11 @@ def async_job(verb, job, threadpool, locks, event_queue, log_path):
             logger = IOBufferLogger(verb, job.jid, stage.label, event_queue, log_path)
             try:
                 # Asynchronously yield until this function is completed
-                retcode = yield from(get_loop().run_in_executor(
+                retcode = yield from get_loop().run_in_executor(
                     threadpool,
                     stage.function,
                     logger,
-                    event_queue))
+                    event_queue)
             except:  # noqa: E722
                 # Bare except is permissible here because the set of errors which the FunctionStage might raise
                 # is unbounded. We capture the traceback here and save it to the build's log files.
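A FunctionStage runs an ordinary blocking Python function, so it is handed to a thread pool via `run_in_executor` and the coroutine suspends until the thread returns. A minimal sketch, with placeholder arguments standing in for the real logger and event queue:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_stage(logger, event_queue):
    time.sleep(0.2)   # stands in for CMake/make/file I/O
    return 0          # retcode, like a FunctionStage's result

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as threadpool:
        retcode = await loop.run_in_executor(
            threadpool, blocking_stage, 'logger', 'event_queue')
    print('retcode:', retcode)

asyncio.run(main())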
@@ -179,7 +179,7 @@ def async_job(verb, job, threadpool, locks, event_queue, log_path):
         lock.release()

     # Finally, return whether all stages of the job completed
-    return(job.jid, all_stages_succeeded)
+    return (job.jid, all_stages_succeeded)


 @asyncio.coroutine
@@ -272,14 +272,14 @@ def execute_jobs(
             ))

         # Process jobs as they complete asynchronously
-        done_job_fs, active_job_fs = yield from(asyncio.wait(
+        done_job_fs, active_job_fs = yield from asyncio.wait(
             active_job_fs,
             timeout=0.10,
-            return_when=FIRST_COMPLETED))
+            return_when=FIRST_COMPLETED)

         for done_job_f in done_job_fs:
             # Capture a result once the job has finished
-            job_id, succeeded = yield from(done_job_f)
+            job_id, succeeded = yield from done_job_f

             # Release a jobserver token now that this job has succeeded
             job_server.release(job_id)
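`asyncio.wait` with `return_when=FIRST_COMPLETED` and a 100 ms timeout returns `(done, pending)` sets as soon as any job finishes, so the scheduler can dispatch new jobs promptly; each finished future is then consumed with another `yield from` (or `await`). The same loop in miniature, with made-up job names and durations:

import asyncio
from asyncio import FIRST_COMPLETED

async def job(jid, duration):
    await asyncio.sleep(duration)
    return jid, True   # (job_id, succeeded), like async_job's return value

async def main():
    active = {asyncio.ensure_future(job(j, d))
              for j, d in [('a', 0.05), ('b', 0.2), ('c', 0.35)]}
    while active:
        done, active = await asyncio.wait(
            active, timeout=0.10, return_when=FIRST_COMPLETED)
        for fut in done:
            job_id, succeeded = await fut   # already done; yields its result
            print(job_id, succeeded)

asyncio.run(main())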
@@ -362,7 +362,7 @@ def execute_jobs(
             completed=completed_jobs
         ))

-    return(all(completed_jobs.values()))
+    return all(completed_jobs.values())


 def run_until_complete(coroutine):
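The body of `run_until_complete` is cut off in this diff view. A typical minimal shape for such a helper, purely as an assumption and not the file's actual code, would drive the coroutine on an event loop until it finishes:

import asyncio

def run_until_complete(coroutine):
    # Sketch only: the real helper's body is not shown in this diff.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coroutine)
    finally:
        loop.close()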