diff --git a/docs/source/cli/index.rst b/docs/source/cli/index.rst
index 4cb7b03d85..56079e2728 100644
--- a/docs/source/cli/index.rst
+++ b/docs/source/cli/index.rst
@@ -989,9 +989,9 @@ List delegated operations.
                             only list operations for this dataset
       -s STATE, --state STATE
                             only list operations with this state. Supported
-                            values are ('QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')
+                            values are ('SCHEDULED', 'QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')
       --sort-by SORT_BY     how to sort the operations. Supported values are
-                            ('QUEUED_AT', 'STARTED_AT', COMPLETED_AT', 'FAILED_AT', 'OPERATOR')
+                            ('SCHEDULED_AT', 'QUEUED_AT', 'STARTED_AT', 'COMPLETED_AT', 'FAILED_AT', 'OPERATOR')
       --reverse             whether to sort in reverse order
       -l LIMIT, --limit LIMIT
                             a maximum number of operations to show
diff --git a/fiftyone/core/cli.py b/fiftyone/core/cli.py
index 989fd7c1f7..85912a875c 100644
--- a/fiftyone/core/cli.py
+++ b/fiftyone/core/cli.py
@@ -3150,7 +3150,7 @@ def setup(parser):
         default=None,
         help=(
             "only list operations with this state. Supported values are "
-            "('QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')"
+            "('SCHEDULED', 'QUEUED', 'RUNNING', 'COMPLETED', 'FAILED')"
         ),
     )
     parser.add_argument(
@@ -3158,7 +3158,7 @@ def setup(parser):
         default="QUEUED_AT",
         help=(
             "how to sort the operations. Supported values are "
-            "('QUEUED_AT', 'STARTED_AT', COMPLETED_AT', 'FAILED_AT', 'OPERATOR')"
+            "('SCHEDULED_AT', 'QUEUED_AT', 'STARTED_AT', 'COMPLETED_AT', 'FAILED_AT', 'OPERATOR')"
         ),
     )
     parser.add_argument(
@@ -3390,7 +3390,7 @@ def setup(parser):
         default=None,
         help=(
             "delete operations in this state. Supported values are "
-            "('QUEUED', 'COMPLETED', 'FAILED')"
+            "('SCHEDULED', 'QUEUED', 'COMPLETED', 'FAILED')"
         ),
     )
     parser.add_argument(
diff --git a/fiftyone/factory/__init__.py b/fiftyone/factory/__init__.py
index 0731ede3a6..dcafe4548d 100644
--- a/fiftyone/factory/__init__.py
+++ b/fiftyone/factory/__init__.py
@@ -11,6 +11,7 @@ class SortByField(object):
     """Sort by enum for delegated operations."""
 
     UPDATED_AT = "updated_at"
+    SCHEDULED_AT = "scheduled_at"
     QUEUED_AT = "queued_at"
     COMPLETED_AT = "completed_at"
     STARTED_AT = "started_at"
diff --git a/fiftyone/factory/repos/delegated_operation.py b/fiftyone/factory/repos/delegated_operation.py
index 854cab16f1..4f1ca16bb1 100644
--- a/fiftyone/factory/repos/delegated_operation.py
+++ b/fiftyone/factory/repos/delegated_operation.py
@@ -64,6 +64,22 @@ def get_queued_operations(
             "subclass must implement get_queued_operations()"
         )
 
+    def get_scheduled_operations(
+        self, operator: str = None, dataset_name=None
+    ) -> List[DelegatedOperationDocument]:
+        """Get all scheduled operations."""
+        raise NotImplementedError(
+            "subclass must implement get_scheduled_operations()"
+        )
+
+    def get_running_operations(
+        self, operator: str = None, dataset_name=None
+    ) -> List[DelegatedOperationDocument]:
+        """Get all running operations."""
+        raise NotImplementedError(
+            "subclass must implement get_running_operations()"
+        )
+
     def list_operations(
         self,
         operator: str = None,
@@ -275,6 +291,14 @@ def update_run_state(
                     "updated_at": datetime.utcnow(),
                 }
             }
+        elif run_state == ExecutionRunState.SCHEDULED:
+            update = {
+                "$set": {
+                    "run_state": run_state,
+                    "scheduled_at": datetime.utcnow(),
+                    "updated_at": datetime.utcnow(),
+                }
+            }
 
         if run_link is not None:
             update["$set"]["run_link"] = run_link
@@ -341,6 +365,28 @@ def get_queued_operations(
             run_state=ExecutionRunState.QUEUED,
         )
 
+    def get_scheduled_operations(
+        self,
+        operator: str = None,
+        dataset_name: ObjectId = None,
+    ) -> List[DelegatedOperationDocument]:
+        return self.list_operations(
+            operator=operator,
+            dataset_name=dataset_name,
+            run_state=ExecutionRunState.SCHEDULED,
+        )
+
+    def get_running_operations(
+        self,
+        operator: str = None,
+        dataset_name: ObjectId = None,
+    ) -> List[DelegatedOperationDocument]:
+        return self.list_operations(
+            operator=operator,
+            dataset_name=dataset_name,
+            run_state=ExecutionRunState.RUNNING,
+        )
+
     def list_operations(
         self,
         operator: str = None,
diff --git a/fiftyone/factory/repos/delegated_operation_doc.py b/fiftyone/factory/repos/delegated_operation_doc.py
index 686a8bdf34..150b8a05de 100644
--- a/fiftyone/factory/repos/delegated_operation_doc.py
+++ b/fiftyone/factory/repos/delegated_operation_doc.py
@@ -46,6 +46,7 @@ def __init__(
         self.pinned = False
         self.completed_at = None
         self.failed_at = None
+        self.scheduled_at = None
        self.result = None
         self.id = None
         self._doc = None
@@ -53,25 +54,28 @@ def from_pymongo(self, doc: dict):
         # required fields
-        self.operator = doc["operator"]
-        self.queued_at = doc["queued_at"]
-        self.run_state = doc["run_state"]
-        self.label = doc["label"] if "label" in doc else None
-        self.updated_at = doc["updated_at"] if "updated_at" in doc else None
+        self.operator = doc.get("operator")
+        self.queued_at = doc.get("queued_at")
+        self.run_state = doc.get("run_state")
 
         # optional fields
-        self.delegation_target = (
-            doc["delegation_target"] if "delegation_target" in doc else None
-        )
-        self.started_at = doc["started_at"] if "started_at" in doc else None
-        self.completed_at = (
-            doc["completed_at"] if "completed_at" in doc else None
-        )
-        self.failed_at = doc["failed_at"] if "failed_at" in doc else None
-        self.pinned = doc["pinned"] if "pinned" in doc else None
-        self.dataset_id = doc["dataset_id"] if "dataset_id" in doc else None
-        self.run_link = doc["run_link"] if "run_link" in doc else None
+        self.delegation_target = doc.get("delegation_target", None)
+        self.started_at = doc.get("started_at", None)
+        self.completed_at = doc.get("completed_at", None)
+        self.failed_at = doc.get("failed_at", None)
+        self.scheduled_at = doc.get("scheduled_at", None)
+        self.pinned = doc.get("pinned", None)
+        self.dataset_id = doc.get("dataset_id", None)
+        self.run_link = doc.get("run_link", None)
+        self.metadata = doc.get("metadata", None)
+        self.label = doc.get("label", None)
+        self.updated_at = doc.get("updated_at", None)
+
+        # internal fields
+        self.id = doc["_id"]
+        self._doc = doc
+
         # nested fields
         if (
             "context" in doc
             and doc["context"] is not None
@@ -100,12 +104,6 @@
             if "updated_at" in doc["status"]:
                 self.status.updated_at = doc["status"]["updated_at"]
 
-        # internal fields
-        self.id = doc["_id"]
-        self._doc = doc
-
-        self.metadata = doc["metadata"] if "metadata" in doc else None
-
         return self
 
     def to_pymongo(self) -> dict:
diff --git a/fiftyone/operators/delegated.py b/fiftyone/operators/delegated.py
index ff43dd597a..afa2441f6f 100644
--- a/fiftyone/operators/delegated.py
+++ b/fiftyone/operators/delegated.py
@@ -101,6 +101,17 @@ def set_running(self, doc_id, progress=None, run_link=None):
             progress=progress,
         )
 
+    def set_scheduled(self, doc_id):
+        """Sets the given delegated operation to scheduled state.
+        Args:
+            doc_id: the ID of the delegated operation
+        Returns:
+            a :class:`fiftyone.factory.repos.DelegatedOperationDocument`
+        """
+        return self._repo.update_run_state(
+            _id=doc_id, run_state=ExecutionRunState.SCHEDULED
+        )
+
     def set_completed(
         self,
         doc_id,
@@ -235,6 +246,34 @@ def get_queued_operations(self, operator=None, dataset_name=None):
             operator=operator, dataset_name=dataset_name
         )
 
+    def get_scheduled_operations(self, operator=None, dataset_name=None):
+        """Returns all scheduled delegated operations.
+        Args:
+            operator (None): the optional name of the operator to return all
+                the scheduled delegated operations for
+            dataset_name (None): the optional name of the dataset to return all
+                the scheduled delegated operations for
+        Returns:
+            a list of :class:`fiftyone.factory.repos.DelegatedOperationDocument`
+        """
+        return self._repo.get_scheduled_operations(
+            operator=operator, dataset_name=dataset_name
+        )
+
+    def get_running_operations(self, operator=None, dataset_name=None):
+        """Returns all running delegated operations.
+        Args:
+            operator (None): the optional name of the operator to return all
+                the running delegated operations for
+            dataset_name (None): the optional name of the dataset to return all
+                the running delegated operations for
+        Returns:
+            a list of :class:`fiftyone.factory.repos.DelegatedOperationDocument`
+        """
+        return self._repo.get_running_operations(
+            operator=operator, dataset_name=dataset_name
+        )
+
     def get(self, doc_id):
         """Returns the delegated operation with the given ID.
 
diff --git a/fiftyone/operators/executor.py b/fiftyone/operators/executor.py
index 6a4b596c99..151343cd42 100644
--- a/fiftyone/operators/executor.py
+++ b/fiftyone/operators/executor.py
@@ -36,6 +36,7 @@ class ExecutionRunState(object):
     """Enumeration of the available operator run states."""
 
+    SCHEDULED = "scheduled"
     QUEUED = "queued"
     RUNNING = "running"
     COMPLETED = "completed"
diff --git a/tests/unittests/delegated_operators_tests.py b/tests/unittests/delegated_operators_tests.py
index a64d177714..e787b3280b 100644
--- a/tests/unittests/delegated_operators_tests.py
+++ b/tests/unittests/delegated_operators_tests.py
@@ -226,9 +226,7 @@ def test_delegate_operation(
         self.assertIsNotNone(doc2.metadata)
         self.assertEqual(doc2.metadata, doc2_metadata)
 
-    def test_list_queued_operations(
-        self, mock_get_operator, mock_operator_exists
-    ):
+    def test_list_operations(self, mock_get_operator, mock_operator_exists):
         dataset_name = f"test_dataset_{ObjectId()}"
         dataset = Dataset(dataset_name, _create=True, persistent=True)
         dataset.save()
@@ -248,9 +246,8 @@ def test_list_queued_operations(
 
         # get all the existing counts of queued operations
         initial_queued = len(self.svc.get_queued_operations())
-        initial_running = len(
-            self.svc.list_operations(run_state=ExecutionRunState.RUNNING)
-        )
+        initial_running = len(self.svc.get_running_operations())
+        initial_scheduled = len(self.svc.get_scheduled_operations())
         initial_dataset_queued = len(
             self.svc.get_queued_operations(dataset_name=dataset_name)
         )
@@ -306,9 +303,18 @@
 
         queued = self.svc.get_queued_operations()
         self.assertEqual(len(queued), 10 + initial_queued)
 
-        running = self.svc.list_operations(run_state=ExecutionRunState.RUNNING)
+        running = self.svc.get_running_operations()
         self.assertEqual(len(running), 10 + initial_running)
 
+        for doc in docs_to_run:
+            self.svc.set_scheduled(doc)
+
+        queued = self.svc.get_queued_operations()
+        self.assertEqual(len(queued), 10 + initial_queued)
+
+        scheduled = self.svc.get_scheduled_operations()
+        self.assertEqual(len(scheduled), 10 + initial_scheduled)
+
         dataset.delete()
         dataset2.delete()
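
For reference, a minimal usage sketch of the new state against the service layer touched above. It assumes that `DelegatedOperationService` in `fiftyone.operators.delegated` is the class these methods live on (the unit tests' `self.svc`), that a FiftyOne database is configured, and that at least one operation was previously queued; otherwise the scheduling step below is simply skipped.

    # Sketch only; entry point and preconditions are assumptions, see note above
    from fiftyone.operators.delegated import DelegatedOperationService
    from fiftyone.operators.executor import ExecutionRunState

    svc = DelegatedOperationService()

    # Move a previously queued operation into the new "scheduled" state
    queued = svc.get_queued_operations()
    if queued:
        svc.set_scheduled(queued[0].id)

    # The new helpers mirror get_queued_operations()
    scheduled = svc.get_scheduled_operations()
    running = svc.get_running_operations()

    # Equivalent filtering via the generic listing API
    also_scheduled = svc.list_operations(run_state=ExecutionRunState.SCHEDULED)

Because `update_run_state()` now stamps `scheduled_at` when an operation enters the `SCHEDULED` state, these operations can also be sorted with the new `SortByField.SCHEDULED_AT` key.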