Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-45138: Add timeout support #197

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/20240710_144457_rra_DM_45138.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### New features

- Restore support for execution duration and change the default execution duration back to 10 minutes. Use a very ugly hack to enforce a timeout in the backend worker that will hopefully not be too fragile.
- Re-add the `CUTOUT_TIMEOUT` configuration option to change the default and maximum execution duration for cutout jobs.
14 changes: 7 additions & 7 deletions src/vocutouts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,11 @@ class Config(BaseSettings):
)

sync_timeout: timedelta = Field(
timedelta(minutes=1),
title="Timeout for sync requests",
description=(
"The job will continue running as an async job beyond this"
" timeout since cancellation of jobs is not currently supported."
),
timedelta(minutes=1), title="Timeout for sync requests"
)

timeout: timedelta = Field(
timedelta(minutes=10), title="Timeout for cutout jobs"
)

tmpdir: Path = Field(Path("/tmp"), title="Temporary directory for workers")
Expand Down Expand Up @@ -180,7 +179,7 @@ def _validate_arq_queue_url(cls, v: RedisDsn) -> RedisDsn:
)
return v

@field_validator("lifetime", "sync_timeout", mode="before")
@field_validator("lifetime", "sync_timeout", "timeout", mode="before")
@classmethod
def _parse_timedelta(cls, v: str | float | timedelta) -> float | timedelta:
"""Support human-readable timedeltas."""
Expand Down Expand Up @@ -211,6 +210,7 @@ def uws_config(self) -> UWSConfig:
return UWSConfig(
arq_mode=self.arq_mode,
arq_redis_settings=self.arq_redis_settings,
execution_duration=self.timeout,
lifetime=self.lifetime,
parameters_type=CutoutParameters,
signing_service_account=self.service_account,
Expand Down
20 changes: 8 additions & 12 deletions src/vocutouts/uws/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ class encapsulates the configuration of the UWS component that may vary by
database_url: str
"""URL for the metadata database."""

execution_duration: timedelta
"""Maximum execution time in seconds.

Jobs that run longer than this length of time will be automatically
aborted.
"""

lifetime: timedelta
"""The lifetime of jobs.

Expand Down Expand Up @@ -127,15 +134,6 @@ class encapsulates the configuration of the UWS component that may vary by
database_password: SecretStr | None = None
"""Password for the database."""

execution_duration: timedelta = timedelta(seconds=0)
"""Maximum execution time in seconds.

Jobs that run longer than this length of time should be automatically
aborted. However, currently the backend does not support cancelling jobs,
and therefore the only correct value is 0, which indicates that the
execution duration of the job is unlimited.
"""

slack_webhook: SecretStr | None = None
"""Slack incoming webhook for reporting errors."""

Expand Down Expand Up @@ -176,9 +174,7 @@ class encapsulates the configuration of the UWS component that may vary by

If provided, called with the requested execution duration and the current
job record and should return the new execution duration time. Otherwise,
the execution duration may not be changed. Note that the current backend
does not support cancelling jobs and therefore does not support execution
duration values other than 0.
the execution duration may not be changed.
"""

wait_timeout: timedelta = timedelta(minutes=1)
Expand Down
12 changes: 5 additions & 7 deletions src/vocutouts/uws/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,15 +432,13 @@ async def update_execution_duration(
if job.owner != user:
raise PermissionDeniedError(f"Access to job {job_id} denied")

# Validate the new value. Only support changes to execution duration
# if a validator is set, which is a signal that the application
# supports cancellation of jobs. The current implementation does not
# support cancelling jobs and therefore cannot enforce a timeout, so
# an execution duration of 0 is currently the only correct value.
# Validate the new value.
if validator := self._config.validate_execution_duration:
duration = validator(duration, job)
else:
return None
if duration > self._config.execution_duration:
duration = self._config.execution_duration

# Update the duration in the job.
if duration == job.execution_duration:
return None
await self._storage.update_execution_duration(job_id, duration)
Expand Down
99 changes: 57 additions & 42 deletions src/vocutouts/uws/uwsworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from __future__ import annotations

import asyncio
import os
import signal
import uuid
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from enum import Enum
Expand Down Expand Up @@ -35,6 +37,7 @@
"WorkerJobInfo",
"WorkerResult",
"WorkerSettings",
"WorkerTimeoutError",
"WorkerTransientError",
"WorkerUsageError",
"build_worker",
Expand Down Expand Up @@ -102,6 +105,9 @@ class returned by other functions.
max_jobs: int
"""Maximum number of jobs that can be run at one time."""

allow_abort_jobs: bool = False
"""Whether to allow jobs to be aborted."""

queue_name: str = default_queue_name
"""Name of arq queue to listen to for jobs."""

Expand Down Expand Up @@ -129,11 +135,7 @@ class WorkerJobInfo:
"""Delegated Gafaelfawr token to act on behalf of the user."""

timeout: timedelta
"""Maximum execution time for the job.

Currently, this is ignored, since the backend workers do not support
cancellation.
"""
"""Maximum execution time for the job."""

run_id: str | None = None
"""User-supplied run ID, if any."""
Expand Down Expand Up @@ -259,6 +261,20 @@ class WorkerTransientError(WorkerError):
error_type = WorkerErrorType.TRANSIENT


class WorkerTimeoutError(WorkerTransientError):
"""Transient error occurred during worker processing.

The job may be retried with the same parameters and may succeed.
"""

def __init__(self, elapsed: timedelta, timeout: timedelta) -> None:
msg = (
f"Job timed out after {elapsed.total_seconds()}s"
f" (timeout: {timeout.total_seconds()}s)"
)
super().__init__(msg)


class WorkerUsageError(WorkerError):
"""Parameters sent by the user were invalid.

Expand All @@ -271,6 +287,20 @@ class WorkerUsageError(WorkerError):
error_type = WorkerErrorType.USAGE


def _restart_pool(pool: ProcessPoolExecutor) -> ProcessPoolExecutor:
"""Restart the pool after timeout or job cancellation.

This is a horrible, fragile hack, but it appears to be the only way to
enforce a timeout currently in Python since there is no way to abort a
job already in progress. Find the processes underlying the pool, kill
them, and then shut down and recreate the pool.
"""
for pid in pool._processes: # noqa: SLF001
os.kill(pid, signal.SIGINT)
pool.shutdown(wait=True)
return ProcessPoolExecutor(1)


def build_worker(
worker: Callable[[T, WorkerJobInfo, BoundLogger], list[WorkerResult]],
config: WorkerConfig[T],
Expand Down Expand Up @@ -298,27 +328,6 @@ def build_worker(
UWS worker configuration.
logger
Logger to use for messages.

Notes
-----
Timeouts and aborting jobs unfortunately are not supported due to
limitations in `concurrent.futures.ThreadPoolExecutor`. Once a thread has
been started, there is no way to stop it until it completes on its own.
Therefore, no job timeout is set or supported, and the timeout set on the
job (which comes from executionduration) is ignored.

Fixing this appears to be difficult since Python's `threading.Thread`
simply does not support cancellation. It would probably require rebuilding
the worker model on top of processes and killing those processes on
timeout. That would pose problems for cleanup of any temporary resources
created by the process such as temporary files, since Python cleanup code
would not be run.

The best fix would be for backend code to be rewritten to be async, so
await would become a cancellation point (although this still may not be
enough for compute-heavy code that doesn't use await frequently). However,
the Rubin pipelines code is all sync, so async worker support has not yet
been added due to lack of demand.
"""

async def startup(ctx: dict[Any, Any]) -> None:
Expand All @@ -336,13 +345,13 @@ async def startup(ctx: dict[Any, Any]) -> None:

ctx["arq"] = arq
ctx["logger"] = logger
ctx["pool"] = ThreadPoolExecutor(1)
ctx["pool"] = ProcessPoolExecutor(1)

logger.info("Worker startup complete")

async def shutdown(ctx: dict[Any, Any]) -> None:
logger: BoundLogger = ctx["logger"]
pool: ThreadPoolExecutor = ctx["pool"]
pool: ProcessPoolExecutor = ctx["pool"]

pool.shutdown(wait=True, cancel_futures=True)

Expand All @@ -353,7 +362,7 @@ async def run(
) -> list[WorkerResult]:
arq: ArqQueue = ctx["arq"]
logger: BoundLogger = ctx["logger"]
pool: ThreadPoolExecutor = ctx["pool"]
pool: ProcessPoolExecutor = ctx["pool"]

params = config.parameters_class.model_validate(params_raw)
logger = logger.bind(
Expand All @@ -365,28 +374,34 @@ async def run(
if info.run_id:
logger = logger.bind(run_id=info.run_id)

await arq.enqueue("uws_job_started", info.job_id, datetime.now(tz=UTC))
start = datetime.now(tz=UTC)
await arq.enqueue("uws_job_started", info.job_id, start)
loop = asyncio.get_running_loop()
try:
return await loop.run_in_executor(
pool, worker, params, info, logger
)
async with asyncio.timeout(info.timeout.total_seconds()):
return await loop.run_in_executor(
pool, worker, params, info, logger
)
except asyncio.CancelledError:
ctx["pool"] = _restart_pool(pool)
raise
except TimeoutError:
elapsed = datetime.now(tz=UTC) - start
ctx["pool"] = _restart_pool(pool)
raise WorkerTimeoutError(elapsed, info.timeout) from None
finally:
await arq.enqueue("uws_job_completed", info.job_id)

# Job timeouts are not actually supported since we have no way of stopping
# the sync worker. A timeout will just leave the previous worker running
# and will block all future jobs. Set it to an extremely long value, since
# it can't be disabled entirely.
#
# Since the worker is running sync jobs, run one job per pod since they
# will be serialized anyway and no parallelism is possible. If async
# worker support is added, consider making this configurable.
# will be serialized anyway and no parallelism is possible. This also
# allows us to easily restart the job pool on timeout or job abort. If
# async worker support is added, consider making this configurable.
return WorkerSettings(
functions=[func(run, name=worker.__qualname__)],
redis_settings=config.arq_redis_settings,
job_timeout=3600,
job_timeout=config.timeout,
max_jobs=1,
allow_abort_jobs=True,
on_startup=startup,
on_shutdown=shutdown,
)
4 changes: 2 additions & 2 deletions tests/handlers/async_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<uws:ownerId>someone</uws:ownerId>
<uws:phase>PENDING</uws:phase>
<uws:creationTime>[DATE]</uws:creationTime>
<uws:executionDuration>0</uws:executionDuration>
<uws:executionDuration>600</uws:executionDuration>
<uws:destruction>[DATE]</uws:destruction>
<uws:parameters>
<uws:parameter id="id" isPost="true">1:2:band:value</uws:parameter>
Expand All @@ -51,7 +51,7 @@
<uws:creationTime>[DATE]</uws:creationTime>
<uws:startTime>[DATE]</uws:startTime>
<uws:endTime>[DATE]</uws:endTime>
<uws:executionDuration>0</uws:executionDuration>
<uws:executionDuration>600</uws:executionDuration>
<uws:destruction>[DATE]</uws:destruction>
<uws:parameters>
<uws:parameter id="id" isPost="true">1:2:band:value</uws:parameter>
Expand Down
1 change: 1 addition & 0 deletions tests/support/uws.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def build_uws_config() -> UWSConfig:
async_post_dependency=_post_dependency,
database_url=database_url,
database_password=SecretStr(os.environ["POSTGRES_PASSWORD"]),
execution_duration=timedelta(minutes=10),
lifetime=timedelta(days=1),
parameters_type=SimpleParameters,
signing_service_account="signer@example.com",
Expand Down
13 changes: 9 additions & 4 deletions tests/uws/job_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<uws:ownerId>user</uws:ownerId>
<uws:phase>{}</uws:phase>
<uws:creationTime>{}</uws:creationTime>
<uws:executionDuration>0</uws:executionDuration>
<uws:executionDuration>{}</uws:executionDuration>
<uws:destruction>{}</uws:destruction>
<uws:parameters>
<uws:parameter id="name" isPost="true">Jane</uws:parameter>
Expand All @@ -60,7 +60,7 @@
<uws:creationTime>{}</uws:creationTime>
<uws:startTime>{}</uws:startTime>
<uws:endTime>{}</uws:endTime>
<uws:executionDuration>0</uws:executionDuration>
<uws:executionDuration>{}</uws:executionDuration>
<uws:destruction>{}</uws:destruction>
<uws:parameters>
<uws:parameter id="name" isPost="true">Jane</uws:parameter>
Expand Down Expand Up @@ -127,6 +127,7 @@ async def test_job_run(
"1",
"PENDING",
isodatetime(job.creation_time),
"600",
isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)),
)

Expand Down Expand Up @@ -161,6 +162,7 @@ async def test_job_run(
"1",
"QUEUED",
isodatetime(job.creation_time),
"600",
isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)),
)
await runner.mark_in_progress("user", "1")
Expand Down Expand Up @@ -207,6 +209,7 @@ async def test_job_run(
isodatetime(job.creation_time),
isodatetime(job.start_time),
isodatetime(job.end_time),
"600",
isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)),
)

