Skip to content

Commit

Permalink
Merge pull request #4893 from voxel51/feat/atomic-state-transitions
Browse files Browse the repository at this point in the history
add support for atomic state transitions
  • Loading branch information
tom-vx51 authored Oct 10, 2024
2 parents 2e1c503 + fc4d6d4 commit 6f5a12c
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 30 deletions.
22 changes: 20 additions & 2 deletions fiftyone/factory/repos/delegated_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def update_run_state(
result: ExecutionResult = None,
run_link: str = None,
progress: ExecutionProgress = None,
required_state: ExecutionRunState = None,
) -> DelegatedOperationDocument:
"""Update the run state of an operation."""
raise NotImplementedError("subclass must implement update_run_state()")
Expand Down Expand Up @@ -243,6 +244,7 @@ def update_run_state(
result: ExecutionResult = None,
run_link: str = None,
progress: ExecutionProgress = None,
required_state: ExecutionRunState = None,
) -> DelegatedOperationDocument:
update = None

Expand Down Expand Up @@ -299,6 +301,14 @@ def update_run_state(
"updated_at": datetime.utcnow(),
}
}
elif run_state == ExecutionRunState.QUEUED:
update = {
"$set": {
"run_state": run_state,
"queued_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
}
}

if run_link is not None:
update["$set"]["run_link"] = run_link
Expand All @@ -310,13 +320,21 @@ def update_run_state(
update["$set"]["status"] = progress
update["$set"]["status"]["updated_at"] = datetime.utcnow()

collection_filter = {"_id": _id}
if required_state is not None:
collection_filter["run_state"] = required_state

doc = self._collection.find_one_and_update(
filter={"_id": _id},
filter=collection_filter,
update=[update],
return_document=pymongo.ReturnDocument.AFTER,
)

return DelegatedOperationDocument().from_pymongo(doc)
return (
DelegatedOperationDocument().from_pymongo(doc)
if doc is not None
else None
)

def update_progress(
self,
Expand Down
87 changes: 79 additions & 8 deletions fiftyone/operators/delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ def set_progress(self, doc_id, progress):
"""
return self._repo.update_progress(_id=doc_id, progress=progress)

def set_running(self, doc_id, progress=None, run_link=None):
def set_running(
self,
doc_id,
progress=None,
run_link=None,
required_state=None,
):
"""Sets the given delegated operation to running state.
Args:
Expand All @@ -90,26 +96,60 @@ def set_running(self, doc_id, progress=None, run_link=None):
operation
run_link (None): an optional link to orchestrator-specific
information about the operation
required_state (None): an optional
:class:`fiftyone.operators.executor.ExecutionRunState` required
state of the operation. If provided, the update will only be
performed if the referenced operation matches this state.
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
a :class:`fiftyone.factory.repos.DelegatedOperationDocument` if the
update was performed, else ``None``.
"""
return self._repo.update_run_state(
_id=doc_id,
run_state=ExecutionRunState.RUNNING,
run_link=run_link,
progress=progress,
required_state=required_state,
)

def set_scheduled(self, doc_id):
def set_scheduled(self, doc_id, required_state=None):
"""Sets the given delegated operation to scheduled state.
Args:
doc_id: the ID of the delegated operation
required_state (None): an optional
:class:`fiftyone.operators.executor.ExecutionRunState` required
state of the operation. If provided, the update will only be
performed if the referenced operation matches this state.
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
a :class:`fiftyone.factory.repos.DelegatedOperationDocument` if the
update was performed, else ``None``.
"""
return self._repo.update_run_state(
_id=doc_id, run_state=ExecutionRunState.SCHEDULED
_id=doc_id,
run_state=ExecutionRunState.SCHEDULED,
required_state=required_state,
)

def set_queued(self, doc_id, required_state=None):
"""Sets the given delegated operation to queued state.
Args:
doc_id: the ID of the delegated operation
required_state (None): an optional
:class:`fiftyone.operators.executor.ExecutionRunState` required
state of the operation. If provided, the update will only be
performed if the referenced operation matches this state.
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument` if the
update was performed, else ``None``.
"""
return self._repo.update_run_state(
_id=doc_id,
run_state=ExecutionRunState.QUEUED,
required_state=required_state,
)

def set_completed(
Expand All @@ -118,6 +158,7 @@ def set_completed(
result=None,
progress=None,
run_link=None,
required_state=None,
):
"""Sets the given delegated operation to completed state.
Expand All @@ -131,9 +172,14 @@ def set_completed(
operation
run_link (None): an optional link to orchestrator-specific
information about the operation
required_state (None): an optional
:class:`fiftyone.operators.executor.ExecutionRunState` required
state of the operation. If provided, the update will only be
performed if the referenced operation matches this state.
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
a :class:`fiftyone.factory.repos.DelegatedOperationDocument` if the
update was performed, else ``None``.
"""

return self._repo.update_run_state(
Expand All @@ -142,6 +188,7 @@ def set_completed(
result=result,
progress=progress,
run_link=run_link,
required_state=required_state,
)

def set_failed(
Expand All @@ -150,6 +197,7 @@ def set_failed(
result=None,
progress=None,
run_link=None,
required_state=None,
):
"""Sets the given delegated operation to failed state.
Expand All @@ -163,16 +211,22 @@ def set_failed(
operation
run_link (None): an optional link to orchestrator-specific
information about the operation
required_state (None): an optional
:class:`fiftyone.operators.executor.ExecutionRunState` required
state of the operation. If provided, the update will only be
performed if the referenced operation matches this state.
Returns:
a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
a :class:`fiftyone.factory.repos.DelegatedOperationDocument` if the
update was performed, else ``None``.
"""
return self._repo.update_run_state(
_id=doc_id,
run_state=ExecutionRunState.FAILED,
result=result,
run_link=run_link,
progress=progress,
required_state=required_state,
)

def set_pinned(self, doc_id, pinned=True):
Expand Down Expand Up @@ -391,7 +445,24 @@ def execute_operation(self, operation, log=False, run_link=None):
information about the operation
"""
try:
self.set_running(doc_id=operation.id, run_link=run_link)
succeeded = (
self.set_running(
doc_id=operation.id,
run_link=run_link,
required_state=ExecutionRunState.QUEUED,
)
is not None
)
if not succeeded:
if log:
logger.debug(
"Not executing operation %s (%s) which is "
"no longer in QUEUED state, or doesn't exist.",
operation.id,
operation.operator,
)
return

if log:
logger.info(
"\nRunning operation %s (%s)",
Expand Down
Loading

0 comments on commit 6f5a12c

Please sign in to comment.