feature: master coordinator with aiokafka #880
Conversation
Force-pushed from 63af623 to cbe5020
Force-pushed from d492cc8 to 7b3e299
Big and complex PR :) need to take a look again
karapace/coordinator/__init__.py
Outdated
nit: copyright?
    group_generation_id=generation if generation is not None else -1,
)

def get_master_info(self) -> tuple[bool | None, str | None]:
nit/optional: maybe a class like this?
@dataclasses.dataclass(frozen=True, kw_only=True)
class MasterInfo:
this_node_is_master: bool
master_url: str
Note also the @default_dataclass helper.
It gives you frozen + kw_only, and slots on Python 3.10+.
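A rough sketch of how the suggested MasterInfo could look with such a helper, assuming Python 3.10+ and an illustrative helper implementation (the real helper in the Karapace code base may differ); master_url is widened to str | None here to match the return type of get_master_info:

import dataclasses


def default_dataclass(cls):
    # Illustrative stand-in for the helper mentioned above: frozen + kw_only + slots.
    return dataclasses.dataclass(frozen=True, kw_only=True, slots=True)(cls)


@default_dataclass
class MasterInfo:
    this_node_is_master: bool
    master_url: str | None  # None when no primary is currently known


# Example usage (illustrative values):
info = MasterInfo(this_node_is_master=False, master_url="http://node-1:8081")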
master_identity: MemberIdentity


@dataclass
frozen?
member_data: bytes


@dataclass
frozen?
}


@dataclass
frozen?
    get_member_url(member_identity["scheme"], member_identity["host"], member_identity["port"])
] = (member.member_id, member.member_data)
if len(urls) > 0:
    chosen_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
Shouldn't we validate the election_strategy against an enum, and have the default be "lowest"? If we select the highest, this means each Kafka node that is up and running (with the highest broker id) could become the primary?
in a next refactor
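A rough sketch of what that later refactor could look like; the enum name, its values, and the parsing helper below are illustrative and not part of this PR:

from enum import Enum, unique


@unique
class ElectionStrategy(Enum):
    highest = "highest"
    lowest = "lowest"


def parse_election_strategy(raw: str | None) -> ElectionStrategy:
    # Unknown values fail loudly at configuration time instead of being
    # silently compared against raw strings during the election.
    return ElectionStrategy((raw or "lowest").lower())


# Later, during primary selection:
# chosen_url = sorted(urls, reverse=strategy is ElectionStrategy.highest)[0]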
    chosen_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
    schema_master_id, member_data = urls[chosen_url]
else:
    # Protocol guarantees there is at least one member thus if urls is empty, fallback_urls cannot be
This should be the message of an assert; otherwise, if for any weird reason this assumption is broken, we have no clue why something weird is happening:

assert len(urls) + len(fallback_urls) > 0, "Protocol guarantees there is at least one member thus if urls is empty, fallback_urls cannot be"
@jjaakola-aiven what about this?
Force-pushed from 94bc884 to c0c07fa
Force-pushed from c0c07fa to f7bd79c
Ok, let's merge it
About this change - What it does
Change the coordinator to use aiokafka.

Why this way
In cases where Kafka brokers are replaced with new brokers on new servers, the kafka-python group coordinator can get stuck in a perpetual DNS failure loop. The exact thread getting stuck is the heartbeat thread of the Kafka client, and the condition requires a Karapace restart. See also wbarnha/kafka-python-ng#36.

Work to remove kafka-python as a dependency is also progressing; this change is one more step in that direction and provides async-capable functionality. rdkafka has a group coordinator, but it is not exposed from the confluent-kafka Python binding and so cannot be used for the Karapace primary selection coordinator at this time. The work required to have the group coordination exposed from rdkafka is a future item to investigate.

The primary coordinator is adapted from the aiokafka group coordinator for Karapace. The required changes include removing the subscription handling and partition assignors, adding Karapace-specific metadata to the group, and selecting the primary instance. Note that a Karapace node can join the group as a follower but still be selected as the primary instance.

The implementation removes the primary coordinator thread; the primary coordinator now runs in the application event loop.