diff --git a/karton/core/backend.py b/karton/core/backend.py index 2c1b33f..d678898 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -532,21 +532,15 @@ def get_all_tasks( self.iter_all_tasks(chunk_size=chunk_size, parse_resources=parse_resources) ) - def iter_task_tree( + def _iter_legacy_task_tree( self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True ) -> Iterator[Task]: """ - Iterates all tasks that belong to the same analysis task tree - and have the same root_uid + Processes tasks made by <5.4.0 (unrouted from <5.4.0 producers or existing + before upgrade) - :param root_uid: Root identifier of task tree - :param chunk_size: Size of chunks passed to the Redis SCAN and MGET command - :param parse_resources: If set to False, resources are not parsed. - It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation - to learn more. - :return: Iterator with task objects + Used internally by iter_task_tree. """ - # Process unrouted tasks produced by Karton <5.4.0 legacy_task_keys = self.redis.scan_iter( match=f"{KARTON_TASK_NAMESPACE}:*[^:]*", count=chunk_size ) @@ -554,13 +548,31 @@ def iter_task_tree( yield from filter( lambda task: task.root_uid == root_uid, ( - Task.unserialize(task_data, backend=self) - if parse_resources - else Task.unserialize(task_data, parse_resources=False) + Task.unserialize(task_data, parse_resources=parse_resources) for task_data in self.redis.mget(chunk) if task_data is not None ), ) + + def iter_task_tree( + self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True + ) -> Iterator[Task]: + """ + Iterates all tasks that belong to the same analysis task tree + and have the same root_uid + + :param root_uid: Root identifier of task tree + :param chunk_size: Size of chunks passed to the Redis SCAN and MGET command + :param parse_resources: If set to False, resources are not parsed. + It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation + to learn more. + :return: Iterator with task objects + """ + # Process <5.4.0 tasks (unrouted from <5.4.0 producers + # or existing before upgrade) + yield from self._iter_legacy_task_tree( + root_uid, chunk_size=chunk_size, parse_resources=parse_resources + ) # Process >=5.4.0 tasks task_keys = self.redis.scan_iter( match=f"{KARTON_TASK_NAMESPACE}:{{{root_uid}}}:*", count=chunk_size