Skip to content

Commit

Permalink
Merge pull request #996 from ods/admin-to-controller
Browse files Browse the repository at this point in the history
Send create/delete topics request to controller
  • Loading branch information
ods authored Apr 14, 2024
2 parents 1855cde + 8adb7b9 commit 2bba153
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 13 deletions.
12 changes: 11 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,20 @@ Changelog
Unreleased
==========

New features:

* Implement DeleteRecords API (`KIP-204`_) (pr #969 by @vmaurin)

.. _KIP-204: https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API


Bugfixes:

* Fix serialization for batch (issue #886, pr #887 by @ydjin0602)
* Fix type annotation for `AIOKafkaAdminClient.create_partitions` (pr #978 by @alm0ra)
* Fix type annotation for `AIOKafkaAdminClient.create_partitions`
(pr #978 by @alm0ra)
* Fix `NotControllerError` in `AIOKafkaAdminClient.create_topics` and other
methods (issue #995)


0.10.0 (2023-12-15)
Expand Down
1 change: 0 additions & 1 deletion CHANGES/969.feature

This file was deleted.

41 changes: 34 additions & 7 deletions aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
from ssl import SSLContext
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union

import async_timeout

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import (
IncompatibleBrokerVersion,
LeaderNotAvailableError,
NotControllerError,
NotLeaderForPartitionError,
for_code,
)
Expand Down Expand Up @@ -183,6 +186,33 @@ def _matching_api_version(self, operation: Sequence[Type[Request]]) -> int:
)
return version

async def _send_request_to_node(self, node_id: int, request: Request) -> Response:
async with async_timeout.timeout(self._client._request_timeout_ms / 1000):
while True:
ready = await self._client.ready(node_id)
if ready:
break
await asyncio.sleep(self._client._retry_backoff)

return await self._client.send(node_id, request)

async def _send_to_controller(self, request: Request) -> Response:
# With "auto" api_version the first request is sent with minimal
# version, so the controller is not returned in metadata.
if self._client.cluster.controller is None:
await self._client.force_metadata_update()

# 2 attempts in case cluster metadata is outdated
try:
return await self._send_request_to_node(
self._client.cluster.controller.nodeId, request
)
except NotControllerError:
await self._client.force_metadata_update()
return await self._send_request_to_node(
self._client.cluster.controller.nodeId, request
)

@staticmethod
def _convert_new_topic_request(new_topic):
return (
Expand Down Expand Up @@ -233,8 +263,7 @@ async def create_topics(
f"Support for CreateTopics v{version} has not yet been added "
"to AIOKafkaAdminClient."
)
response = await self._client.send(self._client.get_random_node(), request)
return response
return await self._send_to_controller(request)

async def delete_topics(
self,
Expand All @@ -251,8 +280,7 @@ async def delete_topics(
version = self._matching_api_version(DeleteTopicsRequest)
req_cls = DeleteTopicsRequest[version]
request = req_cls(topics, timeout_ms or self._request_timeout_ms)
response = await self._send_request(request)
return response
return await self._send_to_controller(request)

async def _get_cluster_metadata(
self,
Expand All @@ -265,8 +293,7 @@ async def _get_cluster_metadata(
"""
req_cls = MetadataRequest[self._matching_api_version(MetadataRequest)]
request = req_cls(topics=topics)
response = await self._send_request(request)
return response
return await self._send_request(request)

async def list_topics(self) -> List[str]:
metadata = await self._get_cluster_metadata(topics=None)
Expand Down Expand Up @@ -419,7 +446,7 @@ async def create_partitions(
timeout=timeout_ms or self._request_timeout_ms,
validate_only=validate_only,
)
resp = await self._send_request(req)
resp = await self._send_to_controller(req)
for topic, code, message in resp.topic_errors:
if code:
err_cls = for_code(code)
Expand Down
4 changes: 1 addition & 3 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,7 @@ async def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT, no_hint=Fal
# metadata.
# I think requiring metadata should solve this problem
if broker is None:
raise StaleMetadata(
"Broker id %s not in current metadata" % node_id
)
raise StaleMetadata(f"Broker id {node_id} not in current metadata")
else:
broker = self.cluster.coordinator_metadata(node_id)
assert broker is not None
Expand Down
1 change: 0 additions & 1 deletion aiokafka/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

_XERIAL_V1_HEADER = (-126, b"S", b"N", b"A", b"P", b"P", b"Y", 0, 1, 1)
_XERIAL_V1_FORMAT = "bccccccBii"
ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024

try:
import cramjam
Expand Down

0 comments on commit 2bba153

Please sign in to comment.