diff --git a/Dockerfile b/Dockerfile
index 4f7f69e..44ed3ec 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.7
+FROM python:3.11
 WORKDIR /app/service
 COPY ./requirements.txt ./requirements.txt
diff --git a/karton/core/backend.py b/karton/core/backend.py
index bd7d88e..0de208d 100644
--- a/karton/core/backend.py
+++ b/karton/core/backend.py
@@ -15,7 +15,7 @@
 from .exceptions import InvalidIdentityError
 from .task import Task, TaskPriority, TaskState
-from .utils import chunks
+from .utils import chunks, chunks_iter
 
 KARTON_TASKS_QUEUE = "karton.tasks"
 KARTON_OPERATIONS_QUEUE = "karton.operations"
@@ -377,12 +377,20 @@ def get_task(self, task_uid: str) -> Optional[Task]:
             return None
         return Task.unserialize(task_data, backend=self)
 
-    def get_tasks(self, task_uid_list: List[str], chunk_size: int = 1000) -> List[Task]:
+    def get_tasks(
+        self,
+        task_uid_list: List[str],
+        chunk_size: int = 1000,
+        parse_resources: bool = True,
+    ) -> List[Task]:
         """
         Get multiple tasks for given identifier list
 
         :param task_uid_list: List of task identifiers
         :param chunk_size: Size of chunks passed to the Redis MGET command
+        :param parse_resources: If set to False, resources are not parsed.
+            It speeds up deserialization. Read :py:meth:`Task.unserialize`
+            documentation to learn more.
         :return: List of task objects
         """
         keys = chunks(
@@ -391,25 +399,89 @@ def get_tasks(self, task_uid_list: List[str], chunk_size: int = 1000) -> List[Ta
         )
         return [
             Task.unserialize(task_data, backend=self)
+            if parse_resources
+            else Task.unserialize(task_data, parse_resources=False)
             for chunk in keys
             for task_data in self.redis.mget(chunk)
             if task_data is not None
         ]
 
-    def get_all_tasks(self, chunk_size: int = 1000) -> List[Task]:
+    def _iter_tasks(
+        self,
+        task_keys: Iterator[str],
+        chunk_size: int = 1000,
+        parse_resources: bool = True,
+    ) -> Iterator[Task]:
+        for chunk in chunks_iter(task_keys, chunk_size):
+            yield from (
+                Task.unserialize(task_data, backend=self)
+                if parse_resources
+                else Task.unserialize(task_data, parse_resources=False)
+                for task_data in self.redis.mget(chunk)
+                if task_data is not None
+            )
+
+    def iter_tasks(
+        self,
+        task_uid_list: Iterable[str],
+        chunk_size: int = 1000,
+        parse_resources: bool = True,
+    ) -> Iterator[Task]:
+        """
+        Get multiple tasks for given identifier list as an iterator
+
+        :param task_uid_list: List of task fully-qualified identifiers
+        :param chunk_size: Size of chunks passed to the Redis MGET command
+        :param parse_resources: If set to False, resources are not parsed.
+            It speeds up deserialization. Read :py:meth:`Task.unserialize`
+            documentation to learn more.
+        :return: Iterator with task objects
+        """
+        return self._iter_tasks(
+            map(
+                lambda task_uid: f"{KARTON_TASK_NAMESPACE}:{task_uid}",
+                task_uid_list,
+            ),
+            chunk_size=chunk_size,
+            parse_resources=parse_resources,
+        )
+
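The new iterator API streams tasks in MGET-sized batches instead of materializing the whole list at once. A minimal usage sketch (assuming a configured KartonBackend instance named backend and a list of task identifiers named task_uids; both names are illustrative):

    # Fetch tasks in MGET batches of 1000 keys (the default chunk_size),
    # skipping resource deserialization for speed
    for task in backend.iter_tasks(task_uids, parse_resources=False):
        print(task.uid, task.status)
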
+    def iter_all_tasks(
+        self, chunk_size: int = 1000, parse_resources: bool = True
+    ) -> Iterator[Task]:
+        """
+        Iterates over all tasks registered in Redis
+
+        :param chunk_size: Size of chunks passed to the Redis SCAN and MGET commands
+        :param parse_resources: If set to False, resources are not parsed.
+            It speeds up deserialization. Read :py:meth:`Task.unserialize`
+            documentation to learn more.
+        :return: Iterator with Task objects
+        """
+        task_keys = self.redis.scan_iter(
+            match=f"{KARTON_TASK_NAMESPACE}:*", count=chunk_size
+        )
+        return self._iter_tasks(
+            task_keys, chunk_size=chunk_size, parse_resources=parse_resources
+        )
+
+    def get_all_tasks(
+        self, chunk_size: int = 1000, parse_resources: bool = True
+    ) -> List[Task]:
         """
         Get all tasks registered in Redis
 
+        .. warning::
+            This method loads all tasks into memory.
+            It's recommended to use :py:meth:`iter_all_tasks` instead.
+
         :param chunk_size: Size of chunks passed to the Redis MGET command
+        :param parse_resources: If set to False, resources are not parsed.
+            It speeds up deserialization. Read :py:meth:`Task.unserialize`
+            documentation to learn more.
         :return: List with Task objects
         """
-        tasks = self.redis.keys(f"{KARTON_TASK_NAMESPACE}:*")
-        return [
-            Task.unserialize(task_data)
-            for chunk in chunks(tasks, chunk_size)
-            for task_data in self.redis.mget(chunk)
-            if task_data is not None
-        ]
+        return list(
+            self.iter_all_tasks(chunk_size=chunk_size, parse_resources=parse_resources)
+        )
 
     def register_task(self, task: Task, pipe: Optional[Pipeline] = None) -> None:
         """
@@ -451,6 +523,11 @@ def delete_task(self, task: Task) -> None:
         """
         Remove task from Redis
 
+        .. warning::
+            Used internally by karton.system.
+            If you want to cancel a task, mark it as finished and let it be
+            deleted by karton.system.
+
         :param task: Task object
         """
         self.redis.delete(f"{KARTON_TASK_NAMESPACE}:{task.uid}")
@@ -459,6 +536,11 @@ def delete_tasks(self, tasks: Iterable[Task], chunk_size: int = 1000) -> None:
         """
         Remove multiple tasks from Redis
 
+        .. warning::
+            Used internally by karton.system.
+            If you want to cancel a task, mark it as finished and let it be
+            deleted by karton.system.
+
         :param tasks: List of Task objects
         :param chunk_size: Size of chunks passed to the Redis DELETE command
         """
@@ -485,6 +567,14 @@ def get_task_ids_from_queue(self, queue: str) -> List[str]:
         """
         return self.redis.lrange(queue, 0, -1)
 
+    def delete_consumer_queues(self, identity: str) -> None:
+        """
+        Deletes consumer queues for a given identity
+
+        :param identity: Consumer identity
+        """
+        self.redis.delete(*self.get_queue_names(identity))
+
     def remove_task_queue(self, queue: str) -> List[Task]:
         """
         Remove task queue with all contained tasks
@@ -535,6 +625,20 @@ def consume_queues(
         """
         return self.redis.blpop(queues, timeout=timeout)
 
+    def increment_multiple_metrics(
+        self, metric: KartonMetrics, increments: Dict[str, int]
+    ) -> None:
+        """
+        Increments metrics for multiple identities via a single pipeline
+        :param metric: Operation metric type
+        :param increments: Dictionary mapping Karton service identities
+            to values added to the metric
+        """
+        p = self.redis.pipeline()
+        for identity, increment in increments.items():
+            p.hincrby(metric.value, identity, increment)
+        p.execute()
+
     def consume_queues_batch(self, queue: str, max_count: int) -> List[str]:
         """
         Get a batch of items from the queue
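increment_multiple_metrics batches what used to be one Redis round-trip per identity into a single pipelined sequence of HINCRBY commands. A rough usage sketch (the metric member and counts are illustrative):

    from karton.core.backend import KartonMetrics

    # Bump the consumed-task metric for two services in one round-trip
    backend.increment_multiple_metrics(
        KartonMetrics.TASK_CONSUMED,
        {"karton.classifier": 10, "karton.reporter": 2},
    )
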
+ "payload", + "payload_persistent", + "_headers_persistent_keys", + ) + def __init__( self, headers: Dict[str, Any], @@ -74,6 +91,8 @@ def __init__( orig_uid: Optional[str] = None, uid: Optional[str] = None, error: Optional[List[str]] = None, + _status: Optional[TaskState] = None, + _last_update: Optional[float] = None, ) -> None: payload = payload or {} payload_persistent = payload_persistent or {} @@ -102,9 +121,9 @@ def __init__( self.error = error self.headers = {**headers, **headers_persistent} self._headers_persistent_keys = set(headers_persistent.keys()) - self.status = TaskState.DECLARED + self.status = _status or TaskState.DECLARED - self.last_update: float = time.time() + self.last_update: float = _last_update or time.time() self.priority = priority or TaskPriority.NORMAL self.payload = dict(payload) @@ -114,6 +133,10 @@ def __init__( def headers_persistent(self) -> Dict[str, Any]: return {k: v for k, v in self.headers.items() if self.is_header_persistent(k)} + @property + def receiver(self) -> Optional[str]: + return self.headers.get("receiver") + def fork_task(self) -> "Task": """ Fork task to transfer single task to many queues (but use different UID). @@ -362,13 +385,24 @@ def iterate_resources(self) -> Iterator[ResourceBase]: @staticmethod def unserialize( - data: Union[str, bytes], backend: Optional["KartonBackend"] = None + data: Union[str, bytes], + backend: Optional["KartonBackend"] = None, + parse_resources: bool = True, ) -> "Task": """ Unserialize Task instance from JSON string :param data: JSON-serialized task :param backend: Backend instance to be bound to RemoteResource objects + :param parse_resources: | + If set to False (default is True), method doesn't + deserialize '__karton_resource__' entries, which speeds up deserialization + process. This flag is used mainly for multiple task processing e.g. + filtering based on status. + When resource deserialization is turned off, Task.unserialize will try + to use faster 3rd-party JSON parser (orjson) if it's installed. It's not + added as a required dependency but can speed up things if you need to check + status of multiple tasks at once. 
@@ -362,13 +385,24 @@ def iterate_resources(self) -> Iterator[ResourceBase]:
     @staticmethod
     def unserialize(
-        data: Union[str, bytes], backend: Optional["KartonBackend"] = None
+        data: Union[str, bytes],
+        backend: Optional["KartonBackend"] = None,
+        parse_resources: bool = True,
     ) -> "Task":
         """
         Unserialize Task instance from JSON string
 
         :param data: JSON-serialized task
         :param backend: Backend instance to be bound to RemoteResource objects
+        :param parse_resources: |
+            If set to False (default is True), the method doesn't deserialize
+            '__karton_resource__' entries, which speeds up the deserialization
+            process. This flag is used mainly for processing multiple tasks,
+            e.g. filtering based on status.
+            When resource deserialization is turned off, Task.unserialize uses
+            the faster orjson parser instead of the standard json module,
+            which helps when you need to check the status of multiple tasks
+            at once.
         :return: Unserialized Task object
 
         :meta private:
         """
@@ -386,7 +420,10 @@ def unserialize_resources(value: Any) -> Any:
         if not isinstance(data, str):
             data = data.decode("utf8")
 
-        task_data = json.loads(data, object_hook=unserialize_resources)
+        if parse_resources:
+            task_data = json.loads(data, object_hook=unserialize_resources)
+        else:
+            task_data = orjson.loads(data)
 
         # Compatibility with Karton <5.2.0
         headers_persistent_fallback = task_data["payload_persistent"].get(
@@ -399,24 +436,24 @@ def unserialize_resources(value: Any) -> Any:
         task = Task(
             task_data["headers"],
             headers_persistent=headers_persistent,
+            uid=task_data["uid"],
+            root_uid=task_data["root_uid"],
+            parent_uid=task_data["parent_uid"],
+            # Compatibility with <= 3.x.x (get)
+            orig_uid=task_data.get("orig_uid", None),
+            payload=task_data["payload"],
+            payload_persistent=task_data["payload_persistent"],
+            # Compatibility with <= 3.x.x (get)
+            error=task_data.get("error"),
+            # Compatibility with <= 2.x.x (get)
+            priority=(
+                TaskPriority(task_data.get("priority"))
+                if "priority" in task_data
+                else TaskPriority.NORMAL
+            ),
+            _status=TaskState(task_data["status"]),
+            _last_update=task_data.get("last_update", None),
         )
-        task.uid = task_data["uid"]
-        task.root_uid = task_data["root_uid"]
-        task.parent_uid = task_data["parent_uid"]
-        # Compatibility with <= 3.x.x (get)
-        task.orig_uid = task_data.get("orig_uid", None)
-        task.status = TaskState(task_data["status"])
-        # Compatibility with <= 3.x.x (get)
-        task.error = task_data.get("error")
-        # Compatibility with <= 2.x.x (get)
-        task.priority = (
-            TaskPriority(task_data.get("priority"))
-            if "priority" in task_data
-            else TaskPriority.NORMAL
-        )
-        task.last_update = task_data.get("last_update", None)
-        task.payload = task_data["payload"]
-        task.payload_persistent = task_data["payload_persistent"]
         return task
 
     def __repr__(self) -> str:
diff --git a/karton/core/utils.py b/karton/core/utils.py
index 71bdb28..d4b8efa 100644
--- a/karton/core/utils.py
+++ b/karton/core/utils.py
@@ -1,4 +1,5 @@
 import functools
+import itertools
 import signal
 from contextlib import contextmanager
-from typing import Any, Callable, Iterator, Sequence, Tuple, TypeVar
+from typing import Any, Callable, Iterable, Iterator, Sequence, Tuple, TypeVar
@@ -12,6 +13,16 @@ def chunks(seq: Sequence[T], size: int) -> Iterator[Sequence[T]]:
     return (seq[pos : pos + size] for pos in range(0, len(seq), size))
 
 
+def chunks_iter(seq: Iterable[T], size: int) -> Iterator[Sequence[T]]:
+    # Ensure seq is an iterator, so consecutive islice() calls advance it
+    it = iter(seq)
+    while True:
+        elements = list(itertools.islice(it, size))
+        if not elements:
+            return
+        yield elements
+
+
 def recursive_iter(obj: Any) -> Iterator[Any]:
     """
     Yields all values recursively from nested list/dict structures
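Unlike chunks(), which slices its argument and therefore needs a full sequence, chunks_iter never indexes into its input, so it also works with one-shot iterators such as Redis SCAN cursors. Expected behavior (illustrative doctest):

    >>> from karton.core.utils import chunks_iter
    >>> list(chunks_iter(iter(range(7)), 3))
    [[0, 1, 2], [3, 4, 5], [6]]
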
diff --git a/karton/system/system.py b/karton/system/system.py
index 9280919..a56f939 100644
--- a/karton/system/system.py
+++ b/karton/system/system.py
@@ -53,7 +53,7 @@ def gc_collect_resources(self) -> None:
         resources_to_remove = set(self.backend.list_objects(karton_bucket))
         # Note: it is important to get list of resources before getting list of tasks!
         # Task is created before resource upload to lock the reference to the resource.
-        tasks = self.backend.get_all_tasks()
+        tasks = self.backend.iter_all_tasks()
         for task in tasks:
             for resource in task.iterate_resources():
                 # If resource is referenced by task: remove it from set
@@ -66,39 +66,39 @@ def gc_collect_resources(self) -> None:
         if resources_to_remove:
             self.backend.remove_objects(karton_bucket, resources_to_remove)
 
-    def gc_collect_abandoned_queues(self):
-        online_consumers = self.backend.get_online_consumers()
-        for bind in self.backend.get_binds():
-            identity = bind.identity
-            if identity not in online_consumers and not bind.persistent:
-                # If offline and not persistent: remove queue
-                for queue in self.backend.get_queue_names(identity):
-                    self.log.info(
-                        "Non-persistent: unwinding tasks from queue %s", queue
-                    )
-                    removed_tasks = self.backend.remove_task_queue(queue)
-                    for removed_task in removed_tasks:
-                        self.log.info("Unwinding task %s", str(removed_task.uid))
-                        # Mark task as finished
-                        self.backend.set_task_status(removed_task, TaskState.FINISHED)
-                self.log.info("Non-persistent: removing bind %s", identity)
-                self.backend.unregister_bind(identity)
-
     def gc_collect_tasks(self) -> None:
         # Collects finished tasks
         root_tasks = set()
         running_root_tasks = set()
-        tasks = self.backend.get_all_tasks()
-        enqueued_task_uids = self.backend.get_task_ids_from_queue(KARTON_TASKS_QUEUE)
+        unrouted_task_uids = self.backend.get_task_ids_from_queue(KARTON_TASKS_QUEUE)
 
         current_time = time.time()
         to_delete = []
 
-        for task in tasks:
+        queues_to_clear = set()
+        online_consumers = self.backend.get_online_consumers()
+        for bind in self.backend.get_binds():
+            identity = bind.identity
+            if identity not in online_consumers and not bind.persistent:
+                # If offline and not persistent: mark queue to be removed
+                queues_to_clear.add(identity)
+                self.log.info("Non-persistent: removing bind %s", identity)
+                self.backend.unregister_bind(identity)
+                self.backend.delete_consumer_queues(identity)
+
+        for task in self.backend.iter_all_tasks(parse_resources=False):
             root_tasks.add(task.root_uid)
-            if (
+            if task.receiver in queues_to_clear:
+                to_delete.append(task)
+                self.log.info(
+                    "Task %s is abandoned by an inactive, non-persistent consumer. "
+                    "Killed. (receiver: %s)",
+                    task.uid,
+                    task.headers.get("receiver", ""),
+                )
+            elif (
                 task.status == TaskState.DECLARED
-                and task.uid not in enqueued_task_uids
+                and task.uid not in unrouted_task_uids
                 and task.last_update is not None
                 and current_time > task.last_update + self.task_dispatched_timeout
             ):
@@ -158,7 +158,6 @@ def gc_collect_tasks(self) -> None:
     def gc_collect(self) -> None:
         if time.time() > (self.last_gc_trigger + self.gc_interval):
             try:
-                self.gc_collect_abandoned_queues()
                 self.gc_collect_tasks()
                 self.gc_collect_resources()
             except Exception:
diff --git a/requirements.txt b/requirements.txt
index d9cda45..733aebe 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 redis
 boto3
+orjson
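Taken together, karton.system now folds abandoned-queue cleanup into a single streaming pass over tasks and never loads the full task list into memory. The same pattern is available to monitoring scripts; a minimal sketch (assuming a configured KartonBackend named backend; the query itself is illustrative):

    from karton.core.task import TaskState

    # Count crashed tasks without deserializing their resources
    crashed = sum(
        1
        for task in backend.iter_all_tasks(parse_resources=False)
        if task.status == TaskState.CRASHED
    )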