Skip to content

Commit

Permalink
Apply suggestions from review
Browse files Browse the repository at this point in the history
  • Loading branch information
psrok1 committed May 15, 2024
1 parent dc348c4 commit 4bf2bb5
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions karton/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,35 +532,47 @@ def get_all_tasks(
self.iter_all_tasks(chunk_size=chunk_size, parse_resources=parse_resources)
)

def iter_task_tree(
def _iter_legacy_task_tree(
self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
"""
Iterates all tasks that belong to the same analysis task tree
and have the same root_uid
Processes tasks made by <5.4.0 (unrouted from <5.4.0 producers or existing
before upgrade)
:param root_uid: Root identifier of task tree
:param chunk_size: Size of chunks passed to the Redis SCAN and MGET command
:param parse_resources: If set to False, resources are not parsed.
It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation
to learn more.
:return: Iterator with task objects
Used internally by iter_task_tree.
"""
# Process unrouted tasks produced by Karton <5.4.0
legacy_task_keys = self.redis.scan_iter(
match=f"{KARTON_TASK_NAMESPACE}:*[^:]*", count=chunk_size
)
for chunk in chunks_iter(legacy_task_keys, chunk_size):
yield from filter(
lambda task: task.root_uid == root_uid,
(
Task.unserialize(task_data, backend=self)
if parse_resources
else Task.unserialize(task_data, parse_resources=False)
Task.unserialize(task_data, parse_resources=parse_resources)
for task_data in self.redis.mget(chunk)
if task_data is not None
),
)

def iter_task_tree(
self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
"""
Iterates all tasks that belong to the same analysis task tree
and have the same root_uid
:param root_uid: Root identifier of task tree
:param chunk_size: Size of chunks passed to the Redis SCAN and MGET command
:param parse_resources: If set to False, resources are not parsed.
It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation
to learn more.
:return: Iterator with task objects
"""
# Process <5.4.0 tasks (unrouted from <5.4.0 producers
# or existing before upgrade)
yield from self._iter_legacy_task_tree(
root_uid, chunk_size=chunk_size, parse_resources=parse_resources
)
# Process >=5.4.0 tasks
task_keys = self.redis.scan_iter(
match=f"{KARTON_TASK_NAMESPACE}:{{{root_uid}}}:*", count=chunk_size
Expand Down

0 comments on commit 4bf2bb5

Please sign in to comment.