Expand Down Expand Up @@ -261,6 +264,7 @@ async def test_job_api(
"1",
"PENDING",
isodatetime(job.creation_time),
"600",
isodatetime(destruction_time),
)

Expand All @@ -278,7 +282,7 @@ async def test_job_api(
)
assert r.status_code == 200
assert r.headers["Content-Type"] == "text/plain; charset=utf-8"
assert r.text == "0"
assert r.text == "600"

r = await client.get(
"/test/jobs/1/owner", headers={"X-Auth-Request-User": "user"}
Expand Down Expand Up @@ -318,7 +322,6 @@ async def test_job_api(
assert r.status_code == 303
assert r.headers["Location"] == "https://example.com/test/jobs/1"

# Changing the execution duration is not supported.
r = await client.post(
"/test/jobs/1/executionduration",
headers={"X-Auth-Request-User": "user"},
Expand All @@ -337,6 +340,7 @@ async def test_job_api(
"1",
"PENDING",
isodatetime(job.creation_time),
"300",
isodatetime(now),
)

Expand Down Expand Up @@ -367,6 +371,7 @@ async def test_job_api(
"2",
"PENDING",
isodatetime(job.creation_time),
"600",
isodatetime(job.destruction_time),
)
r = await client.post(
Expand Down
Loading