From 3ebce4d890283c13e932037ad7750be58bad64e6 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Thu, 2 Feb 2023 11:38:53 -0600 Subject: [PATCH 1/4] Adds default value for priority to prevent new agents from breaking with old servers --- src/prefect/orion/models/work_queues.py | 2 +- src/prefect/orion/schemas/core.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/orion/models/work_queues.py b/src/prefect/orion/models/work_queues.py index f59e87287aba..dccefbfc2db5 100644 --- a/src/prefect/orion/models/work_queues.py +++ b/src/prefect/orion/models/work_queues.py @@ -62,7 +62,7 @@ async def create_work_queue( ).where(db.WorkQueue.work_pool_id == data["work_pool_id"]) priority = (await session.execute(max_priority_query)).scalar() - model = db.WorkQueue(**data, priority=priority) + model = db.WorkQueue(**data, priority=priority + 1) else: model = db.WorkQueue(**data, priority=1) session.add(model) diff --git a/src/prefect/orion/schemas/core.py b/src/prefect/orion/schemas/core.py index fccbce119e83..495005f5a2b9 100644 --- a/src/prefect/orion/schemas/core.py +++ b/src/prefect/orion/schemas/core.py @@ -836,7 +836,7 @@ class WorkQueue(ORMBaseModel): default=None, description="An optional concurrency limit for the work queue." ) priority: conint(ge=1) = Field( - ..., + default=1, description="The queue's priority. Lower values are higher priority (1 is the highest).", ) # Will be required after a future migration From 97aee417c14340ac3a585e08649a390440ce808b Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Thu, 2 Feb 2023 12:44:45 -0600 Subject: [PATCH 2/4] Updates default priority value for WorkQueueCreate --- src/prefect/orion/schemas/actions.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/prefect/orion/schemas/actions.py b/src/prefect/orion/schemas/actions.py index c70c1c949fdb..6cf6d65d30c0 100644 --- a/src/prefect/orion/schemas/actions.py +++ b/src/prefect/orion/schemas/actions.py @@ -479,7 +479,10 @@ class WorkQueueCreate(ActionBaseModel): description: Optional[str] = FieldFrom(schemas.core.WorkQueue) is_paused: bool = FieldFrom(schemas.core.WorkQueue) concurrency_limit: Optional[int] = FieldFrom(schemas.core.WorkQueue) - priority: Optional[int] = FieldFrom(schemas.core.WorkQueue) + priority: Optional[int] = Field( + default=None, + description="The queue's priority. Lower values are higher priority (1 is the highest).", + ) # DEPRECATED From 89f844b827865ad17837a57a0008da606d2f3044 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Thu, 2 Feb 2023 13:23:39 -0600 Subject: [PATCH 3/4] Add comments and clean up logic --- src/prefect/orion/models/work_queues.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/prefect/orion/models/work_queues.py b/src/prefect/orion/models/work_queues.py index dccefbfc2db5..b260b34b9634 100644 --- a/src/prefect/orion/models/work_queues.py +++ b/src/prefect/orion/models/work_queues.py @@ -42,6 +42,7 @@ async def create_work_queue( data = work_queue.dict(exclude={"priority"}) if data.get("work_pool_id") is None: + # If no work pool is provided, get or create the default agent work pool default_agent_work_pool = await models.workers.read_work_pool_by_name( session=session, work_pool_name=DEFAULT_AGENT_WORK_POOL_NAME ) @@ -57,6 +58,8 @@ async def create_work_queue( data["work_pool_id"] = default_agent_work_pool.id if data.get("work_pool_id"): + # Set the priority to be the max priority + 1 + # This will make the new queue the lowest priority max_priority_query = sa.select( sa.func.coalesce(sa.func.max(db.WorkQueue.priority), 0) ).where(db.WorkQueue.work_pool_id == data["work_pool_id"]) @@ -64,6 +67,7 @@ async def create_work_queue( model = db.WorkQueue(**data, priority=priority + 1) else: + # Legacy case to handle if there is no work pool model = db.WorkQueue(**data, priority=1) session.add(model) await session.flush() From 1424888ea26db2baaf9c2152d115284126cf8792 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Thu, 2 Feb 2023 13:30:22 -0600 Subject: [PATCH 4/4] Removes silly if statement --- src/prefect/orion/models/work_queues.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/prefect/orion/models/work_queues.py b/src/prefect/orion/models/work_queues.py index b260b34b9634..0a4af5603e33 100644 --- a/src/prefect/orion/models/work_queues.py +++ b/src/prefect/orion/models/work_queues.py @@ -57,18 +57,15 @@ async def create_work_queue( ) data["work_pool_id"] = default_agent_work_pool.id - if data.get("work_pool_id"): - # Set the priority to be the max priority + 1 - # This will make the new queue the lowest priority - max_priority_query = sa.select( - sa.func.coalesce(sa.func.max(db.WorkQueue.priority), 0) - ).where(db.WorkQueue.work_pool_id == data["work_pool_id"]) - priority = (await session.execute(max_priority_query)).scalar() - - model = db.WorkQueue(**data, priority=priority + 1) - else: - # Legacy case to handle if there is no work pool - model = db.WorkQueue(**data, priority=1) + # Set the priority to be the max priority + 1 + # This will make the new queue the lowest priority + max_priority_query = sa.select( + sa.func.coalesce(sa.func.max(db.WorkQueue.priority), 0) + ).where(db.WorkQueue.work_pool_id == data["work_pool_id"]) + priority = (await session.execute(max_priority_query)).scalar() + + model = db.WorkQueue(**data, priority=priority + 1) + session.add(model) await session.flush()