Task tree: backend performance improvements #255

Merged 8 commits on May 16, 2024
2 changes: 1 addition & 1 deletion dev/logger.Dockerfile
@@ -1,4 +1,4 @@
FROM python:3.7
FROM python:3.11

COPY requirements.txt /karton/
COPY setup.py /karton/
2 changes: 1 addition & 1 deletion karton/core/__version__.py
@@ -1 +1 @@
__version__ = "5.3.4"
__version__ = "5.4.0"
52 changes: 52 additions & 0 deletions karton/core/backend.py
@@ -532,6 +532,58 @@ def get_all_tasks(
self.iter_all_tasks(chunk_size=chunk_size, parse_resources=parse_resources)
)

def _iter_legacy_task_tree(
self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
"""
Processes tasks created by Karton <5.4.0 (tasks unrouted by <5.4.0 producers
or tasks that existed before the upgrade).

Used internally by iter_task_tree.
"""
# Iterate over all karton tasks that do not match the new task id format
legacy_task_keys = self.redis.scan_iter(
match=f"{KARTON_TASK_NAMESPACE}:[^{{]*", count=chunk_size
)
for chunk in chunks_iter(legacy_task_keys, chunk_size):
yield from filter(
lambda task: task.root_uid == root_uid,
(
Task.unserialize(
task_data, backend=self, parse_resources=parse_resources
)
for task_data in self.redis.mget(chunk)
if task_data is not None
),
)

def iter_task_tree(
self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
"""
Iterates over all tasks that belong to the same analysis task tree,
i.e. share the same root_uid.

:param root_uid: Root identifier of the task tree
:param chunk_size: Size of chunks passed to the Redis SCAN and MGET commands
:param parse_resources: If set to False, resources are not parsed,
which speeds up deserialization. Read the :py:meth:`Task.unserialize`
documentation to learn more.
:return: Iterator of task objects
"""
# Process tasks created by <5.4.0 (unrouted by <5.4.0 producers
# or existing before the upgrade)
yield from self._iter_legacy_task_tree(
root_uid, chunk_size=chunk_size, parse_resources=parse_resources
)
# Process >=5.4.0 tasks
task_keys = self.redis.scan_iter(
match=f"{KARTON_TASK_NAMESPACE}:{{{root_uid}}}:*", count=chunk_size
)
yield from self._iter_tasks(
task_keys, chunk_size=chunk_size, parse_resources=parse_resources
)

def register_task(self, task: Task, pipe: Optional[Pipeline] = None) -> None:
"""
Register or update task in Redis.
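
For illustration, a minimal sketch of how the new iter_task_tree API added above might be consumed; the backend construction, the Config lookup and the root_uid value are assumptions for the example, not part of this diff:

from karton.core.backend import KartonBackend
from karton.core.config import Config

backend = KartonBackend(config=Config())
# Example identifier only; real root_uids come from existing analyses
root_uid = "12345678-1234-1234-1234-12345678abcd"

# New-format task keys embed the root_uid in braces, so SCAN can target a
# single tree; old-format keys are still picked up by _iter_legacy_task_tree.
for task in backend.iter_task_tree(root_uid, chunk_size=500, parse_resources=False):
    print(task.task_uid, task.status)
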
94 changes: 66 additions & 28 deletions karton/core/inspect.py
@@ -1,5 +1,5 @@
from collections import defaultdict
from typing import Dict, List
from typing import Dict, List, Optional

from .backend import KartonBackend, KartonBind
from .task import Task, TaskState
@@ -9,9 +9,9 @@ class KartonQueue:
"""
View object representing a Karton queue

:param bind: :py:meth:`KartonBind` object representing the queue bind
:param bind: :class:`KartonBind` object representing the queue bind
:param tasks: List of tasks currently in queue
:param state: :py:meth:`KartonBackend` object to be used
:param state: :class:`KartonState` object to be used
"""

def __init__(
@@ -48,7 +48,7 @@ class KartonAnalysis:

:param root_uid: Analysis root task uid
:param tasks: List of tasks
:param state: :py:meth:`KartonBackend` object to be used
:param state: :class:`KartonState` object to be used
"""

def __init__(self, root_uid: str, tasks: List[Task], state: "KartonState") -> None:
@@ -89,7 +89,7 @@ def get_queues_for_tasks(
Group task objects by their queue name

:param tasks: Task objects to group
:param state: :py:meth:`KartonBackend` to bind to created queues
:param state: :class:`KartonState` object to be used
:return: A dictionary containing the queue names and lists of tasks
"""
tasks_per_queue = defaultdict(list)
@@ -119,30 +119,68 @@ class KartonState:
:param backend: :py:meth:`KartonBackend` object to use for data fetching
"""

def __init__(self, backend: KartonBackend) -> None:
def __init__(self, backend: KartonBackend, parse_resources: bool = False) -> None:
self.backend = backend
self.binds = {bind.identity: bind for bind in backend.get_binds()}
self.replicas = backend.get_online_consumers()
self.tasks = backend.get_all_tasks()
self.pending_tasks = [
task for task in self.tasks if task.status != TaskState.FINISHED
]

# Tasks grouped by root_uid
tasks_per_analysis = defaultdict(list)

for task in self.pending_tasks:
tasks_per_analysis[task.root_uid].append(task)

self.analyses = {
root_uid: KartonAnalysis(root_uid=root_uid, tasks=tasks, state=self)
for root_uid, tasks in tasks_per_analysis.items()
}
queues = get_queues_for_tasks(self.pending_tasks, self)
# Present registered queues without tasks
for bind_name, bind in self.binds.items():
if bind_name not in queues:
queues[bind_name] = KartonQueue(
bind=self.binds[bind_name], tasks=[], state=self
self.parse_resources = parse_resources

self._tasks: Optional[List[Task]] = None
self._pending_tasks: Optional[List[Task]] = None
self._analyses: Optional[Dict[str, KartonAnalysis]] = None
self._queues: Optional[Dict[str, KartonQueue]] = None

@property
def tasks(self) -> List[Task]:
if self._tasks is None:
self._tasks = self.backend.get_all_tasks(
parse_resources=self.parse_resources
)
return self._tasks

@property
def pending_tasks(self) -> List[Task]:
if self._pending_tasks is None:
self._pending_tasks = [
task for task in self.tasks if task.status != TaskState.FINISHED
]
return self._pending_tasks

@property
def analyses(self) -> Dict[str, KartonAnalysis]:
if self._analyses is None:
# Tasks grouped by root_uid
tasks_per_analysis = defaultdict(list)

for task in self.pending_tasks:
tasks_per_analysis[task.root_uid].append(task)

self._analyses = {
root_uid: KartonAnalysis(root_uid=root_uid, tasks=tasks, state=self)
for root_uid, tasks in tasks_per_analysis.items()
}
return self._analyses

@property
def queues(self) -> Dict[str, KartonQueue]:
if self._queues is None:
queues = get_queues_for_tasks(self.pending_tasks, self)
# Present registered queues without tasks
for bind_name, bind in self.binds.items():
if bind_name not in queues:
queues[bind_name] = KartonQueue(
bind=self.binds[bind_name], tasks=[], state=self
)
self._queues = queues
return self._queues

def get_analysis(self, root_uid: str) -> KartonAnalysis:
return KartonAnalysis(
root_uid=root_uid,
tasks=list(
self.backend.iter_task_tree(
root_uid, parse_resources=self.parse_resources
)
),
state=self,
)
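
As a usage sketch (not part of the diff), the lazier KartonState could be driven like this; the backend setup is assumed and the attribute names follow the constructor parameters shown above:

from karton.core.backend import KartonBackend
from karton.core.config import Config
from karton.core.inspect import KartonState

backend = KartonBackend(config=Config())
# Binds and online consumers are still fetched eagerly in __init__;
# tasks, queues and analyses are now fetched and cached on first property access.
state = KartonState(backend, parse_resources=False)

# First access to .queues triggers a single get_all_tasks() call and caches it
for name, queue in state.queues.items():
    print(name, len(queue.tasks))

# get_analysis() scans only one task tree via iter_task_tree instead of all tasks
analysis = state.get_analysis("12345678-1234-1234-1234-12345678abcd")
print(len(analysis.tasks))
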
30 changes: 25 additions & 5 deletions karton/core/task.py
@@ -106,13 +106,18 @@ def __init__(
raise ValueError("Persistent headers should be an instance of a dict")

if uid is None:
self.uid = str(uuid.uuid4())
task_uid = str(uuid.uuid4())
if root_uid is None:
self.root_uid = task_uid
else:
self.root_uid = root_uid
# New-style UID format introduced in v5.4.0
# {12345678-1234-1234-1234-12345678abcd}:12345678-1234-1234-1234-12345678abcd
self.uid = f"{{{self.root_uid}}}:{task_uid}"
else:
self.uid = uid

if root_uid is None:
self.root_uid = self.uid
else:
if root_uid is None:
raise ValueError("root_uid cannot be None when uid is not None")
self.root_uid = root_uid

self.orig_uid = orig_uid
@@ -137,6 +142,21 @@ def headers_persistent(self) -> Dict[str, Any]:
def receiver(self) -> Optional[str]:
return self.headers.get("receiver")

@property
def task_uid(self) -> str:
return self.fquid_to_uid(self.uid)

@staticmethod
def fquid_to_uid(fquid: str) -> str:
"""
Extracts the plain task uid from a fully-qualified uid (fquid)
in the {root_uid}:task_uid format.

:param fquid: Fully-qualified task uid
:return: Plain task uid
"""
if ":" not in fquid:
return fquid
return fquid.split(":")[-1]

def fork_task(self) -> "Task":
"""
Fork task to transfer single task to many queues (but use different UID).
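
A short sketch of the new-style identifiers introduced above; the header content and sample values are made up for illustration:

from karton.core import Task

task = Task(headers={"type": "sample"})

# uid is now "{root_uid}:task_uid"; the braces group every task of the same
# analysis tree under a common key prefix in Redis
print(task.uid)       # e.g. "{9f0c...}:1b2a..."
print(task.root_uid)  # the part inside the braces
print(task.task_uid)  # the plain uid after the colon

# Old-style uids (no colon) pass through fquid_to_uid unchanged
assert Task.fquid_to_uid("11111111-2222-3333-4444-555555555555") == \
    "11111111-2222-3333-4444-555555555555"
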
2 changes: 1 addition & 1 deletion karton/system/system.py
@@ -166,7 +166,7 @@ def gc_collect(self) -> None:

def route_task(self, task: Task, binds: List[KartonBind]) -> None:
# Performs routing of task
self.log.info("[%s] Processing task %s", task.root_uid, task.uid)
self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid)
# store the producer-task relationship in redis for task tracking
self.backend.log_identity_output(
task.headers.get("origin", "unknown"), task.headers
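
For context, a small sketch (with made-up identifiers) of why the router now logs task_uid instead of the full uid: the fully-qualified uid would repeat the root_uid that is already printed in brackets:

root_uid = "9f0cdd54-0000-0000-0000-000000000001"
task_uid = "1b2a45f0-0000-0000-0000-000000000002"
fquid = f"{{{root_uid}}}:{task_uid}"

# Before this change: "[<root_uid>] Processing task {<root_uid>}:<task_uid>"
# After this change:  "[<root_uid>] Processing task <task_uid>"
print(f"[{root_uid}] Processing task {task_uid}")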