From 1cc483b353db0bed2e68d661bf8960035ddd2603 Mon Sep 17 00:00:00 2001 From: saurbhc Date: Mon, 20 Nov 2023 15:36:57 +0000 Subject: [PATCH] [PY-398] Polling on Item Push (#708) * update Meta Stage obj - add an optional wait arg to wait for item processing_status, update Meta Workflow obj - fix dataset stage * update test_stagemeta.py - add test for `wait=True` * update stage.py - ran `black` * update workflow meta object - raising `MissingDataset` when dataset stage not found * update workflow meta object - fix ruff f-string * update Stage meta object - implement retries when checking all-items-complete * rewrite `check_all_items_complete` logic in an attempt to lower len(item_ids) APIs calls + minor improvements * minor fix for a typo * update stage meta class - wait logic fix --- darwin/future/core/items/__init__.py | 2 +- darwin/future/exceptions.py | 6 ++ darwin/future/meta/objects/stage.py | 58 ++++++++++++++++++- darwin/future/meta/objects/workflow.py | 30 ++++++++-- .../tests/meta/objects/test_stagemeta.py | 53 ++++++++++++++++- 5 files changed, 140 insertions(+), 9 deletions(-) diff --git a/darwin/future/core/items/__init__.py b/darwin/future/core/items/__init__.py index a56cb784d..8d0bf339d 100644 --- a/darwin/future/core/items/__init__.py +++ b/darwin/future/core/items/__init__.py @@ -1,2 +1,2 @@ -from darwin.future.core.items.get import get_item_ids, get_item_ids_stage +from darwin.future.core.items.get import get_item, get_item_ids, get_item_ids_stage from darwin.future.core.items.move_items import move_items_to_stage diff --git a/darwin/future/exceptions.py b/darwin/future/exceptions.py index 39da4b186..4bb22f744 100644 --- a/darwin/future/exceptions.py +++ b/darwin/future/exceptions.py @@ -140,3 +140,9 @@ class DatasetNotFound(DarwinException): """Raised when the dataset endpoint returns a malformed response.""" ... + + +class MaxRetriesError(DarwinException): + """Raised when a certain API call is re-tried for {x} number of times.""" + + ... 
diff --git a/darwin/future/meta/objects/stage.py b/darwin/future/meta/objects/stage.py
index 068d9de2e..10e481079 100644
--- a/darwin/future/meta/objects/stage.py
+++ b/darwin/future/meta/objects/stage.py
@@ -1,11 +1,13 @@
 from __future__ import annotations
 
+import time
 from typing import List
 from uuid import UUID
 
-from darwin.future.core.items import move_items_to_stage
+from darwin.future.core.items import get_item, move_items_to_stage
 from darwin.future.core.types.query import QueryFilter
 from darwin.future.data_objects.workflow import WFEdgeCore, WFStageCore
+from darwin.future.exceptions import MaxRetriesError
 from darwin.future.meta.objects.base import MetaBase
 from darwin.future.meta.queries.item import ItemQuery
 from darwin.future.meta.queries.item_id import ItemIDQuery
@@ -76,7 +78,50 @@ def item_ids(self) -> ItemIDQuery:
             ],
         )
 
