From 7a2ca5ab6e224f7dd0e972052ddee5901c4ab399 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 7 Apr 2024 21:42:30 +0300 Subject: [PATCH 1/4] Send create/delete topics request to controller --- aiokafka/admin/client.py | 34 ++++++++++++++++++++++++++++++++-- aiokafka/client.py | 4 +--- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index ae306b04..a44b22b9 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -4,11 +4,14 @@ from ssl import SSLContext from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union +import async_timeout + from aiokafka import __version__ from aiokafka.client import AIOKafkaClient from aiokafka.errors import ( IncompatibleBrokerVersion, LeaderNotAvailableError, + NotControllerError, NotLeaderForPartitionError, for_code, ) @@ -183,6 +186,33 @@ def _matching_api_version(self, operation: Sequence[Type[Request]]) -> int: ) return version + async def _send_request_to_node(self, node_id: int, request: Request) -> Response: + async with async_timeout.timeout(self._client._request_timeout_ms / 1000): + while True: + ready = await self._client.ready(node_id) + if ready: + break + await asyncio.sleep(self._client._retry_backoff) + + return await self._client.send(node_id, request) + + async def _send_to_controller(self, request: Request) -> Response: + # With "auto" api_version the first request is sent with minimal + # version, so the controller is not returned in metadata. + if self._client.cluster.controller is None: + await self._client.force_metadata_update() + + # 2 attempts in case cluster metadata is outdated + try: + return await self._send_request_to_node( + self._client.cluster.controller.nodeId, request + ) + except NotControllerError: + await self._client.force_metadata_update() + return await self._send_request_to_node( + self._client.cluster.controller.nodeId, request + ) + @staticmethod def _convert_new_topic_request(new_topic): return ( @@ -233,7 +263,7 @@ async def create_topics( f"Support for CreateTopics v{version} has not yet been added " "to AIOKafkaAdminClient." ) - response = await self._client.send(self._client.get_random_node(), request) + response = await self._send_to_controller(request) return response async def delete_topics( @@ -251,7 +281,7 @@ async def delete_topics( version = self._matching_api_version(DeleteTopicsRequest) req_cls = DeleteTopicsRequest[version] request = req_cls(topics, timeout_ms or self._request_timeout_ms) - response = await self._send_request(request) + response = await self._send_to_controller(request) return response async def _get_cluster_metadata( diff --git a/aiokafka/client.py b/aiokafka/client.py index 09ab5a7e..02949588 100644 --- a/aiokafka/client.py +++ b/aiokafka/client.py @@ -433,9 +433,7 @@ async def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT, no_hint=Fal # metadata. # I think requiring metadata should solve this problem if broker is None: - raise StaleMetadata( - "Broker id %s not in current metadata" % node_id - ) + raise StaleMetadata(f"Broker id {node_id} not in current metadata") else: broker = self.cluster.coordinator_metadata(node_id) assert broker is not None From 0558acd61a43c963ab5c48bb38a91b698b8ca8eb Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 14 Apr 2024 17:51:07 +0300 Subject: [PATCH 2/4] Remove unused ZSTD_MAX_OUTPUT_SIZE --- aiokafka/codec.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aiokafka/codec.py b/aiokafka/codec.py index 71b2dc95..c061474a 100644 --- a/aiokafka/codec.py +++ b/aiokafka/codec.py @@ -7,7 +7,6 @@ _XERIAL_V1_HEADER = (-126, b"S", b"N", b"A", b"P", b"P", b"Y", 0, 1, 1) _XERIAL_V1_FORMAT = "bccccccBii" -ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024 try: import cramjam From 72b5cf6427aeb8798b47ef389bcf428636966543 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 14 Apr 2024 18:41:05 +0300 Subject: [PATCH 3/4] Send create_partitions request to controller --- aiokafka/admin/client.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index a44b22b9..eb535306 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -263,8 +263,7 @@ async def create_topics( f"Support for CreateTopics v{version} has not yet been added " "to AIOKafkaAdminClient." ) - response = await self._send_to_controller(request) - return response + return await self._send_to_controller(request) async def delete_topics( self, @@ -281,8 +280,7 @@ async def delete_topics( version = self._matching_api_version(DeleteTopicsRequest) req_cls = DeleteTopicsRequest[version] request = req_cls(topics, timeout_ms or self._request_timeout_ms) - response = await self._send_to_controller(request) - return response + return await self._send_to_controller(request) async def _get_cluster_metadata( self, @@ -295,8 +293,7 @@ async def _get_cluster_metadata( """ req_cls = MetadataRequest[self._matching_api_version(MetadataRequest)] request = req_cls(topics=topics) - response = await self._send_request(request) - return response + return await self._send_request(request) async def list_topics(self) -> List[str]: metadata = await self._get_cluster_metadata(topics=None) @@ -449,7 +446,7 @@ async def create_partitions( timeout=timeout_ms or self._request_timeout_ms, validate_only=validate_only, ) - resp = await self._send_request(req) + resp = await self._send_to_controller(req) for topic, code, message in resp.topic_errors: if code: err_cls = for_code(code) From 8adb7b95d119f0b38ac09c8f1cf9e2208cc19602 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 14 Apr 2024 18:50:36 +0300 Subject: [PATCH 4/4] Update changelog --- CHANGES.rst | 12 +++++++++++- CHANGES/969.feature | 1 - 2 files changed, 11 insertions(+), 2 deletions(-) delete mode 100644 CHANGES/969.feature diff --git a/CHANGES.rst b/CHANGES.rst index f0864a38..e4df59ca 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -5,10 +5,20 @@ Changelog Unreleased ========== +New features: + +* Implement DeleteRecords API (`KIP-204`_) (pr #969 by @vmaurin) + +.. _KIP-204: https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API + + Bugfixes: * Fix serialization for batch (issue #886, pr #887 by @ydjin0602) -* Fix type annotation for `AIOKafkaAdminClient.create_partitions` (pr #978 by @alm0ra) +* Fix type annotation for `AIOKafkaAdminClient.create_partitions` + (pr #978 by @alm0ra) +* Fix `NotControllerError` in `AIOKafkaAdminClient.create_topics` and other + methods (issue #995) 0.10.0 (2023-12-15) diff --git a/CHANGES/969.feature b/CHANGES/969.feature deleted file mode 100644 index 96c0793c..00000000 --- a/CHANGES/969.feature +++ /dev/null @@ -1 +0,0 @@ -Implement DeleteRecords API (KIP-204) (pr #969 by @vmaurin)