From 81547fa4bc2aaec65b953bcf676a389de145683a Mon Sep 17 00:00:00 2001 From: psrok1 Date: Mon, 13 May 2024 19:34:31 +0200 Subject: [PATCH 1/8] Task tree: backend performance improvements --- dev/logger.Dockerfile | 2 +- karton/core/__version__.py | 2 +- karton/core/backend.py | 28 ++++++++++++ karton/core/inspect.py | 94 ++++++++++++++++++++++++++------------ karton/core/task.py | 21 +++++++-- 5 files changed, 112 insertions(+), 35 deletions(-) diff --git a/dev/logger.Dockerfile b/dev/logger.Dockerfile index 58a5f87e..ba829e7b 100644 --- a/dev/logger.Dockerfile +++ b/dev/logger.Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.7 +FROM python:3.11 COPY requirements.txt /karton/ COPY setup.py /karton/ diff --git a/karton/core/__version__.py b/karton/core/__version__.py index 4db55089..fc30498f 100644 --- a/karton/core/__version__.py +++ b/karton/core/__version__.py @@ -1 +1 @@ -__version__ = "5.3.4" +__version__ = "5.4.0" diff --git a/karton/core/backend.py b/karton/core/backend.py index bb06c8c0..c4c1562d 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -532,6 +532,34 @@ def get_all_tasks( self.iter_all_tasks(chunk_size=chunk_size, parse_resources=parse_resources) ) + def iter_task_tree( + self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True + ) -> Iterator[Task]: + """ + Iterates all tasks that belong to the same analysis task tree + and have the same root_uid + + :param root_uid: Root identifier of task tree + :param chunk_size: Size of chunks passed to the Redis SCAN and MGET command + :param parse_resources: If set to False, resources are not parsed. + It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation + to learn more. + :return: Iterator with task objects + + .. note:: + This method processes only these tasks that are stored under + karton.task:: key format which is fully-qualified + identifier introduced in Karton 5.4.0 + + Unrouted tasks produced by older Karton versions won't be returned. + """ + task_keys = self.redis.scan_iter( + match=f"{KARTON_TASK_NAMESPACE}:{{{root_uid}}}:*", count=chunk_size + ) + return self._iter_tasks( + task_keys, chunk_size=chunk_size, parse_resources=parse_resources + ) + def register_task(self, task: Task, pipe: Optional[Pipeline] = None) -> None: """ Register or update task in Redis. diff --git a/karton/core/inspect.py b/karton/core/inspect.py index b3f22c02..ea83b80b 100644 --- a/karton/core/inspect.py +++ b/karton/core/inspect.py @@ -1,5 +1,5 @@ from collections import defaultdict -from typing import Dict, List +from typing import Dict, List, Optional from .backend import KartonBackend, KartonBind from .task import Task, TaskState @@ -9,9 +9,9 @@ class KartonQueue: """ View object representing a Karton queue - :param bind: :py:meth:`KartonBind` object representing the queue bind + :param bind: :class:`KartonBind` object representing the queue bind :param tasks: List of tasks currently in queue - :param state: :py:meth:`KartonBackend` object to be used + :param state: :class:`KartonState` object to be used """ def __init__( @@ -48,7 +48,7 @@ class KartonAnalysis: :param root_uid: Analysis root task uid :param tasks: List of tasks - :param state: :py:meth:`KartonBackend` object to be used + :param state: :class:`KartonState` object to be used """ def __init__(self, root_uid: str, tasks: List[Task], state: "KartonState") -> None: @@ -89,7 +89,7 @@ def get_queues_for_tasks( Group task objects by their queue name :param tasks: Task objects to group - :param state: :py:meth:`KartonBackend` to bind to created queues + :param state: :class:`KartonState` object to be used :return: A dictionary containing the queue names and lists of tasks """ tasks_per_queue = defaultdict(list) @@ -119,30 +119,68 @@ class KartonState: :param backend: :py:meth:`KartonBackend` object to use for data fetching """ - def __init__(self, backend: KartonBackend) -> None: + def __init__(self, backend: KartonBackend, parse_resources: bool = False) -> None: self.backend = backend self.binds = {bind.identity: bind for bind in backend.get_binds()} self.replicas = backend.get_online_consumers() - self.tasks = backend.get_all_tasks() - self.pending_tasks = [ - task for task in self.tasks if task.status != TaskState.FINISHED - ] - - # Tasks grouped by root_uid - tasks_per_analysis = defaultdict(list) - - for task in self.pending_tasks: - tasks_per_analysis[task.root_uid].append(task) - - self.analyses = { - root_uid: KartonAnalysis(root_uid=root_uid, tasks=tasks, state=self) - for root_uid, tasks in tasks_per_analysis.items() - } - queues = get_queues_for_tasks(self.pending_tasks, self) - # Present registered queues without tasks - for bind_name, bind in self.binds.items(): - if bind_name not in queues: - queues[bind_name] = KartonQueue( - bind=self.binds[bind_name], tasks=[], state=self + self.parse_resources = parse_resources + + self._tasks: Optional[List[Task]] = None + self._pending_tasks: Optional[List[Task]] = None + self._analyses: Optional[Dict[str, KartonAnalysis]] = None + self._queues: Optional[Dict[str, KartonQueue]] = None + + @property + def tasks(self) -> List[Task]: + if self._tasks is None: + self._tasks = self.backend.get_all_tasks( + parse_resources=self.parse_resources + ) + return self._tasks + + @property + def pending_tasks(self) -> List[Task]: + if self._pending_tasks is None: + self._pending_tasks = [ + task for task in self.tasks if task.status != TaskState.FINISHED + ] + return self._pending_tasks + + @property + def analyses(self) -> Dict[str, KartonAnalysis]: + if self._analyses is None: + # Tasks grouped by root_uid + tasks_per_analysis = defaultdict(list) + + for task in self.pending_tasks: + tasks_per_analysis[task.root_uid].append(task) + + self._analyses = { + root_uid: KartonAnalysis(root_uid=root_uid, tasks=tasks, state=self) + for root_uid, tasks in tasks_per_analysis.items() + } + return self._analyses + + @property + def queues(self) -> Dict[str, KartonQueue]: + if self._queues is None: + queues = get_queues_for_tasks(self.pending_tasks, self) + # Present registered queues without tasks + for bind_name, bind in self.binds.items(): + if bind_name not in queues: + queues[bind_name] = KartonQueue( + bind=self.binds[bind_name], tasks=[], state=self + ) + self._queues = queues + return self._queues + + def get_analysis(self, root_uid: str) -> KartonAnalysis: + return KartonAnalysis( + root_uid=root_uid, + tasks=list( + self.backend.iter_task_tree( + root_uid, parse_resources=self.parse_resources ) - self.queues = queues + ), + state=self, + ) diff --git a/karton/core/task.py b/karton/core/task.py index e0e1db2e..20f153dc 100644 --- a/karton/core/task.py +++ b/karton/core/task.py @@ -106,13 +106,18 @@ def __init__( raise ValueError("Persistent headers should be an instance of a dict") if uid is None: - self.uid = str(uuid.uuid4()) + task_uid = str(uuid.uuid4()) + if root_uid is None: + self.root_uid = task_uid + else: + self.root_uid = root_uid + # New-style UID format introduced in v5.4.0 + # {12345678-1234-1234-1234-12345678abcd}:12345678-1234-1234-1234-12345678abcd + self.uid = f"{{{self.root_uid}}}:{task_uid}" else: self.uid = uid - - if root_uid is None: - self.root_uid = self.uid - else: + if root_uid is None: + raise ValueError("root_uid cannot be None when uid is not None") self.root_uid = root_uid self.orig_uid = orig_uid @@ -137,6 +142,12 @@ def headers_persistent(self) -> Dict[str, Any]: def receiver(self) -> Optional[str]: return self.headers.get("receiver") + @property + def task_uid(self) -> str: + if ":" not in self.uid: + return self.uid + return self.uid.split(":")[-1] + def fork_task(self) -> "Task": """ Fork task to transfer single task to many queues (but use different UID). From 4fdb783fa70031e6c2091f07bc2c78591cba6d8d Mon Sep 17 00:00:00 2001 From: psrok1 Date: Tue, 14 May 2024 12:19:36 +0200 Subject: [PATCH 2/8] Process legacy tasks in KartonBackend.iter_task_tree --- karton/core/backend.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/karton/core/backend.py b/karton/core/backend.py index c4c1562d..2c1b33fb 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -545,18 +545,27 @@ def iter_task_tree( It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation to learn more. :return: Iterator with task objects - - .. note:: - This method processes only these tasks that are stored under - karton.task:: key format which is fully-qualified - identifier introduced in Karton 5.4.0 - - Unrouted tasks produced by older Karton versions won't be returned. """ + # Process unrouted tasks produced by Karton <5.4.0 + legacy_task_keys = self.redis.scan_iter( + match=f"{KARTON_TASK_NAMESPACE}:*[^:]*", count=chunk_size + ) + for chunk in chunks_iter(legacy_task_keys, chunk_size): + yield from filter( + lambda task: task.root_uid == root_uid, + ( + Task.unserialize(task_data, backend=self) + if parse_resources + else Task.unserialize(task_data, parse_resources=False) + for task_data in self.redis.mget(chunk) + if task_data is not None + ), + ) + # Process >=5.4.0 tasks task_keys = self.redis.scan_iter( match=f"{KARTON_TASK_NAMESPACE}:{{{root_uid}}}:*", count=chunk_size ) - return self._iter_tasks( + yield from self._iter_tasks( task_keys, chunk_size=chunk_size, parse_resources=parse_resources ) From 0179252093d6ff34a1df36b417d422e5ba2a5edd Mon Sep 17 00:00:00 2001 From: psrok1 Date: Tue, 14 May 2024 15:31:52 +0200 Subject: [PATCH 3/8] Add fquid_to_uid method --- karton/core/task.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/karton/core/task.py b/karton/core/task.py index 20f153dc..0c072d83 100644 --- a/karton/core/task.py +++ b/karton/core/task.py @@ -144,9 +144,18 @@ def receiver(self) -> Optional[str]: @property def task_uid(self) -> str: - if ":" not in self.uid: - return self.uid - return self.uid.split(":")[-1] + return self.fquid_to_uid(self.uid) + + @staticmethod + def fquid_to_uid(fquid: str) -> str: + """ + Gets task uid from fully-qualified fquid ({root_uid}:task_uid) + + :return: Task uid + """ + if ":" not in fquid: + return fquid + return fquid.split(":")[-1] def fork_task(self) -> "Task": """ From dc348c46a20246b2725ea088497f8ff7fb894d01 Mon Sep 17 00:00:00 2001 From: psrok1 Date: Tue, 14 May 2024 17:32:11 +0200 Subject: [PATCH 4/8] karton-system: redundant root_uid in log --- karton/system/system.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karton/system/system.py b/karton/system/system.py index 18e2adfb..0947d0e8 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -166,7 +166,7 @@ def gc_collect(self) -> None: def route_task(self, task: Task, binds: List[KartonBind]) -> None: # Performs routing of task - self.log.info("[%s] Processing task %s", task.root_uid, task.uid) + self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid) # store the producer-task relationship in redis for task tracking self.backend.log_identity_output( task.headers.get("origin", "unknown"), task.headers From 4bf2bb5ebfc06943523bc1f370ac0470e54ca2b3 Mon Sep 17 00:00:00 2001 From: psrok1 Date: Tue, 14 May 2024 18:02:00 +0200 Subject: [PATCH 5/8] Apply suggestions from review --- karton/core/backend.py | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/karton/core/backend.py b/karton/core/backend.py index 2c1b33fb..d6788986 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -532,21 +532,15 @@ def get_all_tasks( self.iter_all_tasks(chunk_size=chunk_size, parse_resources=parse_resources) ) - def iter_task_tree( + def _iter_legacy_task_tree( self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True ) -> Iterator[Task]: """ - Iterates all tasks that belong to the same analysis task tree - and have the same root_uid + Processes tasks made by <5.4.0 (unrouted from <5.4.0 producers or existing + before upgrade) - :param root_uid: Root identifier of task tree - :param chunk_size: Size of chunks passed to the Redis SCAN and MGET command - :param parse_resources: If set to False, resources are not parsed. - It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation - to learn more. - :return: Iterator with task objects + Used internally by iter_task_tree. """ - # Process unrouted tasks produced by Karton <5.4.0 legacy_task_keys = self.redis.scan_iter( match=f"{KARTON_TASK_NAMESPACE}:*[^:]*", count=chunk_size ) @@ -554,13 +548,31 @@ def iter_task_tree( yield from filter( lambda task: task.root_uid == root_uid, ( - Task.unserialize(task_data, backend=self) - if parse_resources - else Task.unserialize(task_data, parse_resources=False) + Task.unserialize(task_data, parse_resources=parse_resources) for task_data in self.redis.mget(chunk) if task_data is not None ), ) + + def iter_task_tree( + self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True + ) -> Iterator[Task]: + """ + Iterates all tasks that belong to the same analysis task tree + and have the same root_uid + + :param root_uid: Root identifier of task tree + :param chunk_size: Size of chunks passed to the Redis SCAN and MGET command + :param parse_resources: If set to False, resources are not parsed. + It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation + to learn more. + :return: Iterator with task objects + """ + # Process <5.4.0 tasks (unrouted from <5.4.0 producers + # or existing before upgrade) + yield from self._iter_legacy_task_tree( + root_uid, chunk_size=chunk_size, parse_resources=parse_resources + ) # Process >=5.4.0 tasks task_keys = self.redis.scan_iter( match=f"{KARTON_TASK_NAMESPACE}:{{{root_uid}}}:*", count=chunk_size From d8150c366e347674e9aa3b2d7182915fc8e8be60 Mon Sep 17 00:00:00 2001 From: psrok1 Date: Wed, 15 May 2024 15:46:49 +0200 Subject: [PATCH 6/8] Add comment --- karton/core/backend.py | 1 + 1 file changed, 1 insertion(+) diff --git a/karton/core/backend.py b/karton/core/backend.py index d6788986..cf6bbd0f 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -541,6 +541,7 @@ def _iter_legacy_task_tree( Used internally by iter_task_tree. """ + # Iterate over all karton tasks that do not match the new task id format legacy_task_keys = self.redis.scan_iter( match=f"{KARTON_TASK_NAMESPACE}:*[^:]*", count=chunk_size ) From cc02c92de427e25ce6674448d2efafe1332661e4 Mon Sep 17 00:00:00 2001 From: psrok1 Date: Wed, 15 May 2024 15:57:25 +0200 Subject: [PATCH 7/8] Fix pattern to match old task id format --- karton/core/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karton/core/backend.py b/karton/core/backend.py index cf6bbd0f..0643cfa1 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -543,7 +543,7 @@ def _iter_legacy_task_tree( """ # Iterate over all karton tasks that do not match the new task id format legacy_task_keys = self.redis.scan_iter( - match=f"{KARTON_TASK_NAMESPACE}:*[^:]*", count=chunk_size + match=f"{KARTON_TASK_NAMESPACE}:[^{{]*", count=chunk_size ) for chunk in chunks_iter(legacy_task_keys, chunk_size): yield from filter( From 98e8bf5b2dafaa964612b1130343200872d6b063 Mon Sep 17 00:00:00 2001 From: psrok1 Date: Wed, 15 May 2024 18:51:12 +0200 Subject: [PATCH 8/8] why i'm like this --- karton/core/backend.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/karton/core/backend.py b/karton/core/backend.py index 0643cfa1..ae9a9e7e 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -549,7 +549,9 @@ def _iter_legacy_task_tree( yield from filter( lambda task: task.root_uid == root_uid, ( - Task.unserialize(task_data, parse_resources=parse_resources) + Task.unserialize( + task_data, backend=self, parse_resources=parse_resources + ) for task_data in self.redis.mget(chunk) if task_data is not None ),