Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement KIP-345 in aiokafka (rebase of #682) #827

Closed
wants to merge 14 commits into from

Conversation

ntextreme3
Copy link

@ntextreme3 ntextreme3 commented Mar 17, 2022

Changes

Fixes #680

This is a rebase of #682 -- see #682 (comment)

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> (e.g. 588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the PR
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.

@ntextreme3
Copy link
Author

@ods Do you have suggestions on the types of tests you'd like to see here. I haven't played with this yet, just rebased it. But I'll need this functionality in the coming months so at some point I'll be going through it more thoroughly anyway.

@lgtm-com
Copy link

lgtm-com bot commented Mar 17, 2022

This pull request introduces 1 alert when merging 83e882a into 2c54e10 - view on LGTM.com

new alerts:

  • 1 for Signature mismatch in overriding method

@@ -587,6 +588,7 @@ def _check_api_version_response(self, response):
# ((2, 6, 0), DescribeClientQuotasRequest[0]),
((2, 5, 0), DescribeAclsRequest_v2),
((2, 4, 0), ProduceRequest[8]),
((2, 3, 0), SyncGroupRequest[3]),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list is used to determine broker version. One feature is enough for each version, so this line is excessive.

@codecov
Copy link

codecov bot commented Mar 18, 2022

Codecov Report

Merging #827 (83e882a) into master (2c54e10) will decrease coverage by 47.35%.
The diff coverage is 47.67%.

@@             Coverage Diff             @@
##           master     #827       +/-   ##
===========================================
- Coverage   96.96%   49.60%   -47.36%     
===========================================
  Files          30       32        +2     
  Lines        5434     5497       +63     
===========================================
- Hits         5269     2727     -2542     
- Misses        165     2770     +2605     
Flag Coverage Δ
cext 40.60% <47.67%> (-47.14%) ⬇️
integration ?
purepy 49.15% <47.67%> (-47.35%) ⬇️
unit 49.60% <47.67%> (+0.12%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
aiokafka/consumer/consumer.py 16.36% <0.00%> (-80.35%) ⬇️
aiokafka/consumer/subscription_state.py 81.36% <ø> (-18.64%) ⬇️
aiokafka/consumer/group_coordinator.py 9.70% <8.51%> (-89.20%) ⬇️
aiokafka/consumer/assignors.py 83.33% <83.33%> (ø)
aiokafka/client.py 27.65% <100.00%> (-68.99%) ⬇️
aiokafka/errors.py 99.32% <100.00%> (-0.68%) ⬇️
aiokafka/protocol/group.py 100.00% <100.00%> (ø)
aiokafka/producer/sender.py 14.44% <0.00%> (-81.84%) ⬇️
... and 13 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2c54e10...83e882a. Read the comment docs.

@ods
Copy link
Collaborator

ods commented Mar 18, 2022

Do you have suggestions on the types of tests you'd like to see here.

We need to proof, that this new functionality works. There must be some code with static partition assignor.

@theY4Kman
Copy link

I played around with testing this feature last year. I didn't make it very far, but I thought an important part would be the querying of consumers' group instance IDs – and for that, a v4 DescribeGroups req/res is necessary. kafka-python still does not have this record version. I started building it out in my branch:

# aiokafka/protocol/admin.py

from kafka.protocol.admin import DescribeGroupsRequest_v3, DescribeGroupsResponse_v4
from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String


class DescribeGroupsResponse_v4(Response):
    API_KEY = 15
    API_VERSION = 4
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('groups', Array(
            ('error_code', Int16),
            ('group', String('utf-8')),
            ('state', String('utf-8')),
            ('protocol_type', String('utf-8')),
            ('protocol', String('utf-8')),
            ('members', Array(
                ('member_id', String('utf-8')),
                ('group_instance_id', String('utf-8')),
                ('client_id', String('utf-8')),
                ('client_host', String('utf-8')),
                ('member_metadata', Bytes),
                ('member_assignment', Bytes)))),
            ('authorized_operations', Int32))
    )


class DescribeGroupsRequest_v4(Request):
    API_KEY = 15
    API_VERSION = 4
    RESPONSE_TYPE = DescribeGroupsResponse_v4
    SCHEMA = DescribeGroupsRequest_v3.SCHEMA

@ods
Copy link
Collaborator

ods commented Mar 14, 2023

We may consider removing kafka-python from dependencies, as even issues with PRs are not merged into master.

@rujutashinde
Copy link

Hello @ods , Is there a plan for implementing this change? I am using aiokafka 0.8.0, and i would like to see this feature as i am seeing issues with consumer re-balancing and the KIP-345 would be helpful in resolving that.

@ods
Copy link
Collaborator

ods commented Jun 6, 2023

Hello @rujutashinde, thank you for reminding. Unfortunately I don't have enough expertise to make this PR production ready. Would you like to participate?

@rujutashinde
Copy link

Hello @rujutashinde, thank you for reminding. Unfortunately I don't have enough expertise to make this PR production ready. Would you like to participate?

I see. Unfortunately i dont have the expertise either, i am more a user of the project. But i will give it a go when i have some time cycles. Please do update the ticket if there are any new developments to this . appreciate the quick response!

@tartieret
Copy link

Hi,
I am wondering if there is anything we could do to finalize and merge this PR, as this is a super useful feature?
In case you haven't noticed and if it could be useful, the Robinhood folks implemented this in their fork of aiokafka: https://github.com/robinhood/aiokafka/pull/21/files
Let me know if any help is required

@ods
Copy link
Collaborator

ods commented Oct 15, 2023

Hi @tartieret,
Could you please provide tests for it?

@joshuaherrera
Copy link
Contributor

joshuaherrera commented Oct 27, 2023

Hello! I'd love to help get this change merged into the project, my company would benefit greatly from KIP-345 mode.

I'll work on the requested tests. Are tests all that is preventing this from getting merged?

@ods
Copy link
Collaborator

ods commented Oct 28, 2023

I'll work on the requested tests. Are tests all that is preventing this from getting merged?

Yes, tests for it is the most important part. There is also some work to be done to resolve conflicts with merge of kafka-python's code, but this part I could probably do myself.

ods added a commit that referenced this pull request Dec 7, 2023
* Implement KIP-345 in aiokafka

* fixing linting errors

* fixing linting errors

* fixing linting errors

* Update tests.yml

* Fix linting errors

* Linting fixed, tests still failing

* fixed tests.

* Undoing a lot of linting

* last few lints

* Update assignors.py

suppressing lgtm warning

* fix linting

* fix lgtm exception

* fix trailing space

* add KIP-345 tests, remove broker version check

* use aiokafka AbstractPartitionAssignor

* only test KIP-345 mode with valid Kafka versions

* Update aiokafka/consumer/group_coordinator.py

Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>

* remove AbstractStaticPartitionAssignor

* refactor _perform_assignment to use a JoinGroupResponse class as it's argument

* poll periodically in kip-345 tests

* update tests to use async_timeout

* update isinstance function, use JoinGroupResponse_v5 for check

---------

Co-authored-by: Vikram Patki 24489 <vpatki@wayfair.com>
Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
Co-authored-by: g-clef <spamblock@g-clef.net>
Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>
@ods
Copy link
Collaborator

ods commented Dec 7, 2023

Replaced with #941

@ods ods closed this Dec 7, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for static membership protocol
9 participants