Implement KIP-345 in aiokafka (rebase of #827) (#941)
* Implement KIP-345 in aiokafka

* fixing linting errors

* fixing linting errors

* fixing linting errors

* Update tests.yml

* Fix linting errors

* Linting fixed, tests still failing

* fixed tests.

* Undoing a lot of linting

* last few lints

* Update assignors.py

suppressing lgtm warning

* fix linting

* fix lgtm exception

* fix trailing space

* add KIP-345 tests, remove broker version check

* use aiokafka AbstractPartitionAssignor

* only test KIP-345 mode with valid Kafka versions

* Update aiokafka/consumer/group_coordinator.py

Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>

* remove AbstractStaticPartitionAssignor

* refactor _perform_assignment to use a JoinGroupResponse class as its argument

* poll periodically in kip-345 tests

* update tests to use async_timeout

* update isinstance check to use JoinGroupResponse_v5

---------

Co-authored-by: Vikram Patki 24489 <vpatki@wayfair.com>
Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
Co-authored-by: g-clef <spamblock@g-clef.net>
Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>
5 people authored Dec 7, 2023
1 parent 8818517 commit bc14e6d
Showing 7 changed files with 426 additions and 67 deletions.
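
For context, a minimal usage sketch of the static membership API added by this commit (the topic, group, and instance names are illustrative, and the broker must support KIP-345, i.e. Kafka 2.3+):

import asyncio
from aiokafka import AIOKafkaConsumer

async def consume():
    # Each worker passes a stable, unique group_instance_id so the broker
    # treats it as a static member: restarting it with the same id within
    # session_timeout_ms does not trigger a group rebalance.
    consumer = AIOKafkaConsumer(
        "my-topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        group_instance_id="worker-1",
        session_timeout_ms=30000,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.topic, msg.partition, msg.offset, msg.value)
    finally:
        # For a static member no LeaveGroup request is sent on stop (see the
        # _maybe_leave_group change below), so the assignment is retained
        # until session_timeout_ms expires or the member rejoins.
        await consumer.stop()

asyncio.run(consume())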
5 changes: 5 additions & 0 deletions aiokafka/consumer/consumer.py
@@ -65,6 +65,8 @@ class AIOKafkaConsumer:
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
Default: None
group_instance_id (str or None): name of the group instance ID used for
static membership (KIP-345)
key_deserializer (Callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (Callable, Optional): Any callable that takes a
@@ -229,6 +231,7 @@ def __init__(self, *topics, loop=None,
bootstrap_servers='localhost',
client_id='aiokafka-' + __version__,
group_id=None,
group_instance_id=None,
key_deserializer=None, value_deserializer=None,
fetch_max_wait_ms=500,
fetch_max_bytes=52428800,
@@ -291,6 +294,7 @@ def __init__(self, *topics, loop=None,
sasl_oauth_token_provider=sasl_oauth_token_provider)

self._group_id = group_id
self._group_instance_id = group_instance_id
self._heartbeat_interval_ms = heartbeat_interval_ms
self._session_timeout_ms = session_timeout_ms
self._retry_backoff_ms = retry_backoff_ms
@@ -382,6 +386,7 @@ async def start(self):
self._coordinator = GroupCoordinator(
self._client, self._subscription,
group_id=self._group_id,
group_instance_id=self._group_instance_id,
heartbeat_interval_ms=self._heartbeat_interval_ms,
session_timeout_ms=self._session_timeout_ms,
retry_backoff_ms=self._retry_backoff_ms,
169 changes: 110 additions & 59 deletions aiokafka/consumer/group_coordinator.py
@@ -8,14 +8,17 @@
from aiokafka.client import ConnectionGroup, CoordinationType
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from aiokafka.coordinator.protocol import ConsumerProtocol
from aiokafka.protocol.api import Response
from aiokafka.protocol.commit import (
OffsetCommitRequest_v2 as OffsetCommitRequest,
OffsetFetchRequest_v1 as OffsetFetchRequest)
from aiokafka.protocol.group import (
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest,
SyncGroupRequest, JoinGroupResponse, JoinGroupResponse_v5)
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.util import create_future, create_task


log = logging.getLogger(__name__)

UNKNOWN_OFFSET = -1
@@ -209,6 +212,7 @@ class GroupCoordinator(BaseCoordinator):

def __init__(self, client, subscription, *,
group_id='aiokafka-default-group',
group_instance_id=None,
session_timeout_ms=10000, heartbeat_interval_ms=3000,
retry_backoff_ms=100,
enable_auto_commit=True, auto_commit_interval_ms=5000,
@@ -240,6 +244,7 @@ def __init__(self, client, subscription, *,
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = group_id
self._group_instance_id = group_instance_id
self.coordinator_id = None

# Coordination flags and futures
@@ -345,9 +350,11 @@ def maybe_leave_group(self):
return task

async def _maybe_leave_group(self):
if self.generation > 0:
if self.generation > 0 and self._group_instance_id is None:
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
# Note: do not send this leave request if we are running in static
# partition assignment mode (when group_instance_id has been set).
version = 0 if self._client.api_version < (0, 11, 0) else 1
request = LeaveGroupRequest[version](self.group_id, self.member_id)
try:
@@ -393,15 +400,22 @@ async def _on_join_prepare(self, previous_assignment):
" for group %s failed on_partitions_revoked",
self._subscription.listener, self.group_id)

async def _perform_assignment(
self, leader_id, assignment_strategy, members
):
async def _perform_assignment(self, response: Response):
assignment_strategy = response.group_protocol
members = response.members
assignor = self._lookup_assignor(assignment_strategy)
assert assignor, \
'Invalid assignment protocol: %s' % assignment_strategy
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:
for member in members:
if isinstance(response, JoinGroupResponse_v5):
member_id, group_instance_id, metadata_bytes = member
elif isinstance(response, (JoinGroupResponse[0], JoinGroupResponse[1],
JoinGroupResponse[2])):
member_id, metadata_bytes = member
else:
raise Exception("unknown protocol returned from assignment")
metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
member_metadata[member_id] = metadata
all_subscribed_topics.update(metadata.subscription)
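
To spell out the member-tuple shapes handled above, here is a hypothetical standalone helper (not part of this commit): JoinGroup responses at v5 and later carry a per-member group_instance_id, so their member entries are 3-tuples, while older versions use 2-tuples.

def iter_member_metadata(members, has_group_instance_id):
    """Yield (member_id, metadata_bytes) for either response shape."""
    for member in members:
        if has_group_instance_id:
            # JoinGroupResponse_v5 entry: (member_id, group_instance_id, metadata)
            member_id, _group_instance_id, metadata_bytes = member
        else:
            # Older entry: (member_id, metadata)
            member_id, metadata_bytes = member
        yield member_id, metadata_bytes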
@@ -1202,46 +1216,67 @@ async def perform_group_join(self):
metadata = metadata.encode()
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)
# for KIP-394 we may have to send a second join request
try_join = True
while try_join:
try_join = False

if self._api_version < (0, 10, 1):
request = JoinGroupRequest[0](
self.group_id,
self._session_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)
elif self._api_version < (0, 11, 0):
request = JoinGroupRequest[1](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)
elif self._api_version < (2, 3, 0):
request = JoinGroupRequest[2](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)
else:
request = JoinGroupRequest[3](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
self._coordinator._group_instance_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)

if self._api_version < (0, 10, 1):
request = JoinGroupRequest[0](
self.group_id,
self._session_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
elif self._api_version < (0, 11, 0):
request = JoinGroupRequest[1](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
else:
request = JoinGroupRequest[2](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s",
request, self.coordinator_id)
try:
response = await self._coordinator._send_req(request)
except Errors.KafkaError:
# Return right away. It's a connection error, so backoff will be
# handled by coordinator lookup
return None
if not self._subscription.active:
# Subscription changed. Ignore response and restart group join
return None

# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s",
request, self.coordinator_id)
try:
response = await self._coordinator._send_req(request)
except Errors.KafkaError:
# Return right away. It's a connection error, so backoff will be
# handled by coordinator lookup
return None
error_type = Errors.for_code(response.error_code)

if not self._subscription.active:
# Subscription changed. Ignore response and restart group join
return None
if error_type is Errors.MemberIdRequired:
self._coordinator.member_id = response.member_id
try_join = True

error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Join group response %s", response)
self._coordinator.member_id = response.member_id
@@ -1300,12 +1335,22 @@

async def _on_join_follower(self):
# send follower's sync group with an empty assignment
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
[])
if self._api_version < (2, 3, 0):
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
[],
)
else:
request = SyncGroupRequest[2](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
self._coordinator._group_instance_id,
[],
)
log.debug(
"Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
@@ -1323,11 +1368,7 @@ async def _on_join_leader(self, response):
Future: resolves to member assignment encoded-bytes
"""
try:
group_assignment = \
await self._coordinator._perform_assignment(
response.leader_id,
response.group_protocol,
response.members)
group_assignment = await self._coordinator._perform_assignment(response)
except Exception as e:
raise Errors.KafkaError(repr(e))

@@ -1337,12 +1378,22 @@
assignment = assignment.encode()
assignment_req.append((member_id, assignment))

version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
assignment_req)
if self._api_version < (2, 3, 0):
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
assignment_req,
)
else:
request = SyncGroupRequest[2](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
self._coordinator._group_instance_id,
assignment_req,
)

log.debug(
"Sending leader SyncGroup for group %s to coordinator %s: %s",
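
As a reading aid, the broker-version gating spread across the hunks above can be condensed into the following sketch (not part of the commit; the return values are the list indices used with JoinGroupRequest and SyncGroupRequest in the diff):

def join_group_request_index(api_version):
    # Mirrors the branches in perform_group_join.
    if api_version < (0, 10, 1):
        return 0
    if api_version < (0, 11, 0):
        return 1
    if api_version < (2, 3, 0):
        return 2
    return 3  # request schema carrying group_instance_id (KIP-345)

def sync_group_request_index(api_version):
    # Mirrors the branches in _on_join_follower and _on_join_leader.
    if api_version < (0, 11, 0):
        return 0
    if api_version < (2, 3, 0):
        return 1
    return 2  # request schema carrying group_instance_id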
2 changes: 0 additions & 2 deletions aiokafka/consumer/subscription_state.py
@@ -14,8 +14,6 @@

log = logging.getLogger(__name__)

(List, Future)


class SubscriptionType(Enum):

9 changes: 9 additions & 0 deletions aiokafka/errors.py
@@ -856,6 +856,15 @@ class ListenerNotFound(BrokerResponseError):
)


class MemberIdRequired(BrokerResponseError):
errno = 79
message = 'MEMBER_ID_REQUIRED'
description = (
'Consumer needs to have a valid member '
'id before actually entering group'
)


def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if (
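
A quick sanity check (an illustrative sketch, assuming aiokafka.errors.for_code as used in the coordinator code above): once this class is registered, error code 79 from a JoinGroup response resolves to MemberIdRequired, which is what lets perform_group_join retry the join with the broker-assigned member id.

import aiokafka.errors as Errors

# Error code 79 is returned by brokers that require a member id before
# admitting a new member to the group (KIP-394).
error_type = Errors.for_code(79)
assert error_type is Errors.MemberIdRequired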