Skip to content

Commit

Permalink
Wait for jobs to finish before cancelling them
Browse files Browse the repository at this point in the history
Wait for 30 seconds (UWS database workers) or 55 seconds (cutout
workers) for jobs to finish before cancelling them. These waits should
match the shutdown grace period configured on the Kubernetes pod.
  • Loading branch information
rra committed Jul 16, 2024
1 parent defb71d commit 3660203
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 7 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20240715_170552_rra_DM_45138.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- Worker pods now wait for 30 seconds (UWS database workers) or 55 seconds (cutout workers) for jobs to finish on shutdown before cancelling them.
1 change: 1 addition & 0 deletions src/vocutouts/uws/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ async def shutdown(ctx: dict[Any, Any]) -> None:
],
functions=[uws_job_started, uws_job_completed],
redis_settings=self._config.arq_redis_settings,
job_completion_wait=UWS_DATABASE_TIMEOUT,
job_timeout=UWS_DATABASE_TIMEOUT,
max_jobs=10,
queue_name=UWS_QUEUE_NAME,
Expand Down
10 changes: 8 additions & 2 deletions src/vocutouts/uws/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
"""How long to wait for a job to stop before giving up."""

UWS_DATABASE_TIMEOUT = timedelta(seconds=30)
"""Timeout on workers that update the UWS database."""
"""Timeout on workers that update the UWS database.
This should match the default Kubernetes grace period for a pod to shut down.
"""

UWS_EXPIRE_JOBS_SCHEDULE = Options(
month=None,
Expand All @@ -35,4 +38,7 @@
"""Schedule for job expiration cron job, as `arq.cron.cron` parameters."""

UWS_QUEUE_NAME = "uws:queue"
"""Name of the arq queue for internal UWS messages."""
"""Name of the arq queue for internal UWS messages.
Must match ``_UWS_QUEUE_NAME`` in :mod:`vocutouts.uws.uwsworker`.
"""
21 changes: 17 additions & 4 deletions src/vocutouts/uws/uwsworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,17 @@
from safir.arq import ArqMode, ArqQueue, MockArqQueue, RedisArqQueue
from structlog.stdlib import BoundLogger

from .constants import UWS_QUEUE_NAME

T = TypeVar("T", bound="BaseModel")
"""Type for job parameters."""

_COMPLETION_WAIT = timedelta(seconds=55)
"""How long to wait for job completion before killing the job."""

_UWS_QUEUE_NAME = "uws:queue"
"""Name of the arq queue for internal UWS messages.
Must match `~vocutouts.uws.constants.UWS_QUEUE_NAME`.
"""

__all__ = [
"WorkerConfig",
Expand All @@ -40,6 +48,7 @@
"WorkerTimeoutError",
"WorkerTransientError",
"WorkerUsageError",
"T",
"build_worker",
]

Expand Down Expand Up @@ -99,6 +108,9 @@ class returned by other functions.
redis_settings: RedisSettings
"""Redis configuration for arq."""

job_completion_wait: SecondsTimedelta
"""How long to wait for jobs to complete before cancelling them."""

job_timeout: SecondsTimedelta
"""Maximum timeout for all jobs."""

Expand Down Expand Up @@ -338,10 +350,10 @@ async def startup(ctx: dict[Any, Any]) -> None:
if config.arq_mode == ArqMode.production:
settings = config.arq_redis_settings
arq: ArqQueue = await RedisArqQueue.initialize(
settings, default_queue_name=UWS_QUEUE_NAME
settings, default_queue_name=_UWS_QUEUE_NAME
)
else:
arq = MockArqQueue(default_queue_name=UWS_QUEUE_NAME)
arq = MockArqQueue(default_queue_name=_UWS_QUEUE_NAME)

ctx["arq"] = arq
ctx["logger"] = logger
Expand Down Expand Up @@ -399,6 +411,7 @@ async def run(
return WorkerSettings(
functions=[func(run, name=worker.__qualname__)],
redis_settings=config.arq_redis_settings,
job_completion_wait=_COMPLETION_WAIT,
job_timeout=config.timeout,
max_jobs=1,
allow_abort_jobs=True,
Expand Down
4 changes: 3 additions & 1 deletion tests/uws/workers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from vocutouts.uws.app import UWSApplication
from vocutouts.uws.config import UWSConfig
from vocutouts.uws.constants import UWS_QUEUE_NAME
from vocutouts.uws.constants import UWS_DATABASE_TIMEOUT, UWS_QUEUE_NAME
from vocutouts.uws.dependencies import UWSFactory
from vocutouts.uws.models import ErrorCode, UWSJobParameter, UWSJobResult
from vocutouts.uws.storage import JobStore
Expand Down Expand Up @@ -70,6 +70,7 @@ async def test_build_worker(
assert settings.functions[0].name == hello.__qualname__
assert settings.redis_settings == uws_config.arq_redis_settings
assert settings.allow_abort_jobs
assert settings.job_completion_wait
assert settings.queue_name == default_queue_name
assert settings.on_startup
assert settings.on_shutdown
Expand Down Expand Up @@ -228,6 +229,7 @@ async def test_build_uws_worker(
assert callable(expire_jobs)
assert settings.redis_settings == uws_config.arq_redis_settings
assert not settings.allow_abort_jobs
assert settings.job_completion_wait == UWS_DATABASE_TIMEOUT
assert settings.queue_name == UWS_QUEUE_NAME
assert settings.on_startup
assert settings.on_shutdown
Expand Down

0 comments on commit 3660203

Please sign in to comment.