diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 77f0fec9..bc16e3bf 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -18,12 +18,12 @@ jobs: python-version: "3.8" - name: Prepare C files to include run: | - python -m pip install --upgrade pip setuptools + python -m pip install --upgrade pip build python -m pip install -r requirements-cython.txt # Make sure we install to have all c files to be shiped with bundle python -m pip install -vv -U . # We set -vv to see compiler exceptions/warnings - name: Build source package - run: python setup.py sdist + run: python -m build --sdist - name: Upload source package uses: actions/upload-artifact@v2 with: @@ -51,14 +51,14 @@ jobs: - name: Build wheels env: CIBW_ARCHS_LINUX: ${{matrix.arch}} - CIBW_BUILD: cp38-* cp39-* cp310-* cp311-* + CIBW_BUILD: cp38-* cp39-* cp310-* cp311-* cp312-* CIBW_SKIP: '*-musllinux*' CIBW_BEFORE_BUILD_LINUX: pip install -r requirements-cython.txt && yum install -y zlib-devel # On windows and mac we should have z library preinstalled CIBW_BEFORE_BUILD: pip install -r requirements-cython.txt CIBW_BUILD_VERBOSITY: 2 run: | - python -m pip install --upgrade pip setuptools + python -m pip install --upgrade pip pip install cibuildwheel cibuildwheel --output-dir dist shell: bash @@ -74,8 +74,7 @@ jobs: strategy: matrix: - # 3.11 excluded due to problems with python-snappy - python: ["3.8", "3.9", "3.10"] + python: ["3.8", "3.9", "3.10", "3.11", "3.12"] include: - python: "3.8" aiokafka_whl: dist/aiokafka-*-cp38-cp38-win_amd64.whl @@ -83,6 +82,10 @@ jobs: aiokafka_whl: dist/aiokafka-*-cp39-cp39-win_amd64.whl - python: "3.10" aiokafka_whl: dist/aiokafka-*-cp310-cp310-win_amd64.whl + - python: "3.11" + aiokafka_whl: dist/aiokafka-*-cp311-cp311-win_amd64.whl + - python: "3.12" + aiokafka_whl: dist/aiokafka-*-cp312-cp312-win_amd64.whl steps: - uses: actions/checkout@v2 @@ -116,7 +119,7 @@ jobs: strategy: matrix: - python: ["3.8", "3.9", "3.10", "3.11"] + python: ["3.8", "3.9", "3.10", "3.11", "3.12"] include: - python: "3.8" aiokafka_whl: dist/aiokafka-*-cp38-cp38-macosx_10_9_x86_64.whl @@ -126,6 +129,8 @@ jobs: aiokafka_whl: dist/aiokafka-*-cp310-cp310-macosx_10_9_x86_64.whl - python: "3.11" aiokafka_whl: dist/aiokafka-*-cp311-cp311-macosx_10_9_x86_64.whl + - python: "3.12" + aiokafka_whl: dist/aiokafka-*-cp312-cp312-macosx_10_9_x86_64.whl steps: - uses: actions/checkout@v2 @@ -139,9 +144,6 @@ jobs: with: python-version: ${{ matrix.python }} - - name: Install system dependencies - run: | - brew install snappy - name: Install python dependencies run: | pip install --upgrade pip setuptools wheel @@ -160,7 +162,7 @@ jobs: strategy: matrix: - python: ["3.8", "3.9", "3.10", "3.11"] + python: ["3.8", "3.9", "3.10", "3.11", "3.12"] include: - python: "3.8" aiokafka_whl: dist/aiokafka-*-cp38-cp38-manylinux*_x86_64.whl @@ -170,6 +172,8 @@ jobs: aiokafka_whl: dist/aiokafka-*-cp310-cp310-manylinux*_x86_64.whl - python: "3.11" aiokafka_whl: dist/aiokafka-*-cp311-cp311-manylinux*_x86_64.whl + - python: "3.12" + aiokafka_whl: dist/aiokafka-*-cp312-cp312-manylinux*_x86_64.whl steps: - uses: actions/checkout@v2 @@ -217,6 +221,8 @@ jobs: aiokafka_whl: dist/aiokafka-*-cp310-cp310-manylinux*_aarch64.whl - pyver: cp311-cp311 aiokafka_whl: dist/aiokafka-*-cp311-cp311-manylinux*_aarch64.whl + - pyver: cp312-cp312 + aiokafka_whl: dist/aiokafka-*-cp312-cp312-manylinux*_aarch64.whl steps: - uses: actions/checkout@v2 @@ -236,7 +242,7 @@ jobs: source .env/bin/activate && \ yum install -y 
epel-release && \ yum-config-manager --enable epel && \ - yum install -y snappy-devel libzstd-devel krb5-devel && \ + yum install -y krb5-devel && \ pip install --upgrade pip setuptools wheel && \ pip install -r requirements-ci.txt && \ pip install ${{ matrix.aiokafka_whl }} && \ diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e10b33c3..b2a77f51 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -73,8 +73,7 @@ jobs: strategy: matrix: - # 3.11 excluded due to problems with python-snappy - python: ["3.8", "3.9", "3.10"] + python: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v2 @@ -142,7 +141,7 @@ jobs: strategy: matrix: - python: ["3.8", "3.9", "3.10", "3.11"] + python: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v2 @@ -168,10 +167,6 @@ jobs: restore-keys: | ${{ runner.os }}-py-${{ matrix.python }}- - - name: Install system dependencies - run: | - brew install snappy - - name: Install python dependencies run: | pip install --upgrade pip setuptools wheel @@ -215,7 +210,7 @@ jobs: strategy: matrix: include: - - python: "3.11" + - python: "3.12" kafka: "2.8.1" scala: "2.13" @@ -229,39 +224,42 @@ jobs: - python: "3.10" kafka: "2.8.1" scala: "2.13" + - python: "3.11" + kafka: "2.8.1" + scala: "2.13" # Older brokers against latest python version - - python: "3.11" + - python: "3.12" kafka: "0.9.0.1" scala: "2.11" - - python: "3.11" + - python: "3.12" kafka: "0.10.2.1" scala: "2.11" - - python: "3.11" + - python: "3.12" kafka: "0.11.0.3" scala: "2.12" - - python: "3.11" + - python: "3.12" kafka: "1.1.1" scala: "2.12" - - python: "3.11" + - python: "3.12" kafka: "2.1.1" scala: "2.12" - - python: "3.11" + - python: "3.12" kafka: "2.2.2" scala: "2.12" - - python: "3.11" + - python: "3.12" kafka: "2.3.1" scala: "2.12" - - python: "3.11" + - python: "3.12" kafka: "2.4.1" scala: "2.12" - - python: "3.11" + - python: "3.12" kafka: "2.5.1" scala: "2.12" - - python: "3.11" + - python: "3.12" kafka: "2.6.3" scala: "2.12" - - python: "3.11" + - python: "3.12" kafka: "2.7.2" scala: "2.13" fail-fast: false diff --git a/CHANGES.rst b/CHANGES.rst index 839244c2..4cb7da09 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,27 @@ Changelog ========= +0.9.0 (????-??-??) 
+================== + +New features: + +* Include `kafka-python` into `aiokafka`'s code base +* Replace `python-snappy` and `zstandard` with `cramjam` +* PEP518 compliant `pyproject.toml` +* Python 3.12 support + + +Bugfixes: + +* Fix type annotation for `ConsumerRecord` (pr #912 by @zschumacher) +* Improve send performance (issue #943) + + +Improved Documentation: + +* Fix `AbstractTokenProvider.token` example (pr #919 by @mtomilov) + 0.8.1 (2023-05-31) ================== diff --git a/aiokafka/__init__.py b/aiokafka/__init__.py index adaa69c6..e9d4706b 100644 --- a/aiokafka/__init__.py +++ b/aiokafka/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.8.1' # noqa +__version__ = '0.9.0.rc1' # noqa from .abc import ConsumerRebalanceListener from .client import AIOKafkaClient diff --git a/aiokafka/client.py b/aiokafka/client.py index 9ecd23cd..26436fec 100644 --- a/aiokafka/client.py +++ b/aiokafka/client.py @@ -626,8 +626,9 @@ async def _wait_on_metadata(self, topic): UnknownTopicOrPartitionError: if no topic or partitions found in cluster metadata """ - if topic in self.cluster.topics(): - return self.cluster.partitions_for_topic(topic) + partitions = self.cluster.partitions_for_topic(topic) + if partitions is not None: + return partitions # add topic to metadata topic list if it is not there already. self.add_topic(topic) @@ -635,16 +636,15 @@ async def _wait_on_metadata(self, topic): t0 = time.monotonic() while True: await self.force_metadata_update() - if topic in self.cluster.topics(): - break + partitions = self.cluster.partitions_for_topic(topic) + if partitions is not None: + return partitions if (time.monotonic() - t0) > (self._request_timeout_ms / 1000): raise UnknownTopicOrPartitionError() if topic in self.cluster.unauthorized_topics: raise Errors.TopicAuthorizationFailedError(topic) await asyncio.sleep(self._retry_backoff) - return self.cluster.partitions_for_topic(topic) - async def _maybe_wait_metadata(self): if self._md_update_fut is not None: await asyncio.shield(self._md_update_fut) diff --git a/aiokafka/codec.py b/aiokafka/codec.py index 2e3ddaaf..aab1975f 100644 --- a/aiokafka/codec.py +++ b/aiokafka/codec.py @@ -1,6 +1,5 @@ import gzip import io -import platform import struct _XERIAL_V1_HEADER = (-126, b"S", b"N", b"A", b"P", b"P", b"Y", 0, 1, 1) @@ -8,28 +7,18 @@ ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024 try: - import snappy + import cramjam except ImportError: - snappy = None - -try: - import zstandard as zstd -except ImportError: - zstd = None + cramjam = None try: import lz4.frame as lz4 def _lz4_compress(payload, **kwargs): # Kafka does not support LZ4 dependent blocks - try: - # For lz4>=0.12.0 - kwargs.pop("block_linked", None) - return lz4.compress(payload, block_linked=False, **kwargs) - except TypeError: - # For earlier versions of lz4 - kwargs.pop("block_mode", None) - return lz4.compress(payload, block_mode=1, **kwargs) + # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing + kwargs.pop("block_linked", None) + return lz4.compress(payload, block_linked=False, **kwargs) except ImportError: lz4 = None @@ -44,24 +33,17 @@ def _lz4_compress(payload, **kwargs): except ImportError: lz4framed = None -try: - import xxhash -except ImportError: - xxhash = None - -PYPY = bool(platform.python_implementation() == "PyPy") - def has_gzip(): return True def has_snappy(): - return snappy is not None + return cramjam is not None def has_zstd(): - return zstd is not None + return cramjam is not None def has_lz4(): @@ -133,32 +115,22 @@ def 
snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32 * 1024): raise NotImplementedError("Snappy codec is not available") if not xerial_compatible: - return snappy.compress(payload) + return cramjam.snappy.compress_raw(payload) out = io.BytesIO() for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER): out.write(struct.pack("!" + fmt, dat)) # Chunk through buffers to avoid creating intermediate slice copies - if PYPY: - # on pypy, snappy.compress() on a sliced buffer consumes the entire - # buffer... likely a python-snappy bug, so just use a slice copy - def chunker(payload, i, size): - return payload[i:size + i] - - else: - # snappy.compress does not like raw memoryviews, so we have to convert - # tobytes, which is a copy... oh well. it's the thought that counts. - # pylint: disable-msg=undefined-variable - def chunker(payload, i, size): - return memoryview(payload)[i:size + i].tobytes() + def chunker(payload, i, size): + return memoryview(payload)[i:size + i] for chunk in ( chunker(payload, i, xerial_blocksize) for i in range(0, len(payload), xerial_blocksize) ): - block = snappy.compress(chunk) + block = cramjam.snappy.compress_raw(chunk) block_size = len(block) out.write(struct.pack("!i", block_size)) out.write(block) @@ -210,13 +182,13 @@ def snappy_decode(payload): # Skip the block size cursor += 4 end = cursor + block_size - out.write(snappy.decompress(byt[cursor:end])) + out.write(cramjam.snappy.decompress_raw(byt[cursor:end])) cursor = end out.seek(0) return out.read() else: - return snappy.decompress(payload) + return bytes(cramjam.snappy.decompress_raw(payload)) if lz4: @@ -253,66 +225,20 @@ def lz4f_decode(payload): lz4_decode = None -def lz4_encode_old_kafka(payload): - """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum.""" - assert xxhash is not None - data = lz4_encode(payload) - header_size = 7 - flg = data[4] - if not isinstance(flg, int): - flg = ord(flg) - - content_size_bit = (flg >> 3) & 1 - if content_size_bit: - # Old kafka does not accept the content-size field - # so we need to discard it and reset the header flag - flg -= 8 - data = bytearray(data) - data[4] = flg - data = bytes(data) - payload = data[header_size + 8:] - else: - payload = data[header_size:] - - # This is the incorrect hc - hc = xxhash.xxh32(data[0:header_size - 1]).digest()[ - -2:-1 - ] # pylint: disable-msg=no-member - - return b"".join([data[0:header_size - 1], hc, payload]) - - -def lz4_decode_old_kafka(payload): - assert xxhash is not None - # Kafka's LZ4 code has a bug in its header checksum implementation - header_size = 7 - if isinstance(payload[4], int): - flg = payload[4] - else: - flg = ord(payload[4]) - content_size_bit = (flg >> 3) & 1 - if content_size_bit: - header_size += 8 - - # This should be the correct hc - hc = xxhash.xxh32(payload[4:header_size - 1]).digest()[-2:-1] - - munged_payload = b"".join([payload[0:header_size - 1], hc, payload[header_size:]]) - return lz4_decode(munged_payload) +def zstd_encode(payload, level=None): + if not has_zstd(): + raise NotImplementedError("Zstd codec is not available") + if level is None: + # Default for kafka broker + # https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level + level = 3 -def zstd_encode(payload): - if not zstd: - raise NotImplementedError("Zstd codec is not available") - return zstd.ZstdCompressor().compress(payload) + return bytes(cramjam.zstd.compress(payload, level=level)) def zstd_decode(payload): - if not zstd: + if not has_zstd(): raise 
NotImplementedError("Zstd codec is not available") - try: - return zstd.ZstdDecompressor().decompress(payload) - except zstd.ZstdError: - return zstd.ZstdDecompressor().decompress( - payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE - ) + + return bytes(cramjam.zstd.decompress(payload)) diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py index 63bc496d..bbf1f75e 100644 --- a/aiokafka/consumer/consumer.py +++ b/aiokafka/consumer/consumer.py @@ -215,7 +215,7 @@ class AIOKafkaConsumer: sasl_plain_password (str): password for SASL ``PLAIN`` authentication. Default: None sasl_oauth_token_provider (~aiokafka.abc.AbstractTokenProvider): - OAuthBearer token provider instance. (See :mod:`aiokafka.oauth`). + OAuthBearer token provider instance. Default: None Note: diff --git a/aiokafka/coordinator/base.py b/aiokafka/coordinator/base.py deleted file mode 100644 index 823720bf..00000000 --- a/aiokafka/coordinator/base.py +++ /dev/null @@ -1,1190 +0,0 @@ -import abc -import copy -import logging -import threading -import time -import weakref -from concurrent.futures import Future - -from aiokafka import errors as Errors -from aiokafka.metrics import AnonMeasurable -from aiokafka.metrics.stats import Avg, Count, Max, Rate -from aiokafka.protocol.api import Response -from aiokafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest -from aiokafka.protocol.group import ( - HeartbeatRequest, - JoinGroupRequest, - LeaveGroupRequest, - SyncGroupRequest, -) - -from .heartbeat import Heartbeat - -log = logging.getLogger("aiokafka.coordinator") - - -class MemberState(object): - UNJOINED = "" # the client is not part of a group - REBALANCING = "" # the client has begun rebalancing - STABLE = "" # the client has joined and is sending heartbeats - - -class Generation(object): - def __init__(self, generation_id, member_id, protocol): - self.generation_id = generation_id - self.member_id = member_id - self.protocol = protocol - - -Generation.NO_GENERATION = Generation( - OffsetCommitRequest[2].DEFAULT_GENERATION_ID, - JoinGroupRequest[0].UNKNOWN_MEMBER_ID, - None, -) - - -class UnjoinedGroupException(Errors.KafkaError): - retriable = True - - -class BaseCoordinator(object): - """ - BaseCoordinator implements group management for a single group member - by interacting with a designated Kafka broker (the coordinator). Group - semantics are provided by extending this class. See ConsumerCoordinator - for example usage. - - From a high level, Kafka's group management protocol consists of the - following sequence of actions: - - 1. Group Registration: Group members register with the coordinator providing - their own metadata (such as the set of topics they are interested in). - - 2. Group/Leader Selection: The coordinator select the members of the group - and chooses one member as the leader. - - 3. State Assignment: The leader collects the metadata from all the members - of the group and assigns state. - - 4. Group Stabilization: Each member receives the state assigned by the - leader and begins processing. - - To leverage this protocol, an implementation must define the format of - metadata provided by each member for group registration in - :meth:`.group_protocols` and the format of the state assignment provided by - the leader in :meth:`._perform_assignment` and which becomes available to - members in :meth:`._on_join_complete`. 
- - Note on locking: this class shares state between the caller and a background - thread which is used for sending heartbeats after the client has joined the - group. All mutable state as well as state transitions are protected with the - class's monitor. Generally this means acquiring the lock before reading or - writing the state of the group (e.g. generation, member_id) and holding the - lock when sending a request that affects the state of the group - (e.g. JoinGroup, LeaveGroup). - """ - - DEFAULT_CONFIG = { - "group_id": "kafka-python-default-group", - "session_timeout_ms": 10000, - "heartbeat_interval_ms": 3000, - "max_poll_interval_ms": 300000, - "retry_backoff_ms": 100, - "api_version": (0, 10, 1), - "metric_group_prefix": "", - } - - def __init__(self, client, metrics, **configs): - """ - Keyword Arguments: - group_id (str): name of the consumer group to join for dynamic - partition assignment (if enabled), and to use for fetching and - committing offsets. Default: 'kafka-python-default-group' - session_timeout_ms (int): The timeout used to detect failures when - using Kafka's group management facilities. Default: 30000 - heartbeat_interval_ms (int): The expected time in milliseconds - between heartbeats to the consumer coordinator when using - Kafka's group management feature. Heartbeats are used to ensure - that the consumer's session stays active and to facilitate - rebalancing when new consumers join or leave the group. The - value must be set lower than session_timeout_ms, but typically - should be set no higher than 1/3 of that value. It can be - adjusted even lower to control the expected time for normal - rebalances. Default: 3000 - retry_backoff_ms (int): Milliseconds to backoff when retrying on - errors. Default: 100. - """ - self.config = copy.copy(self.DEFAULT_CONFIG) - for key in self.config: - if key in configs: - self.config[key] = configs[key] - - if self.config["api_version"] < (0, 10, 1): - if self.config["max_poll_interval_ms"] != self.config["session_timeout_ms"]: - raise Errors.KafkaConfigurationError( - "Broker version %s does not support " - "different values for max_poll_interval_ms " - "and session_timeout_ms" - ) - - self._client = client - self.group_id = self.config["group_id"] - self.heartbeat = Heartbeat(**self.config) - self._heartbeat_thread = None - self._lock = threading.Condition() - self.rejoin_needed = True - self.rejoining = False # renamed / complement of java needsJoinPrepare - self.state = MemberState.UNJOINED - self.join_future = None - self.coordinator_id = None - self._find_coordinator_future = None - self._generation = Generation.NO_GENERATION - self.sensors = GroupCoordinatorMetrics( - self.heartbeat, metrics, self.config["metric_group_prefix"] - ) - - @abc.abstractmethod - def protocol_type(self): - """ - Unique identifier for the class of supported protocols - (e.g. "consumer" or "connect"). - - Returns: - str: protocol type name - """ - pass - - @abc.abstractmethod - def group_protocols(self): - """Return the list of supported group protocols and metadata. - - This list is submitted by each group member via a JoinGroupRequest. - The order of the protocols in the list indicates the preference of the - protocol (the first entry is the most preferred). The coordinator takes - this preference into account when selecting the generation protocol - (generally more preferred protocols will be selected as long as all - members support them and there is no disagreement on the preference). 
- - Note: metadata must be type bytes or support an encode() method - - Returns: - list: [(protocol, metadata), ...] - """ - pass - - @abc.abstractmethod - def _on_join_prepare(self, generation, member_id): - """Invoked prior to each group join or rejoin. - - This is typically used to perform any cleanup from the previous - generation (such as committing offsets for the consumer) - - Arguments: - generation (int): The previous generation or -1 if there was none - member_id (str): The identifier of this member in the previous group - or '' if there was none - """ - pass - - @abc.abstractmethod - def _perform_assignment(self, response: Response): - """Perform assignment for the group. - - This is used by the leader to push state to all the members of the group - (e.g. to push partition assignments in the case of the new consumer) - - Arguments: - response (Response): A JoinGroupResponse class that implements the - Response abstract class. The following attributes of this class - are used within the function: - protocol (str): the chosen group protocol (assignment strategy) - members (list): [(member_id, metadata_bytes)] from - JoinGroupResponse. metadata_bytes are associated with - the chosen group protocol, and the Coordinator subclass is - responsible for decoding metadata_bytes based on that protocol. - - Returns: - dict: {member_id: assignment}; assignment must either be bytes - or have an encode() method to convert to bytes - """ - pass - - @abc.abstractmethod - def _on_join_complete( - self, generation, member_id, protocol, member_assignment_bytes - ): - """Invoked when a group member has successfully joined a group. - - Arguments: - generation (int): the generation that was joined - member_id (str): the identifier for the local member in the group - protocol (str): the protocol selected by the coordinator - member_assignment_bytes (bytes): the protocol-encoded assignment - propagated from the group leader. The Coordinator instance is - responsible for decoding based on the chosen protocol. - """ - pass - - def coordinator_unknown(self): - """Check if we know who the coordinator is and have an active connection - - Side-effect: reset coordinator_id to None if connection failed - - Returns: - bool: True if the coordinator is unknown - """ - return self.coordinator() is None - - def coordinator(self): - """Get the current coordinator - - Returns: the current coordinator id or None if it is unknown - """ - if self.coordinator_id is None: - return None - elif self._client.is_disconnected(self.coordinator_id): - self.coordinator_dead("Node Disconnected") - return None - else: - return self.coordinator_id - - def ensure_coordinator_ready(self): - """Block until the coordinator for this group is known - (and we have an active connection -- java client uses unsent queue). 
- """ - with self._client._lock, self._lock: - while self.coordinator_unknown(): - - # Prior to 0.8.2 there was no group coordinator - # so we will just pick a node at random and treat - # it as the "coordinator" - if self.config["api_version"] < (0, 8, 2): - self.coordinator_id = self._client.least_loaded_node() - if self.coordinator_id is not None: - self._client.maybe_connect(self.coordinator_id) - continue - - future = self.lookup_coordinator() - self._client.poll(future=future) - - if future.failed(): - if future.retriable(): - if getattr(future.exception, "invalid_metadata", False): - log.debug( - "Requesting metadata for group coordinator request: %s", - future.exception, - ) - metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update) - else: - time.sleep(self.config["retry_backoff_ms"] / 1000) - else: - raise future.exception # pylint: disable-msg=raising-bad-type - - def _reset_find_coordinator_future(self, result): - self._find_coordinator_future = None - - def lookup_coordinator(self): - with self._lock: - if self._find_coordinator_future is not None: - return self._find_coordinator_future - - # If there is an error sending the group coordinator request - # then _reset_find_coordinator_future will immediately fire and - # set _find_coordinator_future = None - # To avoid returning None, we capture the future in a local variable - future = self._send_group_coordinator_request() - self._find_coordinator_future = future - self._find_coordinator_future.add_both(self._reset_find_coordinator_future) - return future - - def need_rejoin(self): - """Check whether the group should be rejoined (e.g. if metadata changes) - - Returns: - bool: True if it should, False otherwise - """ - return self.rejoin_needed - - def poll_heartbeat(self): - """ - Check the status of the heartbeat thread (if it is active) and indicate - the liveness of the client. This must be called periodically after - joining with :meth:`.ensure_active_group` to ensure that the member stays - in the group. If an interval of time longer than the provided rebalance - timeout (max_poll_interval_ms) expires without calling this method, then - the client will proactively leave the group. - - Raises: RuntimeError for unexpected errors raised from the heartbeat thread - """ - with self._lock: - if self._heartbeat_thread is not None: - if self._heartbeat_thread.failed: - # set the heartbeat thread to None and raise an exception. - # If the user catches it, the next call to ensure_active_group() - # will spawn a new heartbeat thread. 
- cause = self._heartbeat_thread.failed - self._heartbeat_thread = None - raise cause # pylint: disable-msg=raising-bad-type - - # Awake the heartbeat thread if needed - if self.heartbeat.should_heartbeat(): - self._lock.notify() - self.heartbeat.poll() - - def time_to_next_heartbeat(self): - """Returns seconds (float) remaining before next heartbeat should be sent - - Note: Returns infinite if group is not joined - """ - with self._lock: - # if we have not joined the group, we don't need to send heartbeats - if self.state is MemberState.UNJOINED: - return float("inf") - return self.heartbeat.time_to_next_heartbeat() - - def _handle_join_success(self, member_assignment_bytes): - with self._lock: - log.info( - "Successfully joined group %s with generation %s", - self.group_id, - self._generation.generation_id, - ) - self.state = MemberState.STABLE - self.rejoin_needed = False - if self._heartbeat_thread: - self._heartbeat_thread.enable() - - def _handle_join_failure(self, _): - with self._lock: - self.state = MemberState.UNJOINED - - def ensure_active_group(self): - """Ensure that the group is active (i.e. joined and synced)""" - with self._client._lock, self._lock: - if self._heartbeat_thread is None: - self._start_heartbeat_thread() - - while self.need_rejoin() or self._rejoin_incomplete(): - self.ensure_coordinator_ready() - - # call on_join_prepare if needed. We set a flag - # to make sure that we do not call it a second - # time if the client is woken up before a pending - # rebalance completes. This must be called on each - # iteration of the loop because an event requiring - # a rebalance (such as a metadata refresh which - # changes the matched subscription set) can occur - # while another rebalance is still in progress. - if not self.rejoining: - self._on_join_prepare( - self._generation.generation_id, self._generation.member_id - ) - self.rejoining = True - - # ensure that there are no pending requests to the coordinator. - # This is important in particular to avoid resending a pending - # JoinGroup request. - while not self.coordinator_unknown(): - if not self._client.in_flight_request_count(self.coordinator_id): - break - self._client.poll() - else: - continue - - # we store the join future in case we are woken up by the user - # after beginning the rebalance in the call to poll below. - # This ensures that we do not mistakenly attempt to rejoin - # before the pending rebalance has completed. - if self.join_future is None: - # Fence off the heartbeat thread explicitly so that it cannot - # interfere with the join group. Note that this must come after - # the call to _on_join_prepare since we must be able to continue - # sending heartbeats if that callback takes some time. - self._heartbeat_thread.disable() - - self.state = MemberState.REBALANCING - future = self._send_join_group_request() - - self.join_future = ( - future # this should happen before adding callbacks - ) - - # handle join completion in the callback so that the - # callback will be invoked even if the consumer is woken up - # before finishing the rebalance - future.add_callback(self._handle_join_success) - - # we handle failures below after the request finishes. 
- # If the join completes after having been woken up, the - # exception is ignored and we will rejoin - future.add_errback(self._handle_join_failure) - - else: - future = self.join_future - - self._client.poll(future=future) - - if future.succeeded(): - self._on_join_complete( - self._generation.generation_id, - self._generation.member_id, - self._generation.protocol, - future.value, - ) - self.join_future = None - self.rejoining = False - - else: - self.join_future = None - exception = future.exception - if isinstance( - exception, - ( - Errors.UnknownMemberIdError, - Errors.RebalanceInProgressError, - Errors.IllegalGenerationError, - ), - ): - continue - elif not future.retriable(): - raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config["retry_backoff_ms"] / 1000) - - def _rejoin_incomplete(self): - return self.join_future is not None - - def _send_join_group_request(self): - """Join the group and return the assignment for the next generation. - - This function handles both JoinGroup and SyncGroup, delegating to - :meth:`._perform_assignment` if elected leader by the coordinator. - - Returns: - Future: resolves to the encoded-bytes assignment returned from the - group leader - """ - if self.coordinator_unknown(): - e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) - return Future().failure(e) - - elif not self._client.ready(self.coordinator_id, metadata_priority=False): - e = Errors.NodeNotReadyError(self.coordinator_id) - return Future().failure(e) - - # send a join group request to the coordinator - log.info("(Re-)joining group %s", self.group_id) - member_metadata = [ - (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) - for protocol, metadata in self.group_protocols() - ] - if self.config["api_version"] < (0, 9): - raise Errors.KafkaError("JoinGroupRequest api requires 0.9+ brokers") - elif (0, 9) <= self.config["api_version"] < (0, 10, 1): - request = JoinGroupRequest[0]( - self.group_id, - self.config["session_timeout_ms"], - self._generation.member_id, - self.protocol_type(), - member_metadata, - ) - elif (0, 10, 1) <= self.config["api_version"] < (0, 11, 0): - request = JoinGroupRequest[1]( - self.group_id, - self.config["session_timeout_ms"], - self.config["max_poll_interval_ms"], - self._generation.member_id, - self.protocol_type(), - member_metadata, - ) - else: - request = JoinGroupRequest[2]( - self.group_id, - self.config["session_timeout_ms"], - self.config["max_poll_interval_ms"], - self._generation.member_id, - self.protocol_type(), - member_metadata, - ) - - # create the request for the coordinator - log.debug( - "Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id - ) - future = Future() - _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_join_group_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, request, future) - return future - - def _failed_request(self, node_id, request, future, error): - # Marking coordinator dead - # unless the error is caused by internal client pipelining - if not isinstance( - error, (Errors.NodeNotReadyError, Errors.TooManyInFlightRequests) - ): - log.error( - "Error sending %s to node %s [%s]", - request.__class__.__name__, - node_id, - error, - ) - self.coordinator_dead(error) - else: - log.debug( - "Error sending %s to node %s [%s]", - request.__class__.__name__, - node_id, - error, - ) - future.failure(error) - - def _handle_join_group_response(self, future, send_time, 
response): - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - log.debug( - "Received successful JoinGroup response for group %s: %s", - self.group_id, - response, - ) - self.sensors.join_latency.record((time.time() - send_time) * 1000) - with self._lock: - if self.state is not MemberState.REBALANCING: - # if the consumer was woken up before a rebalance completes, - # we may have already left the group. In this case, we do - # not want to continue with the sync group. - future.failure(UnjoinedGroupException()) - else: - self._generation = Generation( - response.generation_id, - response.member_id, - response.group_protocol, - ) - - if response.leader_id == response.member_id: - log.info( - "Elected group leader -- performing partition" - " assignments using %s", - self._generation.protocol, - ) - self._on_join_leader(response).chain(future) - else: - self._on_join_follower().chain(future) - - elif error_type is Errors.GroupLoadInProgressError: - log.debug( - "Attempt to join group %s rejected since coordinator %s" - " is loading the group.", - self.group_id, - self.coordinator_id, - ) - # backoff and retry - future.failure(error_type(response)) - elif error_type is Errors.UnknownMemberIdError: - # reset the member id and retry immediately - error = error_type(self._generation.member_id) - self.reset_generation() - log.debug( - "Attempt to join group %s failed due to unknown member id", - self.group_id, - ) - future.failure(error) - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - ): - # re-discover the coordinator and retry with backoff - self.coordinator_dead(error_type()) - log.debug( - "Attempt to join group %s failed due to obsolete " - "coordinator information: %s", - self.group_id, - error_type.__name__, - ) - future.failure(error_type()) - elif error_type in ( - Errors.InconsistentGroupProtocolError, - Errors.InvalidSessionTimeoutError, - Errors.InvalidGroupIdError, - ): - # log the error and re-throw the exception - error = error_type(response) - log.error( - "Attempt to join group %s failed due to fatal error: %s", - self.group_id, - error, - ) - future.failure(error) - elif error_type is Errors.GroupAuthorizationFailedError: - future.failure(error_type(self.group_id)) - else: - # unexpected error, throw the exception - error = error_type() - log.error("Unexpected error in join group response: %s", error) - future.failure(error) - - def _on_join_follower(self): - # send follower's sync group with an empty assignment - version = 0 if self.config["api_version"] < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - {}, - ) - log.debug( - "Sending follower SyncGroup for group %s to coordinator %s: %s", - self.group_id, - self.coordinator_id, - request, - ) - return self._send_sync_group_request(request) - - def _on_join_leader(self, response): - """ - Perform leader synchronization and send back the assignment - for the group via SyncGroupRequest - - Arguments: - response (JoinResponse): broker response to parse - - Returns: - Future: resolves to member assignment encoded-bytes - """ - try: - group_assignment = self._perform_assignment(response) - except Exception as e: - return Future().failure(e) - - version = 0 if self.config["api_version"] < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - [ - ( - member_id, - 
assignment - if isinstance(assignment, bytes) - else assignment.encode(), - ) - for member_id, assignment in group_assignment.items() - ], - ) - - log.debug( - "Sending leader SyncGroup for group %s to coordinator %s: %s", - self.group_id, - self.coordinator_id, - request, - ) - return self._send_sync_group_request(request) - - def _send_sync_group_request(self, request): - if self.coordinator_unknown(): - e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) - return Future().failure(e) - - # We assume that coordinator is ready if we're sending SyncGroup - # as it typically follows a successful JoinGroup - # Also note that if client.ready() enforces a metadata priority policy, - # we can get into an infinite loop if the leader assignment process - # itself requests a metadata update - - future = Future() - _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_sync_group_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, request, future) - return future - - def _handle_sync_group_response(self, future, send_time, response): - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - self.sensors.sync_latency.record((time.time() - send_time) * 1000) - future.success(response.member_assignment) - return - - # Always rejoin on error - self.request_rejoin() - if error_type is Errors.GroupAuthorizationFailedError: - future.failure(error_type(self.group_id)) - elif error_type is Errors.RebalanceInProgressError: - log.debug( - "SyncGroup for group %s failed due to coordinator" " rebalance", - self.group_id, - ) - future.failure(error_type(self.group_id)) - elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): - error = error_type() - log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.reset_generation() - future.failure(error) - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - ): - error = error_type() - log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.coordinator_dead(error) - future.failure(error) - else: - error = error_type() - log.error("Unexpected error from SyncGroup: %s", error) - future.failure(error) - - def _send_group_coordinator_request(self): - """Discover the current coordinator for the group. 
- - Returns: - Future: resolves to the node id of the coordinator - """ - node_id = self._client.least_loaded_node() - if node_id is None: - return Future().failure(Errors.NoBrokersAvailable()) - - elif not self._client.ready(node_id, metadata_priority=False): - e = Errors.NodeNotReadyError(node_id) - return Future().failure(e) - - log.debug( - "Sending group coordinator request for group %s to broker %s", - self.group_id, - node_id, - ) - request = GroupCoordinatorRequest[0](self.group_id) - future = Future() - _f = self._client.send(node_id, request) - _f.add_callback(self._handle_group_coordinator_response, future) - _f.add_errback(self._failed_request, node_id, request, future) - return future - - def _handle_group_coordinator_response(self, future, response): - log.debug("Received group coordinator response %s", response) - - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - with self._lock: - coordinator_id = self._client.cluster.add_group_coordinator( - self.group_id, response - ) - if not coordinator_id: - # This could happen if coordinator metadata is different - # than broker metadata - future.failure(Errors.IllegalStateError()) - return - - self.coordinator_id = coordinator_id - log.info( - "Discovered coordinator %s for group %s", - self.coordinator_id, - self.group_id, - ) - self._client.maybe_connect(self.coordinator_id) - self.heartbeat.reset_timeouts() - future.success(self.coordinator_id) - - elif error_type is Errors.GroupCoordinatorNotAvailableError: - log.debug("Group Coordinator Not Available; retry") - future.failure(error_type()) - elif error_type is Errors.GroupAuthorizationFailedError: - error = error_type(self.group_id) - log.error("Group Coordinator Request failed: %s", error) - future.failure(error) - else: - error = error_type() - log.error( - "Group coordinator lookup for group %s failed: %s", self.group_id, error - ) - future.failure(error) - - def coordinator_dead(self, error): - """Mark the current coordinator as dead.""" - if self.coordinator_id is not None: - log.warning( - "Marking the coordinator dead (node %s) for group %s: %s.", - self.coordinator_id, - self.group_id, - error, - ) - self.coordinator_id = None - - def generation(self): - """Get the current generation state if the group is stable. 
- - Returns: the current generation or None if the group is unjoined/rebalancing - """ - with self._lock: - if self.state is not MemberState.STABLE: - return None - return self._generation - - def reset_generation(self): - """Reset the generation and memberId because we have fallen out of the group.""" - with self._lock: - self._generation = Generation.NO_GENERATION - self.rejoin_needed = True - self.state = MemberState.UNJOINED - - def request_rejoin(self): - self.rejoin_needed = True - - def _start_heartbeat_thread(self): - if self._heartbeat_thread is None: - log.info("Starting new heartbeat thread") - self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) - self._heartbeat_thread.daemon = True - self._heartbeat_thread.start() - - def _close_heartbeat_thread(self): - if self._heartbeat_thread is not None: - log.info("Stopping heartbeat thread") - try: - self._heartbeat_thread.close() - except ReferenceError: - pass - self._heartbeat_thread = None - - def __del__(self): - self._close_heartbeat_thread() - - def close(self): - """Close the coordinator, leave the current group, - and reset local generation / member_id""" - self._close_heartbeat_thread() - self.maybe_leave_group() - - def maybe_leave_group(self): - """Leave the current group and reset local generation/memberId.""" - with self._client._lock, self._lock: - if ( - not self.coordinator_unknown() - and self.state is not MemberState.UNJOINED - and self._generation is not Generation.NO_GENERATION - ): - - # this is a minimal effort attempt to leave the group. we do not - # attempt any resending if the request fails or times out. - log.info("Leaving consumer group (%s).", self.group_id) - version = 0 if self.config["api_version"] < (0, 11, 0) else 1 - request = LeaveGroupRequest[version]( - self.group_id, self._generation.member_id - ) - future = self._client.send(self.coordinator_id, request) - future.add_callback(self._handle_leave_group_response) - future.add_errback(log.error, "LeaveGroup request failed: %s") - self._client.poll(future=future) - - self.reset_generation() - - def _handle_leave_group_response(self, response): - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - log.debug( - "LeaveGroup request for group %s returned successfully", self.group_id - ) - else: - log.error( - "LeaveGroup request for group %s failed with error: %s", - self.group_id, - error_type(), - ) - - def _send_heartbeat_request(self): - """Send a heartbeat request""" - if self.coordinator_unknown(): - e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) - return Future().failure(e) - - elif not self._client.ready(self.coordinator_id, metadata_priority=False): - e = Errors.NodeNotReadyError(self.coordinator_id) - return Future().failure(e) - - version = 0 if self.config["api_version"] < (0, 11, 0) else 1 - request = HeartbeatRequest[version]( - self.group_id, self._generation.generation_id, self._generation.member_id - ) - log.debug( - "Heartbeat: %s[%s] %s", - request.group, - request.generation_id, - request.member_id, - ) # pylint: disable-msg=no-member - future = Future() - _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_heartbeat_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, request, future) - return future - - def _handle_heartbeat_response(self, future, send_time, response): - self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000) - error_type = Errors.for_code(response.error_code) - if 
error_type is Errors.NoError: - log.debug( - "Received successful heartbeat response for group %s", self.group_id - ) - future.success(None) - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - ): - log.warning( - "Heartbeat failed for group %s: coordinator (node %s)" - " is either not started or not valid", - self.group_id, - self.coordinator(), - ) - self.coordinator_dead(error_type()) - future.failure(error_type()) - elif error_type is Errors.RebalanceInProgressError: - log.warning( - "Heartbeat failed for group %s because it is" " rebalancing", - self.group_id, - ) - self.request_rejoin() - future.failure(error_type()) - elif error_type is Errors.IllegalGenerationError: - log.warning( - "Heartbeat failed for group %s: generation id is not " " current.", - self.group_id, - ) - self.reset_generation() - future.failure(error_type()) - elif error_type is Errors.UnknownMemberIdError: - log.warning( - "Heartbeat: local member_id was not recognized;" - " this consumer needs to re-join" - ) - self.reset_generation() - future.failure(error_type) - elif error_type is Errors.GroupAuthorizationFailedError: - error = error_type(self.group_id) - log.error("Heartbeat failed: authorization error: %s", error) - future.failure(error) - else: - error = error_type() - log.error("Heartbeat failed: Unhandled error: %s", error) - future.failure(error) - - -class GroupCoordinatorMetrics(object): - def __init__(self, heartbeat, metrics, prefix, tags=None): - self.heartbeat = heartbeat - self.metrics = metrics - self.metric_group_name = prefix + "-coordinator-metrics" - - self.heartbeat_latency = metrics.sensor("heartbeat-latency") - self.heartbeat_latency.add( - metrics.metric_name( - "heartbeat-response-time-max", - self.metric_group_name, - "The max time taken to receive a response to a heartbeat request", - tags, - ), - Max(), - ) - self.heartbeat_latency.add( - metrics.metric_name( - "heartbeat-rate", - self.metric_group_name, - "The average number of heartbeats per second", - tags, - ), - Rate(sampled_stat=Count()), - ) - - self.join_latency = metrics.sensor("join-latency") - self.join_latency.add( - metrics.metric_name( - "join-time-avg", - self.metric_group_name, - "The average time taken for a group rejoin", - tags, - ), - Avg(), - ) - self.join_latency.add( - metrics.metric_name( - "join-time-max", - self.metric_group_name, - "The max time taken for a group rejoin", - tags, - ), - Max(), - ) - self.join_latency.add( - metrics.metric_name( - "join-rate", - self.metric_group_name, - "The number of group joins per second", - tags, - ), - Rate(sampled_stat=Count()), - ) - - self.sync_latency = metrics.sensor("sync-latency") - self.sync_latency.add( - metrics.metric_name( - "sync-time-avg", - self.metric_group_name, - "The average time taken for a group sync", - tags, - ), - Avg(), - ) - self.sync_latency.add( - metrics.metric_name( - "sync-time-max", - self.metric_group_name, - "The max time taken for a group sync", - tags, - ), - Max(), - ) - self.sync_latency.add( - metrics.metric_name( - "sync-rate", - self.metric_group_name, - "The number of group syncs per second", - tags, - ), - Rate(sampled_stat=Count()), - ) - - metrics.add_metric( - metrics.metric_name( - "last-heartbeat-seconds-ago", - self.metric_group_name, - "The number of seconds since the last controller heartbeat was sent", - tags, - ), - AnonMeasurable(lambda _, now: (now / 1000) - self.heartbeat.last_send), - ) - - -class HeartbeatThread(threading.Thread): - def __init__(self, 
coordinator): - super(HeartbeatThread, self).__init__() - self.name = coordinator.group_id + "-heartbeat" - self.coordinator = coordinator - self.enabled = False - self.closed = False - self.failed = None - - def enable(self): - with self.coordinator._lock: - self.enabled = True - self.coordinator.heartbeat.reset_timeouts() - self.coordinator._lock.notify() - - def disable(self): - self.enabled = False - - def close(self): - self.closed = True - with self.coordinator._lock: - self.coordinator._lock.notify() - if self.is_alive(): - self.join(self.coordinator.config["heartbeat_interval_ms"] / 1000) - if self.is_alive(): - log.warning("Heartbeat thread did not fully terminate during close") - - def run(self): - try: - log.debug("Heartbeat thread started") - while not self.closed: - self._run_once() - - except ReferenceError: - log.debug("Heartbeat thread closed due to coordinator gc") - - except RuntimeError as e: - log.error( - "Heartbeat thread for group %s failed due to unexpected error: %s", - self.coordinator.group_id, - e, - ) - self.failed = e - - finally: - log.debug("Heartbeat thread closed") - - def _run_once(self): - with self.coordinator._client._lock, self.coordinator._lock: - if self.enabled and self.coordinator.state is MemberState.STABLE: - # TODO: When consumer.wakeup() is implemented, we need to - # disable here to prevent propagating an exception to this - # heartbeat thread - # must get client._lock, or maybe deadlock at heartbeat - # failure callback in consumer poll - self.coordinator._client.poll(timeout_ms=0) - - with self.coordinator._lock: - if not self.enabled: - log.debug("Heartbeat disabled. Waiting") - self.coordinator._lock.wait() - log.debug("Heartbeat re-enabled.") - return - - if self.coordinator.state is not MemberState.STABLE: - # the group is not stable (perhaps because we left the - # group or because the coordinator kicked us out), so - # disable heartbeats and wait for the main thread to rejoin. - log.debug("Group state is not stable, disabling heartbeats") - self.disable() - return - - if self.coordinator.coordinator_unknown(): - future = self.coordinator.lookup_coordinator() - if not future.is_done or future.failed(): - # the immediate future check ensures that we backoff - # properly in the case that no brokers are available - # to connect to (and the future is automatically failed). - self.coordinator._lock.wait( - self.coordinator.config["retry_backoff_ms"] / 1000 - ) - - elif self.coordinator.heartbeat.session_timeout_expired(): - # the session timeout has expired without seeing a - # successful heartbeat, so we should probably make sure - # the coordinator is still healthy. - log.warning("Heartbeat session expired, marking coordinator dead") - self.coordinator.coordinator_dead("Heartbeat session expired") - - elif self.coordinator.heartbeat.poll_timeout_expired(): - # the poll timeout has expired, which means that the - # foreground thread has stalled in between calls to - # poll(), so we explicitly leave the group. 
- log.warning("Heartbeat poll expired, leaving group") - self.coordinator.maybe_leave_group() - - elif not self.coordinator.heartbeat.should_heartbeat(): - # poll again after waiting for the retry backoff in case - # the heartbeat failed or the coordinator disconnected - log.log(0, "Not ready to heartbeat, waiting") - self.coordinator._lock.wait( - self.coordinator.config["retry_backoff_ms"] / 1000 - ) - - else: - self.coordinator.heartbeat.sent_heartbeat() - future = self.coordinator._send_heartbeat_request() - future.add_callback(self._handle_heartbeat_success) - future.add_errback(self._handle_heartbeat_failure) - - def _handle_heartbeat_success(self, result): - with self.coordinator._lock: - self.coordinator.heartbeat.received_heartbeat() - - def _handle_heartbeat_failure(self, exception): - with self.coordinator._lock: - if isinstance(exception, Errors.RebalanceInProgressError): - # it is valid to continue heartbeating while the group is - # rebalancing. This ensures that the coordinator keeps the - # member in the group for as long as the duration of the - # rebalance timeout. If we stop sending heartbeats, however, - # then the session timeout may expire before we can rejoin. - self.coordinator.heartbeat.received_heartbeat() - else: - self.coordinator.heartbeat.fail_heartbeat() - # wake up the thread if it's sleeping to reschedule the heartbeat - self.coordinator._lock.notify() diff --git a/aiokafka/coordinator/consumer.py b/aiokafka/coordinator/consumer.py deleted file mode 100644 index 1982090f..00000000 --- a/aiokafka/coordinator/consumer.py +++ /dev/null @@ -1,987 +0,0 @@ -import collections -import copy -import functools -import logging -import time -from concurrent.futures import Future - -import aiokafka.errors as Errors -from aiokafka.metrics import AnonMeasurable -from aiokafka.metrics.stats import Avg, Count, Max, Rate -from aiokafka.protocol.api import Response -from aiokafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest -from aiokafka.structs import OffsetAndMetadata, TopicPartition -from aiokafka.util import WeakMethod - -from .base import BaseCoordinator, Generation -from .assignors.range import RangePartitionAssignor -from .assignors.roundrobin import RoundRobinPartitionAssignor -from .assignors.sticky.sticky_assignor import StickyPartitionAssignor -from .protocol import ConsumerProtocol - - -log = logging.getLogger(__name__) - - -class ConsumerCoordinator(BaseCoordinator): - """This class manages the coordination process with the consumer coordinator.""" - - DEFAULT_CONFIG = { - "group_id": "kafka-python-default-group", - "enable_auto_commit": True, - "auto_commit_interval_ms": 5000, - "default_offset_commit_callback": None, - "assignors": ( - RangePartitionAssignor, - RoundRobinPartitionAssignor, - StickyPartitionAssignor, - ), - "session_timeout_ms": 10000, - "heartbeat_interval_ms": 3000, - "max_poll_interval_ms": 300000, - "retry_backoff_ms": 100, - "api_version": (0, 10, 1), - "exclude_internal_topics": True, - "metric_group_prefix": "consumer", - } - - def __init__(self, client, subscription, metrics, **configs): - """Initialize the coordination manager. - - Keyword Arguments: - group_id (str): name of the consumer group to join for dynamic - partition assignment (if enabled), and to use for fetching and - committing offsets. Default: 'kafka-python-default-group' - enable_auto_commit (bool): If true the consumer's offset will be - periodically committed in the background. Default: True. 
- auto_commit_interval_ms (int): milliseconds between automatic - offset commits, if enable_auto_commit is True. Default: 5000. - default_offset_commit_callback (callable): called as - callback(offsets, exception) response will be either an Exception - or None. This callback can be used to trigger custom actions when - a commit request completes. - assignors (list): List of objects to use to distribute partition - ownership amongst consumer instances when group management is - used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] - heartbeat_interval_ms (int): The expected time in milliseconds - between heartbeats to the consumer coordinator when using - Kafka's group management feature. Heartbeats are used to ensure - that the consumer's session stays active and to facilitate - rebalancing when new consumers join or leave the group. The - value must be set lower than session_timeout_ms, but typically - should be set no higher than 1/3 of that value. It can be - adjusted even lower to control the expected time for normal - rebalances. Default: 3000 - session_timeout_ms (int): The timeout used to detect failures when - using Kafka's group management facilities. Default: 30000 - retry_backoff_ms (int): Milliseconds to backoff when retrying on - errors. Default: 100. - exclude_internal_topics (bool): Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to - True the only way to receive records from an internal topic is - subscribing to it. Requires 0.10+. Default: True - """ - super(ConsumerCoordinator, self).__init__(client, metrics, **configs) - - self.config = copy.copy(self.DEFAULT_CONFIG) - for key in self.config: - if key in configs: - self.config[key] = configs[key] - - self._subscription = subscription - self._is_leader = False - self._joined_subscription = set() - self._metadata_snapshot = self._build_metadata_snapshot( - subscription, client.cluster - ) - self._assignment_snapshot = None - self._cluster = client.cluster - self.auto_commit_interval = self.config["auto_commit_interval_ms"] / 1000 - self.next_auto_commit_deadline = None - self.completed_offset_commits = collections.deque() - - if self.config["default_offset_commit_callback"] is None: - self.config[ - "default_offset_commit_callback" - ] = self._default_offset_commit_callback - - if self.config["group_id"] is not None: - if self.config["api_version"] >= (0, 9): - if not self.config["assignors"]: - raise Errors.KafkaConfigurationError( - "Coordinator requires assignors" - ) - if self.config["api_version"] < (0, 10, 1): - if ( - self.config["max_poll_interval_ms"] - != self.config["session_timeout_ms"] - ): - raise Errors.KafkaConfigurationError( - "Broker version %s does not support " - "different values for max_poll_interval_ms " - "and session_timeout_ms" - ) - - if self.config["enable_auto_commit"]: - if self.config["api_version"] < (0, 8, 1): - log.warning( - "Broker version (%s) does not support offset" - " commits; disabling auto-commit.", - self.config["api_version"], - ) - self.config["enable_auto_commit"] = False - elif self.config["group_id"] is None: - log.warning("group_id is None: disabling auto-commit.") - self.config["enable_auto_commit"] = False - else: - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - - self.consumer_sensors = ConsumerCoordinatorMetrics( - metrics, self.config["metric_group_prefix"], self._subscription - ) - - self._cluster.request_update() - 
self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) - - def __del__(self): - if hasattr(self, "_cluster") and self._cluster: - self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) - super(ConsumerCoordinator, self).__del__() - - def protocol_type(self): - return ConsumerProtocol.PROTOCOL_TYPE - - def group_protocols(self): - """Returns list of preferred (protocols, metadata)""" - if self._subscription.subscription is None: - raise Errors.IllegalStateError("Consumer has not subscribed to topics") - # dpkp note: I really dislike this. - # why? because we are using this strange method group_protocols, - # which is seemingly innocuous, to set internal state (_joined_subscription) - # that is later used to check whether metadata has changed since we joined a - # group but there is no guarantee that this method, group_protocols, will get - # called in the correct sequence or that it will only be called when we want it - # to be. So this really should be moved elsewhere, but I don't have the energy - # to work that out right now. If you read this at some later date after the - # mutable state has bitten you... I'm sorry! It mimics the java client, and - # that's the best I've got for now. - self._joined_subscription = set(self._subscription.subscription) - metadata_list = [] - for assignor in self.config["assignors"]: - metadata = assignor.metadata(self._joined_subscription) - group_protocol = (assignor.name, metadata) - metadata_list.append(group_protocol) - return metadata_list - - def _handle_metadata_update(self, cluster): - # if we encounter any unauthorized topics, raise an exception - if cluster.unauthorized_topics: - raise Errors.TopicAuthorizationFailedError(cluster.unauthorized_topics) - - if self._subscription.subscribed_pattern: - topics = [] - for topic in cluster.topics(self.config["exclude_internal_topics"]): - if self._subscription.subscribed_pattern.match(topic): - topics.append(topic) - - if set(topics) != self._subscription.subscription: - self._subscription.change_subscription(topics) - self._client.set_topics(self._subscription.group_subscription()) - - # check if there are any changes to the metadata which should trigger - # a rebalance - if self._subscription.partitions_auto_assigned(): - metadata_snapshot = self._build_metadata_snapshot( - self._subscription, cluster - ) - if self._metadata_snapshot != metadata_snapshot: - self._metadata_snapshot = metadata_snapshot - - # If we haven't got group coordinator support, - # just assign all partitions locally - if self._auto_assign_all_partitions(): - self._subscription.assign_from_subscribed( - [ - TopicPartition(topic, partition) - for topic in self._subscription.subscription - for partition in self._metadata_snapshot[topic] - ] - ) - - def _auto_assign_all_partitions(self): - # For users that use "subscribe" without group support, - # we will simply assign all partitions to this consumer - if self.config["api_version"] < (0, 9): - return True - elif self.config["group_id"] is None: - return True - else: - return False - - def _build_metadata_snapshot(self, subscription, cluster): - metadata_snapshot = {} - for topic in subscription.group_subscription(): - partitions = cluster.partitions_for_topic(topic) or [] - metadata_snapshot[topic] = set(partitions) - return metadata_snapshot - - def _lookup_assignor(self, name): - for assignor in self.config["assignors"]: - if assignor.name == name: - return assignor - return None - - def _on_join_complete( - self, generation, member_id, protocol, 
member_assignment_bytes - ): - # only the leader is responsible for monitoring for metadata changes - # (i.e. partition changes) - if not self._is_leader: - self._assignment_snapshot = None - - assignor = self._lookup_assignor(protocol) - assert assignor, "Coordinator selected invalid assignment protocol: %s" % ( - protocol, - ) - - assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) - - # set the flag to refresh last committed offsets - self._subscription.needs_fetch_committed_offsets = True - - # update partition assignment - try: - self._subscription.assign_from_subscribed(assignment.partitions()) - except ValueError as e: - log.warning("%s. Probably due to a deleted topic. Requesting Re-join" % e) - self.request_rejoin() - - # give the assignor a chance to update internal state - # based on the received assignment - assignor.on_assignment(assignment) - if assignor.name == "sticky": - assignor.on_generation_assignment(generation) - - # reschedule the auto commit starting from now - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - - assigned = set(self._subscription.assigned_partitions()) - log.info( - "Setting newly assigned partitions %s for group %s", assigned, self.group_id - ) - - # execute the user's callback after rebalance - if self._subscription.listener: - try: - self._subscription.listener.on_partitions_assigned(assigned) - except Exception: - log.exception( - "User provided listener %s for group %s" - " failed on partition assignment: %s", - self._subscription.listener, - self.group_id, - assigned, - ) - - def poll(self): - """ - Poll for coordinator events. Only applicable if group_id is set, and - broker version supports GroupCoordinators. This ensures that the - coordinator is known, and if using automatic partition assignment, - ensures that the consumer has joined the group. This also handles - periodic offset commits if they are enabled. - """ - if self.group_id is None: - return - - self._invoke_completed_offset_commit_callbacks() - self.ensure_coordinator_ready() - - if ( - self.config["api_version"] >= (0, 9) - and self._subscription.partitions_auto_assigned() - ): - if self.need_rejoin(): - # due to a race condition between the initial metadata fetch and the - # initial rebalance, we need to ensure that the metadata is fresh - # before joining initially, and then request the metadata update. If - # metadata update arrives while the rebalance is still pending (for - # example, when the join group is still inflight), then we will lose - # track of the fact that we need to rebalance again to reflect the - # change to the topic subscription. Without ensuring that the - # metadata is fresh, any metadata update that changes the topic - # subscriptions and arrives while a rebalance is in progress will - # essentially be ignored. See KAFKA-3949 for the complete - # description of the problem. 
- if self._subscription.subscribed_pattern: - metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update) - - self.ensure_active_group() - - self.poll_heartbeat() - - self._maybe_auto_commit_offsets_async() - - def time_to_next_poll(self): - """Return seconds (float) remaining until :meth:`.poll` should be called - again - """ - if not self.config["enable_auto_commit"]: - return self.time_to_next_heartbeat() - - if time.time() > self.next_auto_commit_deadline: - return 0 - - return min( - self.next_auto_commit_deadline - time.time(), self.time_to_next_heartbeat() - ) - - def _perform_assignment(self, response: Response): - assignment_strategy = response.group_protocol - members = response.members - assignor = self._lookup_assignor(assignment_strategy) - assert assignor, "Invalid assignment protocol: %s" % (assignment_strategy,) - member_metadata = {} - all_subscribed_topics = set() - for member_id, metadata_bytes in members: - metadata = ConsumerProtocol.METADATA.decode(metadata_bytes) - member_metadata[member_id] = metadata - all_subscribed_topics.update( - metadata.subscription - ) # pylint: disable-msg=no-member - - # the leader will begin watching for changes to any of the topics - # the group is interested in, which ensures that all metadata changes - # will eventually be seen - # Because assignment typically happens within response callbacks, - # we cannot block on metadata updates here (no recursion into poll()) - self._subscription.group_subscribe(all_subscribed_topics) - self._client.set_topics(self._subscription.group_subscription()) - - # keep track of the metadata used for assignment so that we can check - # after rebalance completion whether anything has changed - self._cluster.request_update() - self._is_leader = True - self._assignment_snapshot = self._metadata_snapshot - - log.debug( - "Performing assignment for group %s using strategy %s" - " with subscriptions %s", - self.group_id, - assignor.name, - member_metadata, - ) - - assignments = assignor.assign(self._cluster, member_metadata) - - log.debug("Finished assignment for group %s: %s", self.group_id, assignments) - - group_assignment = {} - for member_id, assignment in assignments.items(): - group_assignment[member_id] = assignment - return group_assignment - - def _on_join_prepare(self, generation, member_id): - # commit offsets prior to rebalance if auto-commit enabled - self._maybe_auto_commit_offsets_sync() - - # execute the user's callback before rebalance - log.info( - "Revoking previously assigned partitions %s for group %s", - self._subscription.assigned_partitions(), - self.group_id, - ) - if self._subscription.listener: - try: - revoked = set(self._subscription.assigned_partitions()) - self._subscription.listener.on_partitions_revoked(revoked) - except Exception: - log.exception( - "User provided subscription listener %s" - " for group %s failed on_partitions_revoked", - self._subscription.listener, - self.group_id, - ) - - self._is_leader = False - self._subscription.reset_group_subscription() - - def need_rejoin(self): - """Check whether the group should be rejoined - - Returns: - bool: True if consumer should rejoin group, False otherwise - """ - if not self._subscription.partitions_auto_assigned(): - return False - - if self._auto_assign_all_partitions(): - return False - - # we need to rejoin if we performed the assignment and metadata has changed - if ( - self._assignment_snapshot is not None - and self._assignment_snapshot != self._metadata_snapshot - ): - return 
True - - # we need to join if our subscription has changed since the last join - if ( - self._joined_subscription is not None - and self._joined_subscription != self._subscription.subscription - ): - return True - - return super(ConsumerCoordinator, self).need_rejoin() - - def refresh_committed_offsets_if_needed(self): - """Fetch committed offsets for assigned partitions.""" - if self._subscription.needs_fetch_committed_offsets: - offsets = self.fetch_committed_offsets( - self._subscription.assigned_partitions() - ) - for partition, offset in offsets.items(): - # verify assignment is still active - if self._subscription.is_assigned(partition): - self._subscription.assignment[partition].committed = offset - self._subscription.needs_fetch_committed_offsets = False - - def fetch_committed_offsets(self, partitions): - """Fetch the current committed offsets for specified partitions - - Arguments: - partitions (list of TopicPartition): partitions to fetch - - Returns: - dict: {TopicPartition: OffsetAndMetadata} - """ - if not partitions: - return {} - - while True: - self.ensure_coordinator_ready() - - # contact coordinator to fetch committed offsets - future = self._send_offset_fetch_request(partitions) - self._client.poll(future=future) - - if future.succeeded(): - return future.value - - if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type - - time.sleep(self.config["retry_backoff_ms"] / 1000) - - def close(self, autocommit=True): - """Close the coordinator, leave the current group, - and reset local generation / member_id. - - Keyword Arguments: - autocommit (bool): If auto-commit is configured for this consumer, - this optional flag causes the consumer to attempt to commit any - pending consumed offsets prior to close. Default: True - """ - try: - if autocommit: - self._maybe_auto_commit_offsets_sync() - finally: - super(ConsumerCoordinator, self).close() - - def _invoke_completed_offset_commit_callbacks(self): - while self.completed_offset_commits: - callback, offsets, exception = self.completed_offset_commits.popleft() - callback(offsets, exception) - - def commit_offsets_async(self, offsets, callback=None): - """Commit specific offsets asynchronously. - - Arguments: - offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit - callback (callable, optional): called as callback(offsets, response) - response will be either an Exception or a OffsetCommitResponse - struct. This callback can be used to trigger custom actions when - a commit request completes. - - Returns: - Future - """ - self._invoke_completed_offset_commit_callbacks() - if not self.coordinator_unknown(): - future = self._do_commit_offsets_async(offsets, callback) - else: - # we don't know the current coordinator, so try to find it and then - # send the commit or fail (we don't want recursive retries which can - # cause offset commits to arrive out of order). Note that there may - # be multiple offset commits chained to the same coordinator lookup - # request. This is fine because the listeners will be invoked in the - # same order that they were added. Note also that BaseCoordinator - # prevents multiple concurrent coordinator lookup requests. 
- future = self.lookup_coordinator() - future.add_callback( - lambda r: functools.partial( - self._do_commit_offsets_async, offsets, callback - )() - ) - if callback: - future.add_errback( - lambda e: self.completed_offset_commits.appendleft( - (callback, offsets, e) - ) - ) - - # ensure the commit has a chance to be transmitted (without blocking on - # its completion). Note that commits are treated as heartbeats by the - # coordinator, so there is no need to explicitly allow heartbeats - # through delayed task execution. - self._client.poll(timeout_ms=0) # no wakeup if we add that feature - - return future - - def _do_commit_offsets_async(self, offsets, callback=None): - assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" - assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) - if callback is None: - callback = self.config["default_offset_commit_callback"] - self._subscription.needs_fetch_committed_offsets = True - future = self._send_offset_commit_request(offsets) - future.add_both( - lambda res: self.completed_offset_commits.appendleft( - (callback, offsets, res) - ) - ) - return future - - def commit_offsets_sync(self, offsets): - """Commit specific offsets synchronously. - - This method will retry until the commit completes successfully or an - unrecoverable error is encountered. - - Arguments: - offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit - - Raises error on failure - """ - assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" - assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) - self._invoke_completed_offset_commit_callbacks() - if not offsets: - return - - while True: - self.ensure_coordinator_ready() - - future = self._send_offset_commit_request(offsets) - self._client.poll(future=future) - - if future.succeeded(): - return future.value - - if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type - - time.sleep(self.config["retry_backoff_ms"] / 1000) - - def _maybe_auto_commit_offsets_sync(self): - if self.config["enable_auto_commit"]: - try: - self.commit_offsets_sync(self._subscription.all_consumed_offsets()) - - # The three main group membership errors are known and should not - # require a stacktrace -- just a warning - except ( - Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError, - ): - log.warning( - "Offset commit failed: group membership out of date" - " This is likely to cause duplicate message" - " delivery." - ) - except Exception: - log.exception( - "Offset commit failed: This is likely to cause" - " duplicate message delivery" - ) - - def _send_offset_commit_request(self, offsets): - """Commit offsets for the specified list of topics and partitions. - - This is a non-blocking call which returns a request future that can be - polled in the case of a synchronous commit or ignored in the - asynchronous case. 
- - Arguments: - offsets (dict of {TopicPartition: OffsetAndMetadata}): what should - be committed - - Returns: - Future: indicating whether the commit was successful or not - """ - assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" - assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) - if not offsets: - log.debug("No offsets to commit") - return Future().success(None) - - node_id = self.coordinator() - if node_id is None: - return Future().failure(Errors.GroupCoordinatorNotAvailableError) - - # create the offset commit request - offset_data = collections.defaultdict(dict) - for tp, offset in offsets.items(): - offset_data[tp.topic][tp.partition] = offset - - if self._subscription.partitions_auto_assigned(): - generation = self.generation() - else: - generation = Generation.NO_GENERATION - - # if the generation is None, we are not part of an active group - # (and we expect to be). The only thing we can do is fail the commit - # and let the user rejoin the group in poll() - if self.config["api_version"] >= (0, 9) and generation is None: - return Future().failure(Errors.CommitFailedError()) - - if self.config["api_version"] >= (0, 9): - request = OffsetCommitRequest[2]( - self.group_id, - generation.generation_id, - generation.member_id, - OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, - [ - ( - topic, - [ - (partition, offset.offset, offset.metadata) - for partition, offset in partitions.items() - ], - ) - for topic, partitions in offset_data.items() - ], - ) - elif self.config["api_version"] >= (0, 8, 2): - request = OffsetCommitRequest[1]( - self.group_id, - -1, - "", - [ - ( - topic, - [ - (partition, offset.offset, -1, offset.metadata) - for partition, offset in partitions.items() - ], - ) - for topic, partitions in offset_data.items() - ], - ) - elif self.config["api_version"] >= (0, 8, 1): - request = OffsetCommitRequest[0]( - self.group_id, - [ - ( - topic, - [ - (partition, offset.offset, offset.metadata) - for partition, offset in partitions.items() - ], - ) - for topic, partitions in offset_data.items() - ], - ) - - log.debug( - "Sending offset-commit request with %s for group %s to %s", - offsets, - self.group_id, - node_id, - ) - - future = Future() - _f = self._client.send(node_id, request) - _f.add_callback( - self._handle_offset_commit_response, offsets, future, time.time() - ) - _f.add_errback(self._failed_request, node_id, request, future) - return future - - def _handle_offset_commit_response(self, offsets, future, send_time, response): - # TODO look at adding request_latency_ms to response (like java kafka) - self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000) - unauthorized_topics = set() - - for topic, partitions in response.topics: - for partition, error_code in partitions: - tp = TopicPartition(topic, partition) - offset = offsets[tp] - - error_type = Errors.for_code(error_code) - if error_type is Errors.NoError: - log.debug( - "Group %s committed offset %s for partition %s", - self.group_id, - offset, - tp, - ) - if self._subscription.is_assigned(tp): - self._subscription.assignment[tp].committed = offset - elif error_type is Errors.GroupAuthorizationFailedError: - log.error( - "Not authorized to commit offsets for group %s", self.group_id - ) - future.failure(error_type(self.group_id)) - return - elif error_type is Errors.TopicAuthorizationFailedError: - unauthorized_topics.add(topic) - elif error_type in ( - 
Errors.OffsetMetadataTooLargeError, - Errors.InvalidCommitOffsetSizeError, - ): - # raise the error to the user - log.debug( - "OffsetCommit for group %s failed on partition %s" " %s", - self.group_id, - tp, - error_type.__name__, - ) - future.failure(error_type()) - return - elif error_type is Errors.GroupLoadInProgressError: - # just retry - log.debug( - "OffsetCommit for group %s failed: %s", - self.group_id, - error_type.__name__, - ) - future.failure(error_type(self.group_id)) - return - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - Errors.RequestTimedOutError, - ): - log.debug( - "OffsetCommit for group %s failed: %s", - self.group_id, - error_type.__name__, - ) - self.coordinator_dead(error_type()) - future.failure(error_type(self.group_id)) - return - elif error_type in ( - Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError, - ): - # need to re-join group - error = error_type(self.group_id) - log.debug( - "OffsetCommit for group %s failed: %s", self.group_id, error - ) - self.reset_generation() - future.failure(Errors.CommitFailedError()) - return - else: - log.error( - "Group %s failed to commit partition %s at offset" " %s: %s", - self.group_id, - tp, - offset, - error_type.__name__, - ) - future.failure(error_type()) - return - - if unauthorized_topics: - log.error( - "Not authorized to commit to topics %s for group %s", - unauthorized_topics, - self.group_id, - ) - future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics)) - else: - future.success(None) - - def _send_offset_fetch_request(self, partitions): - """Fetch the committed offsets for a set of partitions. - - This is a non-blocking call. The returned future can be polled to get - the actual offsets returned from the broker. 
- - Arguments: - partitions (list of TopicPartition): the partitions to fetch - - Returns: - Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata} - """ - assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" - assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) - if not partitions: - return Future().success({}) - - node_id = self.coordinator() - if node_id is None: - return Future().failure(Errors.GroupCoordinatorNotAvailableError) - - # Verify node is ready - if not self._client.ready(node_id): - log.debug("Node %s not ready -- failing offset fetch request", node_id) - return Future().failure(Errors.NodeNotReadyError) - - log.debug( - "Group %s fetching committed offsets for partitions: %s", - self.group_id, - partitions, - ) - # construct the request - topic_partitions = collections.defaultdict(set) - for tp in partitions: - topic_partitions[tp.topic].add(tp.partition) - - if self.config["api_version"] >= (0, 8, 2): - request = OffsetFetchRequest[1]( - self.group_id, list(topic_partitions.items()) - ) - else: - request = OffsetFetchRequest[0]( - self.group_id, list(topic_partitions.items()) - ) - - # send the request with a callback - future = Future() - _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_fetch_response, future) - _f.add_errback(self._failed_request, node_id, request, future) - return future - - def _handle_offset_fetch_response(self, future, response): - offsets = {} - for topic, partitions in response.topics: - for partition, offset, metadata, error_code in partitions: - tp = TopicPartition(topic, partition) - error_type = Errors.for_code(error_code) - if error_type is not Errors.NoError: - error = error_type() - log.debug( - "Group %s failed to fetch offset for partition" " %s: %s", - self.group_id, - tp, - error, - ) - if error_type is Errors.GroupLoadInProgressError: - # just retry - future.failure(error) - elif error_type is Errors.NotCoordinatorForGroupError: - # re-discover the coordinator and retry - self.coordinator_dead(error_type()) - future.failure(error) - elif error_type is Errors.UnknownTopicOrPartitionError: - log.warning( - "OffsetFetchRequest -- unknown topic %s" - " (have you committed any offsets yet?)", - topic, - ) - continue - else: - log.error( - "Unknown error fetching offsets for %s: %s", tp, error - ) - future.failure(error) - return - elif offset >= 0: - # record the position with the offset - # (-1 indicates no committed offset to fetch) - offsets[tp] = OffsetAndMetadata(offset, metadata) - else: - log.debug( - "Group %s has no committed offset for partition" " %s", - self.group_id, - tp, - ) - future.success(offsets) - - def _default_offset_commit_callback(self, offsets, exception): - if exception is not None: - log.error("Offset commit failed: %s", exception) - - def _commit_offsets_async_on_complete(self, offsets, exception): - if exception is not None: - log.warning( - "Auto offset commit failed for group %s: %s", self.group_id, exception - ) - if getattr(exception, "retriable", False): - self.next_auto_commit_deadline = min( - time.time() + self.config["retry_backoff_ms"] / 1000, - self.next_auto_commit_deadline, - ) - else: - log.debug( - "Completed autocommit of offsets %s for group %s", - offsets, - self.group_id, - ) - - def _maybe_auto_commit_offsets_async(self): - if self.config["enable_auto_commit"]: - if self.coordinator_unknown(): - self.next_auto_commit_deadline = ( - time.time() + self.config["retry_backoff_ms"] / 1000 - ) - elif time.time() > 
self.next_auto_commit_deadline: - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - self.commit_offsets_async( - self._subscription.all_consumed_offsets(), - self._commit_offsets_async_on_complete, - ) - - -class ConsumerCoordinatorMetrics(object): - def __init__(self, metrics, metric_group_prefix, subscription): - self.metrics = metrics - self.metric_group_name = "%s-coordinator-metrics" % (metric_group_prefix,) - - self.commit_latency = metrics.sensor("commit-latency") - self.commit_latency.add( - metrics.metric_name( - "commit-latency-avg", - self.metric_group_name, - "The average time taken for a commit request", - ), - Avg(), - ) - self.commit_latency.add( - metrics.metric_name( - "commit-latency-max", - self.metric_group_name, - "The max time taken for a commit request", - ), - Max(), - ) - self.commit_latency.add( - metrics.metric_name( - "commit-rate", - self.metric_group_name, - "The number of commit calls per second", - ), - Rate(sampled_stat=Count()), - ) - - num_parts = AnonMeasurable( - lambda config, now: len(subscription.assigned_partitions()) - ) - metrics.add_metric( - metrics.metric_name( - "assigned-partitions", - self.metric_group_name, - "The number of partitions currently assigned to this consumer", - ), - num_parts, - ) diff --git a/aiokafka/coordinator/heartbeat.py b/aiokafka/coordinator/heartbeat.py deleted file mode 100644 index b10a726d..00000000 --- a/aiokafka/coordinator/heartbeat.py +++ /dev/null @@ -1,69 +0,0 @@ -import copy -import time - - -class Heartbeat(object): - DEFAULT_CONFIG = { - "group_id": None, - "heartbeat_interval_ms": 3000, - "session_timeout_ms": 10000, - "max_poll_interval_ms": 300000, - "retry_backoff_ms": 100, - } - - def __init__(self, **configs): - self.config = copy.copy(self.DEFAULT_CONFIG) - for key in self.config: - if key in configs: - self.config[key] = configs[key] - - if self.config["group_id"] is not None: - assert ( - self.config["heartbeat_interval_ms"] - <= self.config["session_timeout_ms"] - ), "Heartbeat interval must be lower than the session timeout" - - self.last_send = -1 * float("inf") - self.last_receive = -1 * float("inf") - self.last_poll = -1 * float("inf") - self.last_reset = time.time() - self.heartbeat_failed = None - - def poll(self): - self.last_poll = time.time() - - def sent_heartbeat(self): - self.last_send = time.time() - self.heartbeat_failed = False - - def fail_heartbeat(self): - self.heartbeat_failed = True - - def received_heartbeat(self): - self.last_receive = time.time() - - def time_to_next_heartbeat(self): - """Returns seconds (float) remaining before next heartbeat should be sent""" - time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset) - if self.heartbeat_failed: - delay_to_next_heartbeat = self.config["retry_backoff_ms"] / 1000 - else: - delay_to_next_heartbeat = self.config["heartbeat_interval_ms"] / 1000 - return max(0, delay_to_next_heartbeat - time_since_last_heartbeat) - - def should_heartbeat(self): - return self.time_to_next_heartbeat() == 0 - - def session_timeout_expired(self): - last_recv = max(self.last_receive, self.last_reset) - return (time.time() - last_recv) > (self.config["session_timeout_ms"] / 1000) - - def reset_timeouts(self): - self.last_reset = time.time() - self.last_poll = time.time() - self.heartbeat_failed = False - - def poll_timeout_expired(self): - return (time.time() - self.last_poll) > ( - self.config["max_poll_interval_ms"] / 1000 - ) diff --git a/aiokafka/oauth.py b/aiokafka/oauth.py deleted file mode 100644 
index cc416da1..00000000 --- a/aiokafka/oauth.py +++ /dev/null @@ -1,38 +0,0 @@ -import abc - - -class AbstractTokenProvider(abc.ABC): - """ - A Token Provider must be used for the SASL OAuthBearer protocol. - - The implementation should ensure token reuse so that multiple - calls at connect time do not create multiple tokens. The implementation - should also periodically refresh the token in order to guarantee - that each call returns an unexpired token. A timeout error should - be returned after a short period of inactivity so that the - broker can log debugging info and retry. - - Token Providers MUST implement the token() method - """ - - def __init__(self, **config): - pass - - @abc.abstractmethod - def token(self): - """ - Returns a (str) ID/Access Token to be sent to the Kafka - client. - """ - pass - - def extensions(self): - """ - This is an OPTIONAL method that may be implemented. - - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not implemented, the values are ignored. This feature is only available - in Kafka >= 2.1.0. - """ - return {} diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index 3c5a096e..ea9fdbe0 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -165,8 +165,7 @@ class AIOKafkaProducer: sasl_plain_password (str): password for SASL ``PLAIN`` authentication. Default: :data:`None` sasl_oauth_token_provider (:class:`~aiokafka.abc.AbstractTokenProvider`): - OAuthBearer token provider instance. (See - :mod:`aiokafka.oauth`). + OAuthBearer token provider instance. Default: :data:`None` Note: diff --git a/aiokafka/protocol/frame.py b/aiokafka/protocol/frame.py deleted file mode 100644 index 897e091b..00000000 --- a/aiokafka/protocol/frame.py +++ /dev/null @@ -1,30 +0,0 @@ -class KafkaBytes(bytearray): - def __init__(self, size): - super(KafkaBytes, self).__init__(size) - self._idx = 0 - - def read(self, nbytes=None): - if nbytes is None: - nbytes = len(self) - self._idx - start = self._idx - self._idx += nbytes - if self._idx > len(self): - self._idx = len(self) - return bytes(self[start:self._idx]) - - def write(self, data): - start = self._idx - self._idx += len(data) - self[start:self._idx] = data - - def seek(self, idx): - self._idx = idx - - def tell(self): - return self._idx - - def __str__(self): - return "KafkaBytes(%d)" % len(self) - - def __repr__(self): - return str(self) diff --git a/aiokafka/protocol/message.py b/aiokafka/protocol/message.py index a305f419..e22b6cb0 100644 --- a/aiokafka/protocol/message.py +++ b/aiokafka/protocol/message.py @@ -11,11 +11,10 @@ snappy_decode, zstd_decode, lz4_decode, - lz4_decode_old_kafka, ) +from aiokafka.errors import UnsupportedCodecError from aiokafka.util import WeakMethod -from .frame import KafkaBytes from .struct import Struct from .types import Int8, Int32, UInt32, Int64, Bytes, Schema, AbstractType @@ -156,7 +155,10 @@ def decompress(self): elif codec == self.CODEC_LZ4: assert has_lz4(), "LZ4 decompression unsupported" if self.magic == 0: - raw_bytes = lz4_decode_old_kafka(self.value) + # https://issues.apache.org/jira/browse/KAFKA-3160 + raise UnsupportedCodecError( + "LZ4 is not supported for broker version 0.8/0.9" + ) else: raw_bytes = lz4_decode(self.value) elif codec == self.CODEC_ZSTD: @@ -183,7 +185,7 @@ class MessageSet(AbstractType): @classmethod def encode(cls, items, prepend_size=True): # RecordAccumulator encodes messagesets internally - if isinstance(items, (io.BytesIO, KafkaBytes)): + 
if isinstance(items, io.BytesIO): size = Int32.decode(items) if prepend_size: # rewind and return all the bytes @@ -231,7 +233,7 @@ def decode(cls, data, bytes_to_read=None): @classmethod def repr(cls, messages): - if isinstance(messages, (KafkaBytes, io.BytesIO)): + if isinstance(messages, io.BytesIO): offset = messages.tell() decoded = cls.decode(messages) messages.seek(offset) diff --git a/aiokafka/protocol/parser.py b/aiokafka/protocol/parser.py deleted file mode 100644 index c19dc4f1..00000000 --- a/aiokafka/protocol/parser.py +++ /dev/null @@ -1,194 +0,0 @@ -import collections -import logging - -import aiokafka.errors as Errors -from aiokafka import __version__ - -from .commit import GroupCoordinatorResponse -from .frame import KafkaBytes -from .types import Int32 - -log = logging.getLogger(__name__) - - -class KafkaProtocol(object): - """Manage the kafka network protocol - - Use an instance of KafkaProtocol to manage bytes send/recv'd - from a network socket to a broker. - - Arguments: - client_id (str): identifier string to be included in each request - api_version (tuple): Optional tuple to specify api_version to use. - Currently only used to check for 0.8.2 protocol quirks, but - may be used for more in the future. - """ - - def __init__(self, client_id=None, api_version=None): - if client_id is None: - client_id = self._gen_client_id() - self._client_id = client_id - self._api_version = api_version - self._correlation_id = 0 - self._header = KafkaBytes(4) - self._rbuffer = None - self._receiving = False - self.in_flight_requests = collections.deque() - self.bytes_to_send = [] - - def _next_correlation_id(self): - self._correlation_id = (self._correlation_id + 1) % 2**31 - return self._correlation_id - - def _gen_client_id(self): - return "aiokafka" + __version__ - - def send_request(self, request, correlation_id=None): - """Encode and queue a kafka api request for sending. - - Arguments: - request (object): An un-encoded kafka request. - correlation_id (int, optional): Optionally specify an ID to - correlate requests with responses. If not provided, an ID will - be generated automatically. - - Returns: - correlation_id - """ - log.debug("Sending request %s", request) - if correlation_id is None: - correlation_id = self._next_correlation_id() - - header = request.build_request_header( - correlation_id=correlation_id, client_id=self._client_id - ) - message = b"".join([header.encode(), request.encode()]) - size = Int32.encode(len(message)) - data = size + message - self.bytes_to_send.append(data) - if request.expect_response(): - ifr = (correlation_id, request) - self.in_flight_requests.append(ifr) - return correlation_id - - def send_bytes(self): - """Retrieve all pending bytes to send on the network""" - data = b"".join(self.bytes_to_send) - self.bytes_to_send = [] - return data - - def receive_bytes(self, data): - """Process bytes received from the network. - - Arguments: - data (bytes): any length bytes received from a network connection - to a kafka broker. - - Returns: - responses (list of (correlation_id, response)): any/all completed - responses, decoded from bytes to python objects. - - Raises: - KafkaProtocolError: if the bytes received could not be decoded. - CorrelationIdError: if the response does not match the request - correlation id. 
- """ - i = 0 - n = len(data) - responses = [] - while i < n: - - # Not receiving is the state of reading the payload header - if not self._receiving: - bytes_to_read = min(4 - self._header.tell(), n - i) - self._header.write(data[i:i + bytes_to_read]) - i += bytes_to_read - - if self._header.tell() == 4: - self._header.seek(0) - nbytes = Int32.decode(self._header) - # reset buffer and switch state to receiving payload bytes - self._rbuffer = KafkaBytes(nbytes) - self._receiving = True - elif self._header.tell() > 4: - raise Errors.KafkaError( - "this should not happen - are you threading?" - ) - - if self._receiving: - total_bytes = len(self._rbuffer) - staged_bytes = self._rbuffer.tell() - bytes_to_read = min(total_bytes - staged_bytes, n - i) - self._rbuffer.write(data[i:i + bytes_to_read]) - i += bytes_to_read - - staged_bytes = self._rbuffer.tell() - if staged_bytes > total_bytes: - raise Errors.KafkaError( - "Receive buffer has more bytes than expected?" - ) - - if staged_bytes != total_bytes: - break - - self._receiving = False - self._rbuffer.seek(0) - resp = self._process_response(self._rbuffer) - responses.append(resp) - self._reset_buffer() - return responses - - def _process_response(self, read_buffer): - if not self.in_flight_requests: - raise Errors.CorrelationIdError( - "No in-flight-request found for server response" - ) - (correlation_id, request) = self.in_flight_requests.popleft() - response_header = request.parse_response_header(read_buffer) - recv_correlation_id = response_header.correlation_id - log.debug("Received correlation id: %d", recv_correlation_id) - # 0.8.2 quirk - if ( - recv_correlation_id == 0 - and correlation_id != 0 - and request.RESPONSE_TYPE is GroupCoordinatorResponse[0] - and (self._api_version == (0, 8, 2) or self._api_version is None) - ): - log.warning( - "Kafka 0.8.2 quirk -- GroupCoordinatorResponse" - " Correlation ID does not match request. This" - " should go away once at least one topic has been" - " initialized on the broker." - ) - - elif correlation_id != recv_correlation_id: - # return or raise? 
- raise Errors.CorrelationIdError( - "Correlation IDs do not match: sent %d, recv %d" - % (correlation_id, recv_correlation_id) - ) - - # decode response - log.debug("Processing response %s", request.RESPONSE_TYPE.__name__) - try: - response = request.RESPONSE_TYPE.decode(read_buffer) - except ValueError: - read_buffer.seek(0) - buf = read_buffer.read() - log.error( - "Response %d [ResponseType: %s Request: %s]:" - " Unable to decode %d-byte buffer: %r", - correlation_id, - request.RESPONSE_TYPE, - request, - len(buf), - buf, - ) - raise Errors.KafkaProtocolError("Unable to decode response") - - return (correlation_id, response) - - def _reset_buffer(self): - self._receiving = False - self._header.seek(0) - self._rbuffer = None diff --git a/aiokafka/protocol/pickle.py b/aiokafka/protocol/pickle.py deleted file mode 100644 index 780c4e88..00000000 --- a/aiokafka/protocol/pickle.py +++ /dev/null @@ -1,30 +0,0 @@ -import copyreg -import types - - -def _pickle_method(method): - try: - func_name = method.__func__.__name__ - obj = method.__self__ - cls = method.__self__.__class__ - except AttributeError: - func_name = method.im_func.__name__ - obj = method.im_self - cls = method.im_class - - return _unpickle_method, (func_name, obj, cls) - - -def _unpickle_method(func_name, obj, cls): - for cls in cls.mro(): - try: - func = cls.__dict__[func_name] - except KeyError: - pass - else: - break - return func.__get__(obj, cls) - - -# https://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods -copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method) diff --git a/aiokafka/record/_crecords/legacy_records.pyx b/aiokafka/record/_crecords/legacy_records.pyx index 2406ef12..0c6905fb 100644 --- a/aiokafka/record/_crecords/legacy_records.pyx +++ b/aiokafka/record/_crecords/legacy_records.pyx @@ -2,8 +2,8 @@ import aiokafka.codec as codecs from aiokafka.codec import ( - gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka, - gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka + gzip_encode, snappy_encode, lz4_encode, + gzip_decode, snappy_decode, lz4_decode, ) from aiokafka.errors import CorruptRecordException, UnsupportedCodecError from zlib import crc32 as py_crc32 # needed for windows macro @@ -141,7 +141,10 @@ cdef class LegacyRecordBatch: uncompressed = snappy_decode(value) elif compression_type == _ATTR_CODEC_LZ4: if self._magic == 0: - uncompressed = lz4_decode_old_kafka(value) + # https://issues.apache.org/jira/browse/KAFKA-3160 + raise UnsupportedCodecError( + "LZ4 is not supported for broker version 0.8/0.9" + ) else: uncompressed = lz4_decode(value) @@ -437,7 +440,10 @@ cdef class LegacyRecordBatchBuilder: compressed = snappy_encode(self._buffer) elif self._compression_type == _ATTR_CODEC_LZ4: if self._magic == 0: - compressed = lz4_encode_old_kafka(bytes(self._buffer)) + # https://issues.apache.org/jira/browse/KAFKA-3160 + raise UnsupportedCodecError( + "LZ4 is not supported for broker version 0.8/0.9" + ) else: compressed = lz4_encode(bytes(self._buffer)) else: diff --git a/aiokafka/record/legacy_records.py b/aiokafka/record/legacy_records.py index c1ae9480..143576b8 100644 --- a/aiokafka/record/legacy_records.py +++ b/aiokafka/record/legacy_records.py @@ -5,8 +5,8 @@ import aiokafka.codec as codecs from aiokafka.codec import ( - gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka, - gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka + gzip_encode, snappy_encode, lz4_encode, + gzip_decode, snappy_decode, lz4_decode, ) from 
aiokafka.errors import CorruptRecordException, UnsupportedCodecError from aiokafka.util import NO_EXTENSIONS @@ -159,7 +159,10 @@ def _decompress(self, key_offset): uncompressed = snappy_decode(data.tobytes()) elif compression_type == self.CODEC_LZ4: if self._magic == 0: - uncompressed = lz4_decode_old_kafka(data.tobytes()) + # https://issues.apache.org/jira/browse/KAFKA-3160 + raise UnsupportedCodecError( + "LZ4 is not supported for broker version 0.8/0.9" + ) else: uncompressed = lz4_decode(data.tobytes()) return uncompressed @@ -415,7 +418,10 @@ def _maybe_compress(self): compressed = snappy_encode(buf) elif self._compression_type == self.CODEC_LZ4: if self._magic == 0: - compressed = lz4_encode_old_kafka(bytes(buf)) + # https://issues.apache.org/jira/browse/KAFKA-3160 + raise UnsupportedCodecError( + "LZ4 is not supported for broker version 0.8/0.9" + ) else: compressed = lz4_encode(bytes(buf)) compressed_size = len(compressed) diff --git a/docs/api.rst b/docs/api.rst index 9ca4f590..6455acac 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -80,9 +80,7 @@ and ``GSSAPI`` SASL methods. Be sure to install `gssapi`_ python module to use Please consult the `official documentation `__ for setup instructions on Broker side. Client configuration is pretty much the same as Java's, consult the ``sasl_*`` options in Consumer and Producer API -Reference for more details. - -.. automodule:: aiokafka.oauth +Reference for more details. See :class:`~aiokafka.abc.AbstractTokenProvider`. Error handling diff --git a/docs/index.rst b/docs/index.rst index a51a9b78..7e507d85 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -110,33 +110,7 @@ from http://landinghub.visualstudio.com/visual-cpp-build-tools Optional Snappy install +++++++++++++++++++++++ -1. Download and build Snappy from http://google.github.io/snappy/ - -Ubuntu: - -.. code:: bash - - apt-get install libsnappy-dev - -OSX: - -.. code:: bash - - brew install snappy - -From Source: - -.. code:: bash - - wget https://github.com/google/snappy/tarball/master - tar xzvf google-snappy-X.X.X-X-XXXXXXXX.tar.gz - cd google-snappy-X.X.X-X-XXXXXXXX - ./configure - make - sudo make install - - -1. Install **aiokafka** with :code:`snappy` extra option +To enable Snappy compression/decompression, install **aiokafka** with :code:`snappy` extra option .. 
code:: bash diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..b283eca6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,81 @@ +[build-system] +requires = ["setuptools >=61", "wheel", "Cython >=3.0.5"] + +[project] +name = "aiokafka" +description = "Kafka integration with asyncio" +readme = "README.rst" +requires-python = ">=3.8" +license = { file = "LICENSE" } +authors = [ + { name = "Andrew Svetlov", email = "andrew.svetlov@gmail.com" }, +] +classifiers = [ + "License :: OSI Approved :: Apache Software License", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Operating System :: OS Independent", + "Topic :: System :: Networking", + "Topic :: System :: Distributed Computing", + "Framework :: AsyncIO", + "Development Status :: 4 - Beta", +] + +dynamic = ["version"] + +dependencies = [ + "async-timeout", + "packaging", +] + +[project.optional-dependencies] +snappy = ["cramjam"] +lz4 = ["lz4 >=3.1.3"] +zstd = ["cramjam"] +gssapi = ["gssapi"] +all = ["cramjam", "lz4 >=3.1.3", "gssapi"] + +[tool.setuptools.dynamic] +version = { attr = "aiokafka.__version__" } + +[tool.setuptools] +include-package-data = false + +[tool.setuptools.packages.find] +include = [ + "aiokafka", + "aiokafka.*", +] + +[project.urls] +Documentation = "http://aiokafka.readthedocs.org" +Source = "https://github.com/aio-libs/aiokafka" +Changes = "https://github.com/aio-libs/aiokafka/blob/master/CHANGES.rst" + + +[tool.pytest.ini_options] +testpaths = ["tests"] +asyncio_mode = "auto" +addopts = ["--strict-config", "--strict-markers"] +markers = [ + "ssl: Tests that require SSL certificates to run", +] +filterwarnings = [ + "error", + # FIXME Until we fix socket leaks in tests + "default:unclosed event loop:ResourceWarning", +] + +[tool.coverage.run] +branch = true +source_pkgs = ["aiokafka", "tests"] + +[tool.coverage.report] +show_missing = true +skip_covered = true +skip_empty = true diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index 6cef3385..00000000 --- a/pytest.ini +++ /dev/null @@ -1,15 +0,0 @@ -[pytest] -filterwarnings = - error - # FIXME Until we fix socket leaks in tests - default:unclosed event loop:ResourceWarning - # https://github.com/docker/docker-py/issues/1293 - ignore:.*docker.sock.*:ResourceWarning - ignore:distutils .* deprecated:DeprecationWarning:docker - # From gssapi, but with improper stack - ignore:_SixMetaPathImporter.*not found:ImportWarning - # Actually comes from docker importing distutils on Windows - ignore:the imp module is deprecated in favour of importlib:DeprecationWarning:pywintypes -markers = - ssl: Tests that require SSL certificates to run -asyncio_mode = auto diff --git a/requirements-ci.txt b/requirements-ci.txt index 3d09463d..78f2d8bc 100644 --- a/requirements-ci.txt +++ b/requirements-ci.txt @@ -3,17 +3,14 @@ flake8==4.0.1 black==22.3.0 mypy==0.961 isort[colors]==5.10.0 -pytest==7.1.2 -pytest-cov==3.0.0 -pytest-asyncio==0.18.3 +pytest==7.4.3 +pytest-cov==4.1.0 +pytest-asyncio==0.21.1 pytest-mock==3.12.0 -docker==6.1.2 -chardet==4.0.0 # Until fixed requests is released +docker==6.1.3 lz4==3.1.3 -xxhash==2.0.2 -python-snappy==0.6.1 docutils==0.17.1 Pygments==2.15.0 -gssapi==1.8.2 +gssapi==1.8.3 async-timeout==4.0.1 -zstandard==0.16.0 +cramjam==2.7.0 diff --git a/requirements-cython.txt
b/requirements-cython.txt index 44e1d680..e4cba70d 100644 --- a/requirements-cython.txt +++ b/requirements-cython.txt @@ -1 +1 @@ -Cython==0.29.32 +Cython==3.0.5 diff --git a/requirements-dev.txt b/requirements-dev.txt index 15d668cd..33dd4a2e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,4 +2,4 @@ -r requirements-docs.txt diff-cover==6.4.2 -setuptools>=34.4.0 \ No newline at end of file +build==1.0.3 diff --git a/requirements-win-test.txt b/requirements-win-test.txt index e2e51c61..71c2bff1 100644 --- a/requirements-win-test.txt +++ b/requirements-win-test.txt @@ -3,13 +3,10 @@ flake8==4.0.1 black==22.3.0 mypy==0.961 isort[colors]==5.10.0 -pytest==7.1.2 -pytest-cov==3.0.0 -pytest-asyncio==0.18.3 +pytest==7.4.3 +pytest-cov==4.1.0 +pytest-asyncio==0.21.1 pytest-mock==3.12.0 -docker==6.0.1 -chardet==4.0.0 # Until fixed requests is released +docker==6.1.3 lz4==3.1.3 -xxhash==2.0.2 -python-snappy==0.6.1 -zstandard==0.16.0 +cramjam==2.7.0 diff --git a/setup.py b/setup.py index 07d9710c..6c70ba1f 100644 --- a/setup.py +++ b/setup.py @@ -1,24 +1,10 @@ -import os import platform -import re +from Cython.Build import cythonize from setuptools import Extension, setup from setuptools.command.bdist_rpm import bdist_rpm as _bdist_rpm from setuptools.command.build_ext import build_ext - - -try: - from setuptools.errors import CCompilerError, ExecError, PlatformError -except ImportError: - # RTD workaround until it ships setuptools>=v59.0.0 - # See: - # - https://github.com/pypa/setuptools/pull/2858 - # - https://docs.readthedocs.io/en/stable/builds.html#python - from distutils.errors import ( - CCompilerError, - DistutilsExecError as ExecError, - DistutilsPlatformError as PlatformError, - ) +from setuptools.errors import CCompilerError, ExecError, PlatformError # Those are needed to build _hton for windows @@ -33,21 +19,11 @@ CFLAGS.extend(["-Wall", "-Wsign-compare", "-Wconversion"]) LIBRARIES.append("z") -# The extension part is copied from aiohttp's setup.py - -try: - from Cython.Build import cythonize - - USE_CYTHON = True -except ImportError: - USE_CYTHON = False - -ext = ".pyx" if USE_CYTHON else ".c" extensions = [ Extension( "aiokafka.record._crecords.legacy_records", - ["aiokafka/record/_crecords/legacy_records" + ext], + ["aiokafka/record/_crecords/legacy_records.pyx"], libraries=LIBRARIES, extra_compile_args=CFLAGS, extra_link_args=LDFLAGS, @@ -56,7 +32,7 @@ "aiokafka.record._crecords.default_records", [ "aiokafka/record/_crecords/crc32c.c", - "aiokafka/record/_crecords/default_records" + ext, + "aiokafka/record/_crecords/default_records.pyx", ], libraries=LIBRARIES, extra_compile_args=CFLAGS, @@ -64,14 +40,14 @@ ), Extension( "aiokafka.record._crecords.memory_records", - ["aiokafka/record/_crecords/memory_records" + ext], + ["aiokafka/record/_crecords/memory_records.pyx"], libraries=LIBRARIES, extra_compile_args=CFLAGS, extra_link_args=LDFLAGS, ), Extension( "aiokafka.record._crecords.cutil", - ["aiokafka/record/_crecords/crc32c.c", "aiokafka/record/_crecords/cutil" + ext], + ["aiokafka/record/_crecords/crc32c.c", "aiokafka/record/_crecords/cutil.pyx"], libraries=LIBRARIES, extra_compile_args=CFLAGS, extra_link_args=LDFLAGS, @@ -79,10 +55,6 @@ ] -if USE_CYTHON: - extensions = cythonize(extensions) - - class bdist_rpm(_bdist_rpm): def _make_spec_file(self): orig = super()._make_spec_file() @@ -110,83 +82,7 @@ def build_extension(self, ext): raise BuildFailed() -install_requires = [ - "async-timeout", - "packaging", -] - - -def read(f): - return 
open(os.path.join(os.path.dirname(__file__), f)).read().strip() - - -extras_require = { - "snappy": ["python-snappy>=0.5"], - "lz4": ["lz4"], # Old format (magic=0) requires xxhash - "zstd": ["zstandard"], - "gssapi": ["gssapi"], -} -extras_require["all"] = sum(extras_require.values(), []) - - -def read_version(): - regexp = re.compile(r"^__version__\W*=\W*'([\d.abrcdev]+)'") - init_py = os.path.join(os.path.dirname(__file__), "aiokafka", "__init__.py") - with open(init_py) as f: - for line in f: - match = regexp.match(line) - if match is not None: - return match.group(1) - else: - raise RuntimeError("Cannot find version in aiokafka/__init__.py") - - -classifiers = [ - "License :: OSI Approved :: Apache Software License", - "Intended Audience :: Developers", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Operating System :: OS Independent", - "Topic :: System :: Networking", - "Topic :: System :: Distributed Computing", - "Framework :: AsyncIO", - "Development Status :: 4 - Beta", -] - - -args = dict( - name="aiokafka", - version=read_version(), - description=("Kafka integration with asyncio."), - long_description="\n\n".join((read("README.rst"), read("CHANGES.rst"))), - classifiers=classifiers, - platforms=["POSIX"], - author="Andrew Svetlov", - author_email="andrew.svetlov@gmail.com", - url="http://aiokafka.readthedocs.org", - project_urls={ - "Source": "https://github.com/aio-libs/aiokafka", - }, - download_url="https://pypi.python.org/pypi/aiokafka", - license="Apache 2", - packages=["aiokafka"], - python_requires=">=3.8", - install_requires=install_requires, - extras_require=extras_require, - include_package_data=True, - ext_modules=extensions, +setup( + ext_modules=cythonize(extensions), cmdclass=dict(build_ext=ve_build_ext, bdist_rpm=bdist_rpm), ) - -try: - setup(**args) -except BuildFailed: - print("************************************************************") - print("Cannot compile C accelerator module, use pure python version") - print("************************************************************") - del args["ext_modules"] - del args["cmdclass"] - setup(**args) diff --git a/tests/record/test_default_records.py b/tests/record/test_default_records.py index a79f2aef..0037c76a 100644 --- a/tests/record/test_default_records.py +++ b/tests/record/test_default_records.py @@ -15,7 +15,7 @@ (DefaultRecordBatch.CODEC_GZIP, None), (DefaultRecordBatch.CODEC_SNAPPY, 2171068483), (DefaultRecordBatch.CODEC_LZ4, 462121143), - (DefaultRecordBatch.CODEC_ZSTD, 1679657554), + (DefaultRecordBatch.CODEC_ZSTD, 1714138923), ]) def test_read_write_serde_v2(compression_type, crc): builder = DefaultRecordBatchBuilder( diff --git a/tests/record/test_legacy.py b/tests/record/test_legacy.py index 26f3d4dc..83afff2a 100644 --- a/tests/record/test_legacy.py +++ b/tests/record/test_legacy.py @@ -44,12 +44,14 @@ def test_read_write_serde_v0_v1_no_compression(magic, key, value, checksum): assert msg.checksum == checksum[magic] & 0xffffffff -@pytest.mark.parametrize("compression_type", [ - LegacyRecordBatch.CODEC_GZIP, - LegacyRecordBatch.CODEC_SNAPPY, - LegacyRecordBatch.CODEC_LZ4 +@pytest.mark.parametrize("compression_type, magic", [ + (LegacyRecordBatch.CODEC_GZIP, 0), + (LegacyRecordBatch.CODEC_SNAPPY, 0), + # We don't support LZ4 for kafka 0.8/0.9 + (LegacyRecordBatch.CODEC_GZIP, 1), + (LegacyRecordBatch.CODEC_SNAPPY, 1), + 
(LegacyRecordBatch.CODEC_LZ4, 1), ]) -@pytest.mark.parametrize("magic", [0, 1]) def test_read_write_serde_v0_v1_with_compression(compression_type, magic): builder = LegacyRecordBatchBuilder( magic=magic, compression_type=compression_type, batch_size=1024 * 1024) @@ -194,7 +196,6 @@ def test_legacy_batch_size_limit(magic): @pytest.mark.parametrize("compression_type,name,checker_name", [ (LegacyRecordBatch.CODEC_GZIP, "gzip", "has_gzip"), (LegacyRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"), - (LegacyRecordBatch.CODEC_LZ4, "lz4", "has_lz4") ]) def test_unavailable_codec(compression_type, name, checker_name): builder = LegacyRecordBatchBuilder( diff --git a/tests/test_codec.py b/tests/test_codec.py index 9ae53487..f047d648 100644 --- a/tests/test_codec.py +++ b/tests/test_codec.py @@ -14,8 +14,6 @@ snappy_decode, lz4_encode, lz4_decode, - lz4_encode_old_kafka, - lz4_decode_old_kafka, zstd_encode, zstd_decode, ) @@ -102,18 +100,6 @@ def test_lz4(): assert b1 == b2 -@pytest.mark.skipif( - not has_lz4() or platform.python_implementation() == "PyPy", - reason="python-lz4 crashes on old versions of pypy", -) -def test_lz4_old(): - for i in range(1000): - b1 = random_string(100) - b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1)) - assert len(b1) == len(b2) - assert b1 == b2 - - @pytest.mark.skipif( not has_lz4() or platform.python_implementation() == "PyPy", reason="python-lz4 crashes on old versions of pypy", diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 0a70b4ba..370da35b 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -514,6 +514,7 @@ async def test_compress_decompress_gzip(self): async def test_compress_decompress_snappy(self): await self._test_compress_decompress("snappy") + @kafka_versions('>=0.10.0.0') @run_until_complete async def test_compress_decompress_lz4(self): await self._test_compress_decompress("lz4") diff --git a/tools/README b/tools/README deleted file mode 100644 index 6589511a..00000000 --- a/tools/README +++ /dev/null @@ -1 +0,0 @@ -Wheels were downloaded from https://www.lfd.uci.edu/~gohlke/pythonlibs/#python-snappy diff --git a/tools/python_snappy-0.5.4-cp36-cp36m-win32.whl b/tools/python_snappy-0.5.4-cp36-cp36m-win32.whl deleted file mode 100644 index 42cbaa28..00000000 Binary files a/tools/python_snappy-0.5.4-cp36-cp36m-win32.whl and /dev/null differ diff --git a/tools/python_snappy-0.5.4-cp36-cp36m-win_amd64.whl b/tools/python_snappy-0.5.4-cp36-cp36m-win_amd64.whl deleted file mode 100644 index ded5f9d1..00000000 Binary files a/tools/python_snappy-0.5.4-cp36-cp36m-win_amd64.whl and /dev/null differ diff --git a/tools/python_snappy-0.5.4-cp37-cp37m-win32.whl b/tools/python_snappy-0.5.4-cp37-cp37m-win32.whl deleted file mode 100644 index f20cc38b..00000000 Binary files a/tools/python_snappy-0.5.4-cp37-cp37m-win32.whl and /dev/null differ diff --git a/tools/python_snappy-0.5.4-cp37-cp37m-win_amd64.whl b/tools/python_snappy-0.5.4-cp37-cp37m-win_amd64.whl deleted file mode 100644 index f69eae28..00000000 Binary files a/tools/python_snappy-0.5.4-cp37-cp37m-win_amd64.whl and /dev/null differ diff --git a/tools/python_snappy-0.5.4-cp38-cp38-win_amd64.whl b/tools/python_snappy-0.5.4-cp38-cp38-win_amd64.whl deleted file mode 100644 index 30a48d28..00000000 Binary files a/tools/python_snappy-0.5.4-cp38-cp38-win_amd64.whl and /dev/null differ diff --git a/tools/python_snappy-0.5.4-cp38-cp38m-win32.whl b/tools/python_snappy-0.5.4-cp38-cp38m-win32.whl deleted file mode 100644 index 3e0fea23..00000000 Binary files 
a/tools/python_snappy-0.5.4-cp38-cp38m-win32.whl and /dev/null differ diff --git a/tools/python_snappy-0.5.4-cp38-cp38m-win_amd64.whl b/tools/python_snappy-0.5.4-cp38-cp38m-win_amd64.whl deleted file mode 100644 index 18e0a50e..00000000 Binary files a/tools/python_snappy-0.5.4-cp38-cp38m-win_amd64.whl and /dev/null differ diff --git a/tools/python_snappy-0.5.4-cp39-cp39-win_amd64.whl b/tools/python_snappy-0.5.4-cp39-cp39-win_amd64.whl deleted file mode 100644 index b2260a0a..00000000 Binary files a/tools/python_snappy-0.5.4-cp39-cp39-win_amd64.whl and /dev/null differ
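Note on the LZ4 change above: with the old-Kafka LZ4 framing removed, building a v0 (Kafka 0.8/0.9) message batch with LZ4 compression now fails with UnsupportedCodecError instead of silently using the non-standard framing. The following is a minimal sketch of that behaviour in the style of tests/record/test_legacy.py; the append() arguments and the fact that compression is applied in build() are assumptions taken from the existing record tests and the pure-Python LegacyRecordBatchBuilder, not something spelled out verbatim in this diff.

import pytest

from aiokafka.errors import UnsupportedCodecError
from aiokafka.record.legacy_records import (
    LegacyRecordBatch,
    LegacyRecordBatchBuilder,
)


def test_lz4_rejected_for_magic_v0():
    # Same constructor shape as the existing tests in tests/record/test_legacy.py.
    builder = LegacyRecordBatchBuilder(
        magic=0,
        compression_type=LegacyRecordBatch.CODEC_LZ4,
        batch_size=1024 * 1024,
    )
    # append() arguments mirror the existing record tests (assumed signature).
    builder.append(0, timestamp=9999999, key=b"test", value=b"Super")

    # Compression is applied when the batch is finalized, so the error is
    # expected to surface from build() via the _maybe_compress() change above.
    with pytest.raises(UnsupportedCodecError):
        builder.build()

The same behaviour is why test_compress_decompress_lz4 in tests/test_consumer.py is now gated with @kafka_versions('>=0.10.0.0'): producers using compression_type="lz4" against 0.8/0.9 brokers surface this error rather than writing batches with the deprecated framing.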