diff --git a/CHANGES.rst b/CHANGES.rst index f0864a38..e4df59ca 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -5,10 +5,20 @@ Changelog Unreleased ========== +New features: + +* Implement DeleteRecords API (`KIP-204`_) (pr #969 by @vmaurin) + +.. _KIP-204: https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API + + Bugfixes: * Fix serialization for batch (issue #886, pr #887 by @ydjin0602) -* Fix type annotation for `AIOKafkaAdminClient.create_partitions` (pr #978 by @alm0ra) +* Fix type annotation for `AIOKafkaAdminClient.create_partitions` + (pr #978 by @alm0ra) +* Fix `NotControllerError` in `AIOKafkaAdminClient.create_topics` and other + methods (issue #995) 0.10.0 (2023-12-15) diff --git a/CHANGES/969.feature b/CHANGES/969.feature deleted file mode 100644 index 96c0793c..00000000 --- a/CHANGES/969.feature +++ /dev/null @@ -1 +0,0 @@ -Implement DeleteRecords API (KIP-204) (pr #969 by @vmaurin) diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index ae306b04..eb535306 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -4,11 +4,14 @@ from ssl import SSLContext from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union +import async_timeout + from aiokafka import __version__ from aiokafka.client import AIOKafkaClient from aiokafka.errors import ( IncompatibleBrokerVersion, LeaderNotAvailableError, + NotControllerError, NotLeaderForPartitionError, for_code, ) @@ -183,6 +186,33 @@ def _matching_api_version(self, operation: Sequence[Type[Request]]) -> int: ) return version + async def _send_request_to_node(self, node_id: int, request: Request) -> Response: + async with async_timeout.timeout(self._client._request_timeout_ms / 1000): + while True: + ready = await self._client.ready(node_id) + if ready: + break + await asyncio.sleep(self._client._retry_backoff) + + return await self._client.send(node_id, request) + + async def _send_to_controller(self, request: Request) -> Response: + # With "auto" api_version the first request is sent with minimal + # version, so the controller is not returned in metadata. + if self._client.cluster.controller is None: + await self._client.force_metadata_update() + + # 2 attempts in case cluster metadata is outdated + try: + return await self._send_request_to_node( + self._client.cluster.controller.nodeId, request + ) + except NotControllerError: + await self._client.force_metadata_update() + return await self._send_request_to_node( + self._client.cluster.controller.nodeId, request + ) + @staticmethod def _convert_new_topic_request(new_topic): return ( @@ -233,8 +263,7 @@ async def create_topics( f"Support for CreateTopics v{version} has not yet been added " "to AIOKafkaAdminClient." ) - response = await self._client.send(self._client.get_random_node(), request) - return response + return await self._send_to_controller(request) async def delete_topics( self, @@ -251,8 +280,7 @@ async def delete_topics( version = self._matching_api_version(DeleteTopicsRequest) req_cls = DeleteTopicsRequest[version] request = req_cls(topics, timeout_ms or self._request_timeout_ms) - response = await self._send_request(request) - return response + return await self._send_to_controller(request) async def _get_cluster_metadata( self, @@ -265,8 +293,7 @@ async def _get_cluster_metadata( """ req_cls = MetadataRequest[self._matching_api_version(MetadataRequest)] request = req_cls(topics=topics) - response = await self._send_request(request) - return response + return await self._send_request(request) async def list_topics(self) -> List[str]: metadata = await self._get_cluster_metadata(topics=None) @@ -419,7 +446,7 @@ async def create_partitions( timeout=timeout_ms or self._request_timeout_ms, validate_only=validate_only, ) - resp = await self._send_request(req) + resp = await self._send_to_controller(req) for topic, code, message in resp.topic_errors: if code: err_cls = for_code(code) diff --git a/aiokafka/client.py b/aiokafka/client.py index 09ab5a7e..02949588 100644 --- a/aiokafka/client.py +++ b/aiokafka/client.py @@ -433,9 +433,7 @@ async def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT, no_hint=Fal # metadata. # I think requiring metadata should solve this problem if broker is None: - raise StaleMetadata( - "Broker id %s not in current metadata" % node_id - ) + raise StaleMetadata(f"Broker id {node_id} not in current metadata") else: broker = self.cluster.coordinator_metadata(node_id) assert broker is not None diff --git a/aiokafka/codec.py b/aiokafka/codec.py index 71b2dc95..c061474a 100644 --- a/aiokafka/codec.py +++ b/aiokafka/codec.py @@ -7,7 +7,6 @@ _XERIAL_V1_HEADER = (-126, b"S", b"N", b"A", b"P", b"P", b"Y", 0, 1, 1) _XERIAL_V1_FORMAT = "bccccccBii" -ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024 try: import cramjam