-    def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage:
+    def check_all_items_complete(
+        self,
+        slug: str,
+        item_ids: list[str],
+        wait_max_attempts: int = 5,
+        wait_time: float = 0.5,
+    ) -> bool:
+        """
+        Checks if all items are complete. If not, waits and tries again. Raises error if max attempts reached.
+
+        Args:
+            slug (str): Team slug
+            item_ids (list[str]): List of item ids
+            wait_max_attempts (int, optional): Max number of attempts. Defaults to 5.
+            wait_time (float, optional): Wait time between attempts. Defaults to 0.5.
+        """
+        for attempt in range(1, wait_max_attempts + 1):
+            # check if all items are complete
+            for item_id in item_ids[:]:
+                if get_item(self.client, slug, item_id).processing_status != "complete":
+                    break
+                item_ids.remove(item_id)
+            else:
+                # if all items are complete, return.
+                return True
+            # if not complete, wait
+            time.sleep(wait_time * attempt)
+        else:
+            # if max attempts reached, raise error
+            raise MaxRetriesError(
+                f"Max attempts reached. {len(item_ids)} items pending completion check."
+ ) + + def move_attached_files_to_stage( + self, + new_stage_id: UUID, + wait: bool = True, + wait_max_attempts: int = 5, + wait_time: float = 0.5, + ) -> Stage: + """ + Args: + wait (bool, optional): Waits for Item 'processing_status' to complete. Defaults to True. + """ assert self.meta_params["team_slug"] is not None and isinstance( self.meta_params["team_slug"], str ) @@ -92,6 +137,15 @@ def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage: self.meta_params["dataset_id"], ) ids = [str(x.id) for x in self.item_ids.collect_all()] + + if wait: + self.check_all_items_complete( + slug=team_slug, + item_ids=ids, + wait_max_attempts=wait_max_attempts, + wait_time=wait_time, + ) + move_items_to_stage( self.client, team_slug, diff --git a/darwin/future/meta/objects/workflow.py b/darwin/future/meta/objects/workflow.py index e7b857483..52d2e9f03 100644 --- a/darwin/future/meta/objects/workflow.py +++ b/darwin/future/meta/objects/workflow.py @@ -8,9 +8,10 @@ from darwin.datatypes import PathLike from darwin.future.core.types.query import QueryFilter from darwin.future.data_objects.workflow import WFDatasetCore, WFTypeCore, WorkflowCore +from darwin.future.exceptions import MissingDataset from darwin.future.meta.objects.base import MetaBase from darwin.future.meta.queries.item import ItemQuery -from darwin.future.meta.queries.stage import StageQuery +from darwin.future.meta.queries.stage import Stage, StageQuery class Workflow(MetaBase[WorkflowCore]): @@ -65,6 +66,14 @@ def stages(self) -> StageQuery: meta_params["dataset_name"] = self.datasets[0].name return StageQuery(self.client, meta_params=meta_params) + def _get_dataset_stage(self) -> Stage: + # stages are not in right order - finding the dataset stage + for stage in self.stages: + if stage.type == "dataset": + return stage + + raise MissingDataset("Workflow has no dataset stage") + @property def datasets(self) -> List[WFDatasetCore]: if self._element.dataset is None: @@ -79,15 +88,21 @@ def id(self) 
-> UUID: def name(self) -> str: return self._element.name - def push_from_dataset_stage(self) -> Workflow: + def push_from_dataset_stage( + self, wait: bool = True, wait_max_attempts: int = 5, wait_time: float = 0.5 + ) -> Workflow: assert self._element.dataset is not None stages = self.stages - ds_stage = stages[0] assert len(stages) > 1 + + ds_stage = self._get_dataset_stage() assert ds_stage._element.type == WFTypeCore.DATASET next_stage = ds_stage._element.edges[0].target_stage_id assert next_stage is not None - ds_stage.move_attached_files_to_stage(next_stage) + ds_stage.move_attached_files_to_stage( + next_stage, wait, wait_max_attempts, wait_time + ) + return self def upload_files( @@ -101,6 +116,9 @@ def upload_files( preserve_folders: bool = False, verbose: bool = False, auto_push: bool = True, + wait: bool = True, + wait_max_attempts: int = 5, + wait_time: float = 0.5, ) -> Workflow: assert self._element.dataset is not None upload_data( @@ -115,7 +133,9 @@ def upload_files( verbose, ) if auto_push: - self.push_from_dataset_stage() + self.push_from_dataset_stage( + wait=wait, wait_max_attempts=wait_max_attempts, wait_time=wait_time + ) return self def __str__(self) -> str: diff --git a/darwin/future/tests/meta/objects/test_stagemeta.py b/darwin/future/tests/meta/objects/test_stagemeta.py index 68c025d34..502a9b3cb 100644 --- a/darwin/future/tests/meta/objects/test_stagemeta.py +++ b/darwin/future/tests/meta/objects/test_stagemeta.py @@ -98,7 +98,58 @@ def test_move_attached_files_to_stage( json={"success": UUIDs_str}, status=200, ) - stage_meta.move_attached_files_to_stage(stage_meta.id) + stage_meta.move_attached_files_to_stage(stage_meta.id, wait=False) + + +def test_move_attached_files_to_stage_wait( + base_meta_client: Client, stage_meta: Stage, UUIDs_str: List[str], UUIDs: List[UUID] +) -> None: + with responses.RequestsMock() as rsps: + rsps.add( + rsps.GET, + base_meta_client.config.api_endpoint + + "v2/teams/default-team/items/list_ids", + 
json={"item_ids": UUIDs_str}, + match=[ + query_param_matcher( + { + "page[offset]": "0", + "page[size]": "500", + "workflow_stage_ids": str(stage_meta.id), + "dataset_ids": "1337", + } + ) + ], + status=200, + ) + rsps.add( + rsps.POST, + base_meta_client.config.api_endpoint + "v2/teams/default-team/items/stage", + json={"success": UUIDs_str}, + status=200, + ) + for uuid in stage_meta.item_ids.collect_all(): + rsps.add( + rsps.GET, + base_meta_client.config.api_endpoint + + f"v2/teams/default-team/items/{uuid}", + json={ + "archived": False, + "dataset_id": 1337, + "id": "00000000-0000-0000-0000-000000000000", + "layout": None, + "name": "test_0", + "path": "test_path", + "priority": 0, + "processing_status": "complete", + "slots": [], + "tags": [], + }, + status=200, + ) + stage_meta.move_attached_files_to_stage( + stage_meta.id, wait=True, wait_max_attempts=5, wait_time=0.5 + ) def test_get_stage_id(stage_meta: Stage) -> None: