[PY-398] Polling on Item Push (#708)

* update Meta Stage obj - add an optional wait arg to wait for item processing_status, update Meta Workflow obj - fix dataset stage

* update test_stagemeta.py - add test for `wait=True`

* update stage.py - ran `black`

* update workflow meta object - raising `MissingDataset` when dataset stage not found

* update workflow meta object - fix ruff f-string

* update Stage meta object - implement retries when checking all-items-complete

* rewrite `check_all_items_complete` logic in an attempt to reduce the number of per-item API calls + minor improvements

* minor fix for a typo

* update stage meta class - wait logic fix
saurbhc authored Nov 20, 2023
1 parent 625fd1f commit 1cc483b
Showing 5 changed files with 140 additions and 9 deletions.
2 changes: 1 addition & 1 deletion darwin/future/core/items/__init__.py
@@ -1,2 +1,2 @@
from darwin.future.core.items.get import get_item_ids, get_item_ids_stage
from darwin.future.core.items.get import get_item, get_item_ids, get_item_ids_stage
from darwin.future.core.items.move_items import move_items_to_stage
6 changes: 6 additions & 0 deletions darwin/future/exceptions.py
@@ -140,3 +140,9 @@ class DatasetNotFound(DarwinException):
"""Raised when the dataset endpoint returns a malformed response."""

...


class MaxRetriesError(DarwinException):
"""Raised when a certain API call is re-tried for {x} number of times."""

...
58 changes: 56 additions & 2 deletions darwin/future/meta/objects/stage.py
@@ -1,11 +1,13 @@
from __future__ import annotations

import time
from typing import List
from uuid import UUID

from darwin.future.core.items import move_items_to_stage
from darwin.future.core.items import get_item, move_items_to_stage
from darwin.future.core.types.query import QueryFilter
from darwin.future.data_objects.workflow import WFEdgeCore, WFStageCore
from darwin.future.exceptions import MaxRetriesError
from darwin.future.meta.objects.base import MetaBase
from darwin.future.meta.queries.item import ItemQuery
from darwin.future.meta.queries.item_id import ItemIDQuery
@@ -76,7 +78,50 @@ def item_ids(self) -> ItemIDQuery:
],
)

def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage:
def check_all_items_complete(
self,
slug: str,
item_ids: list[str],
wait_max_attempts: int = 5,
wait_time: float = 0.5,
) -> bool:
"""
Checks whether all items have finished processing. If not, waits and retries; raises MaxRetriesError once the maximum number of attempts is reached.
Args:
slug (str): Team slug
item_ids (list[str]): List of item ids
wait_max_attempts (int, optional): Maximum number of attempts. Defaults to 5.
wait_time (float, optional): Base wait time between attempts, scaled by the attempt number. Defaults to 0.5.
"""
for attempt in range(1, wait_max_attempts + 1):
# check if all items are complete
for item_id in item_ids[:]:
if get_item(self.client, slug, item_id).processing_status != "complete":
break
item_ids.remove(item_id)
else:
# if all items are complete, return.
return True
# if not complete, wait
time.sleep(wait_time * attempt)
else:
# if max attempts reached, raise error
raise MaxRetriesError(
f"Max attempts reached. {len(item_ids)} items pending completion check."
)

def move_attached_files_to_stage(
self,
new_stage_id: UUID,
wait: bool = True,
wait_max_attempts: int = 5,
wait_time: float = 0.5,
) -> Stage:
"""
Args:
wait (bool, optional): Wait for each item's 'processing_status' to reach 'complete' before moving. Defaults to True.
"""
assert self.meta_params["team_slug"] is not None and isinstance(
self.meta_params["team_slug"], str
)
@@ -92,6 +137,15 @@ def move_attached_files_to_stage(self, new_stage_id: UUID) -> Stage:
self.meta_params["dataset_id"],
)
ids = [str(x.id) for x in self.item_ids.collect_all()]

if wait:
self.check_all_items_complete(
slug=team_slug,
item_ids=ids,
wait_max_attempts=wait_max_attempts,
wait_time=wait_time,
)

move_items_to_stage(
self.client,
team_slug,
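
For readers skimming the diff, here is a minimal, self-contained sketch of the polling-with-linear-backoff pattern that the new `check_all_items_complete` follows. The helper names (`wait_until_complete`, `fetch_status`) are illustrative stand-ins, not part of the darwin SDK; only the control flow mirrors the committed code.

```python
import time
from typing import Callable, List


class MaxRetriesError(Exception):
    """Stand-in for darwin.future.exceptions.MaxRetriesError."""


def wait_until_complete(
    item_ids: List[str],
    fetch_status: Callable[[str], str],  # stand-in for get_item(...).processing_status
    wait_max_attempts: int = 5,
    wait_time: float = 0.5,
) -> bool:
    pending = list(item_ids)  # work on a copy so the caller's list is untouched
    for attempt in range(1, wait_max_attempts + 1):
        # Drop every item that has already finished; stop at the first
        # incomplete one so finished items are not re-fetched next round.
        for item_id in pending[:]:
            if fetch_status(item_id) != "complete":
                break
            pending.remove(item_id)
        else:
            return True  # every item reached "complete"
        # Linear backoff: 0.5s after attempt 1, 1.0s after attempt 2, ...
        time.sleep(wait_time * attempt)
    raise MaxRetriesError(
        f"{len(pending)} items still pending after {wait_max_attempts} attempts"
    )


if __name__ == "__main__":
    statuses = {"a": "complete", "b": "complete"}
    print(wait_until_complete(["a", "b"], statuses.__getitem__))  # True
```

Assuming the committed loop behaves the same way, the default arguments amount to at most 0.5 + 1.0 + 1.5 + 2.0 + 2.5 = 7.5 seconds of sleeping before `MaxRetriesError` is raised.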
30 changes: 25 additions & 5 deletions darwin/future/meta/objects/workflow.py
@@ -8,9 +8,10 @@
from darwin.datatypes import PathLike
from darwin.future.core.types.query import QueryFilter
from darwin.future.data_objects.workflow import WFDatasetCore, WFTypeCore, WorkflowCore
from darwin.future.exceptions import MissingDataset
from darwin.future.meta.objects.base import MetaBase
from darwin.future.meta.queries.item import ItemQuery
from darwin.future.meta.queries.stage import StageQuery
from darwin.future.meta.queries.stage import Stage, StageQuery


class Workflow(MetaBase[WorkflowCore]):
@@ -65,6 +66,14 @@ def stages(self) -> StageQuery:
meta_params["dataset_name"] = self.datasets[0].name
return StageQuery(self.client, meta_params=meta_params)

def _get_dataset_stage(self) -> Stage:
# stages are not guaranteed to be in order - find the dataset stage
for stage in self.stages:
if stage.type == "dataset":
return stage

raise MissingDataset("Workflow has no dataset stage")

@property
def datasets(self) -> List[WFDatasetCore]:
if self._element.dataset is None:
@@ -79,15 +88,21 @@ def id(self) -> UUID:
def name(self) -> str:
return self._element.name

def push_from_dataset_stage(self) -> Workflow:
def push_from_dataset_stage(
self, wait: bool = True, wait_max_attempts: int = 5, wait_time: float = 0.5
) -> Workflow:
assert self._element.dataset is not None
stages = self.stages
ds_stage = stages[0]
assert len(stages) > 1

ds_stage = self._get_dataset_stage()
assert ds_stage._element.type == WFTypeCore.DATASET
next_stage = ds_stage._element.edges[0].target_stage_id
assert next_stage is not None
ds_stage.move_attached_files_to_stage(next_stage)
ds_stage.move_attached_files_to_stage(
next_stage, wait, wait_max_attempts, wait_time
)

return self

def upload_files(
@@ -101,6 +116,9 @@ def upload_files(
preserve_folders: bool = False,
verbose: bool = False,
auto_push: bool = True,
wait: bool = True,
wait_max_attempts: int = 5,
wait_time: float = 0.5,
) -> Workflow:
assert self._element.dataset is not None
upload_data(
@@ -115,7 +133,9 @@ def upload_files(
verbose,
)
if auto_push:
self.push_from_dataset_stage()
self.push_from_dataset_stage(
wait=wait, wait_max_attempts=wait_max_attempts, wait_time=wait_time
)
return self

def __str__(self) -> str:
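
A hedged end-to-end usage sketch of the new keyword arguments follows. Only `auto_push`, `wait`, `wait_max_attempts`, `wait_time`, and the `MaxRetriesError` import are taken from this diff; how the `Workflow` object and the file list are obtained is assumed, and the paths are placeholders.

```python
from pathlib import Path

from darwin.future.exceptions import MaxRetriesError


def push_with_polling(workflow) -> None:
    """Upload files and auto-push them out of the dataset stage.

    `workflow` is assumed to be an already-constructed darwin meta Workflow.
    """
    try:
        workflow.upload_files(
            [Path("images/cat.jpg"), Path("images/dog.jpg")],  # placeholder paths
            auto_push=True,        # push out of the dataset stage after upload
            wait=True,             # poll processing_status before moving items
            wait_max_attempts=5,   # rounds of polling before giving up
            wait_time=0.5,         # base sleep, scaled linearly per attempt
        )
    except MaxRetriesError:
        # Items were still processing after every polling attempt; retry later
        # or call workflow.push_from_dataset_stage() once processing settles.
        print("items not yet processed; try pushing again later")
```

With `wait=False` the push behaves as it did before this change and moves items without polling.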
53 changes: 52 additions & 1 deletion darwin/future/tests/meta/objects/test_stagemeta.py
@@ -98,7 +98,58 @@ def test_move_attached_files_to_stage(
json={"success": UUIDs_str},
status=200,
)
stage_meta.move_attached_files_to_stage(stage_meta.id)
stage_meta.move_attached_files_to_stage(stage_meta.id, wait=False)


def test_move_attached_files_to_stage_wait(
base_meta_client: Client, stage_meta: Stage, UUIDs_str: List[str], UUIDs: List[UUID]
) -> None:
with responses.RequestsMock() as rsps:
rsps.add(
rsps.GET,
base_meta_client.config.api_endpoint
+ "v2/teams/default-team/items/list_ids",
json={"item_ids": UUIDs_str},
match=[
query_param_matcher(
{
"page[offset]": "0",
"page[size]": "500",
"workflow_stage_ids": str(stage_meta.id),
"dataset_ids": "1337",
}
)
],
status=200,
)
rsps.add(
rsps.POST,
base_meta_client.config.api_endpoint + "v2/teams/default-team/items/stage",
json={"success": UUIDs_str},
status=200,
)
for uuid in stage_meta.item_ids.collect_all():
rsps.add(
rsps.GET,
base_meta_client.config.api_endpoint
+ f"v2/teams/default-team/items/{uuid}",
json={
"archived": False,
"dataset_id": 1337,
"id": "00000000-0000-0000-0000-000000000000",
"layout": None,
"name": "test_0",
"path": "test_path",
"priority": 0,
"processing_status": "complete",
"slots": [],
"tags": [],
},
status=200,
)
stage_meta.move_attached_files_to_stage(
stage_meta.id, wait=True, wait_max_attempts=5, wait_time=0.5
)


def test_get_stage_id(stage_meta: Stage) -> None:
