diff --git a/changelog.d/20240715_170552_rra_DM_45138.md b/changelog.d/20240715_170552_rra_DM_45138.md new file mode 100644 index 0000000..cd26495 --- /dev/null +++ b/changelog.d/20240715_170552_rra_DM_45138.md @@ -0,0 +1,3 @@ +### New features + +- Worker pods now wait for 30 seconds (UWS database workers) or 55 seconds (cutout workers) for jobs to finish on shutdown before cancelling them. diff --git a/src/vocutouts/uws/app.py b/src/vocutouts/uws/app.py index bce84c3..b1c23cb 100644 --- a/src/vocutouts/uws/app.py +++ b/src/vocutouts/uws/app.py @@ -115,6 +115,7 @@ async def shutdown(ctx: dict[Any, Any]) -> None: ], functions=[uws_job_started, uws_job_completed], redis_settings=self._config.arq_redis_settings, + job_completion_wait=UWS_DATABASE_TIMEOUT, job_timeout=UWS_DATABASE_TIMEOUT, max_jobs=10, queue_name=UWS_QUEUE_NAME, diff --git a/src/vocutouts/uws/constants.py b/src/vocutouts/uws/constants.py index 323c502..ce30cff 100644 --- a/src/vocutouts/uws/constants.py +++ b/src/vocutouts/uws/constants.py @@ -21,7 +21,10 @@ """How long to wait for a job to stop before giving up.""" UWS_DATABASE_TIMEOUT = timedelta(seconds=30) -"""Timeout on workers that update the UWS database.""" +"""Timeout on workers that update the UWS database. + +This should match the default Kubernetes grace period for a pod to shut down. +""" UWS_EXPIRE_JOBS_SCHEDULE = Options( month=None, @@ -35,4 +38,7 @@ """Schedule for job expiration cron job, as `arq.cron.cron` parameters.""" UWS_QUEUE_NAME = "uws:queue" -"""Name of the arq queue for internal UWS messages.""" +"""Name of the arq queue for internal UWS messages. + +Must match ``_UWS_QUEUE_NAME`` in :mod:`vocutouts.uws.uwsworker`. +""" diff --git a/src/vocutouts/uws/uwsworker.py b/src/vocutouts/uws/uwsworker.py index 2b0b493..3125c6e 100644 --- a/src/vocutouts/uws/uwsworker.py +++ b/src/vocutouts/uws/uwsworker.py @@ -25,9 +25,17 @@ from safir.arq import ArqMode, ArqQueue, MockArqQueue, RedisArqQueue from structlog.stdlib import BoundLogger -from .constants import UWS_QUEUE_NAME - T = TypeVar("T", bound="BaseModel") +"""Type for job parameters.""" + +_COMPLETION_WAIT = timedelta(seconds=55) +"""How long to wait for job completion before killing the job.""" + +_UWS_QUEUE_NAME = "uws:queue" +"""Name of the arq queue for internal UWS messages. + +Must match `~vocutouts.uws.constants.UWS_QUEUE_NAME`. +""" __all__ = [ "WorkerConfig", @@ -40,6 +48,7 @@ "WorkerTimeoutError", "WorkerTransientError", "WorkerUsageError", + "T", "build_worker", ] @@ -99,6 +108,9 @@ class returned by other functions. redis_settings: RedisSettings """Redis configuration for arq.""" + job_completion_wait: SecondsTimedelta + """How long to wait for jobs to complete before cancelling them.""" + job_timeout: SecondsTimedelta """Maximum timeout for all jobs.""" @@ -338,10 +350,10 @@ async def startup(ctx: dict[Any, Any]) -> None: if config.arq_mode == ArqMode.production: settings = config.arq_redis_settings arq: ArqQueue = await RedisArqQueue.initialize( - settings, default_queue_name=UWS_QUEUE_NAME + settings, default_queue_name=_UWS_QUEUE_NAME ) else: - arq = MockArqQueue(default_queue_name=UWS_QUEUE_NAME) + arq = MockArqQueue(default_queue_name=_UWS_QUEUE_NAME) ctx["arq"] = arq ctx["logger"] = logger @@ -399,6 +411,7 @@ async def run( return WorkerSettings( functions=[func(run, name=worker.__qualname__)], redis_settings=config.arq_redis_settings, + job_completion_wait=_COMPLETION_WAIT, job_timeout=config.timeout, max_jobs=1, allow_abort_jobs=True, diff --git a/tests/uws/workers_test.py b/tests/uws/workers_test.py index 5f10496..e8e18e1 100644 --- a/tests/uws/workers_test.py +++ b/tests/uws/workers_test.py @@ -20,7 +20,7 @@ from vocutouts.uws.app import UWSApplication from vocutouts.uws.config import UWSConfig -from vocutouts.uws.constants import UWS_QUEUE_NAME +from vocutouts.uws.constants import UWS_DATABASE_TIMEOUT, UWS_QUEUE_NAME from vocutouts.uws.dependencies import UWSFactory from vocutouts.uws.models import ErrorCode, UWSJobParameter, UWSJobResult from vocutouts.uws.storage import JobStore @@ -70,6 +70,7 @@ async def test_build_worker( assert settings.functions[0].name == hello.__qualname__ assert settings.redis_settings == uws_config.arq_redis_settings assert settings.allow_abort_jobs + assert settings.job_completion_wait assert settings.queue_name == default_queue_name assert settings.on_startup assert settings.on_shutdown @@ -228,6 +229,7 @@ async def test_build_uws_worker( assert callable(expire_jobs) assert settings.redis_settings == uws_config.arq_redis_settings assert not settings.allow_abort_jobs + assert settings.job_completion_wait == UWS_DATABASE_TIMEOUT assert settings.queue_name == UWS_QUEUE_NAME assert settings.on_startup assert settings.on_shutdown