From 7fa1efa9f65a4cfaf889302a87426c8099711360 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Thu, 25 Apr 2024 17:00:32 +0500 Subject: [PATCH] test w/o protocols --- aiokafka/record/_crecords/default_records.pyi | 21 +- aiokafka/record/_crecords/legacy_records.pyi | 21 +- aiokafka/record/_crecords/memory_records.pyi | 7 +- aiokafka/record/_protocols.py | 262 ------------------ aiokafka/record/default_records.py | 56 ++-- aiokafka/record/legacy_records.py | 40 +-- aiokafka/record/memory_records.py | 26 +- tests/record/test_records.py | 4 - 8 files changed, 77 insertions(+), 360 deletions(-) delete mode 100644 aiokafka/record/_protocols.py diff --git a/aiokafka/record/_crecords/default_records.pyi b/aiokafka/record/_crecords/default_records.pyi index 1392aff0..5e5b1826 100644 --- a/aiokafka/record/_crecords/default_records.pyi +++ b/aiokafka/record/_crecords/default_records.pyi @@ -1,16 +1,8 @@ -from typing import ClassVar, final +from typing import ClassVar from typing_extensions import Literal, Self -from aiokafka.record._protocols import ( - DefaultRecordBatchBuilderProtocol, - DefaultRecordBatchProtocol, - DefaultRecordMetadataProtocol, - DefaultRecordProtocol, -) - -@final -class DefaultRecord(DefaultRecordProtocol): +class DefaultRecord: offset: int key: bytes | None value: bytes | None @@ -31,8 +23,7 @@ class DefaultRecord(DefaultRecordProtocol): @property def timestamp_type(self) -> int | None: ... -@final -class DefaultRecordBatch(DefaultRecordBatchProtocol): +class DefaultRecordBatch: CODEC_NONE: ClassVar[int] CODEC_MASK: ClassVar[int] CODEC_GZIP: ClassVar[int] @@ -67,8 +58,7 @@ class DefaultRecordBatch(DefaultRecordBatchProtocol): def __next__(self) -> DefaultRecord: ... def validate_crc(self) -> bool: ... -@final -class DefaultRecordBatchBuilder(DefaultRecordBatchBuilderProtocol): +class DefaultRecordBatchBuilder: producer_id: int producer_epoch: int base_sequence: int @@ -118,8 +108,7 @@ class DefaultRecordBatchBuilder(DefaultRecordBatchBuilderProtocol): headers: list[tuple[str, bytes | None]], ) -> int: ... -@final -class DefaultRecordMetadata(DefaultRecordMetadataProtocol): +class DefaultRecordMetadata: offset: int size: int timestamp: int diff --git a/aiokafka/record/_crecords/legacy_records.pyi b/aiokafka/record/_crecords/legacy_records.pyi index a78d071e..e58cc1d1 100644 --- a/aiokafka/record/_crecords/legacy_records.pyi +++ b/aiokafka/record/_crecords/legacy_records.pyi @@ -1,16 +1,8 @@ -from typing import Any, ClassVar, Generator, final +from typing import Any, ClassVar, Generator from typing_extensions import Buffer, Literal, Never -from aiokafka.record._protocols import ( - LegacyRecordBatchBuilderProtocol, - LegacyRecordBatchProtocol, - LegacyRecordMetadataProtocol, - LegacyRecordProtocol, -) - -@final -class LegacyRecord(LegacyRecordProtocol): +class LegacyRecord: offset: int attributes: int key: bytes | None @@ -34,8 +26,7 @@ class LegacyRecord(LegacyRecordProtocol): @property def checksum(self) -> int: ... -@final -class LegacyRecordBatch(LegacyRecordBatchProtocol): +class LegacyRecordBatch: RECORD_OVERHEAD_V0: ClassVar[int] RECORD_OVERHEAD_V1: ClassVar[int] CODEC_MASK: ClassVar[int] @@ -52,8 +43,7 @@ class LegacyRecordBatch(LegacyRecordBatchProtocol): def validate_crc(self) -> bool: ... def __iter__(self) -> Generator[LegacyRecord, None, None]: ... -@final -class LegacyRecordBatchBuilder(LegacyRecordBatchBuilderProtocol): +class LegacyRecordBatchBuilder: CODEC_MASK: ClassVar[int] CODEC_GZIP: ClassVar[int] CODEC_SNAPPY: ClassVar[int] @@ -76,8 +66,7 @@ class LegacyRecordBatchBuilder(LegacyRecordBatchBuilderProtocol): def record_overhead(magic: int) -> int: ... def build(self) -> bytearray: ... -@final -class LegacyRecordMetadata(LegacyRecordMetadataProtocol): +class LegacyRecordMetadata: offset: int crc: int size: int diff --git a/aiokafka/record/_crecords/memory_records.pyi b/aiokafka/record/_crecords/memory_records.pyi index d8dfa409..5f02beee 100644 --- a/aiokafka/record/_crecords/memory_records.pyi +++ b/aiokafka/record/_crecords/memory_records.pyi @@ -1,12 +1,7 @@ -from typing import final - -from aiokafka.record._protocols import MemoryRecordsProtocol - from .default_records import DefaultRecordBatch from .legacy_records import LegacyRecordBatch -@final -class MemoryRecords(MemoryRecordsProtocol): +class MemoryRecords: def __init__(self, bytes_data: bytes) -> None: ... def size_in_bytes(self) -> int: ... def has_next(self) -> bool: ... diff --git a/aiokafka/record/_protocols.py b/aiokafka/record/_protocols.py deleted file mode 100644 index 8dfc4d0e..00000000 --- a/aiokafka/record/_protocols.py +++ /dev/null @@ -1,262 +0,0 @@ -from __future__ import annotations - -from typing import ( - Any, - ClassVar, - Iterable, - Iterator, - List, - Optional, - Protocol, - Tuple, - Union, - runtime_checkable, -) - -from typing_extensions import Literal, Never - - -class DefaultRecordBatchBuilderProtocol(Protocol): - def __init__( - self, - magic: int, - compression_type: int, - is_transactional: int, - producer_id: int, - producer_epoch: int, - base_sequence: int, - batch_size: int, - ): ... - def append( - self, - offset: int, - timestamp: Optional[int], - key: Optional[bytes], - value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], - ) -> Optional[DefaultRecordMetadataProtocol]: ... - def build(self) -> bytearray: ... - def size(self) -> int: ... - def size_in_bytes( - self, - offset: int, - timestamp: int, - key: Optional[bytes], - value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], - ) -> int: ... - @classmethod - def size_of( - cls, - key: Optional[bytes], - value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], - ) -> int: ... - @classmethod - def estimate_size_in_bytes( - cls, - key: Optional[bytes], - value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], - ) -> int: ... - def set_producer_state( - self, producer_id: int, producer_epoch: int, base_sequence: int - ) -> None: ... - @property - def producer_id(self) -> int: ... - @property - def producer_epoch(self) -> int: ... - @property - def base_sequence(self) -> int: ... - - -class DefaultRecordMetadataProtocol(Protocol): - def __init__(self, offset: int, size: int, timestamp: int) -> None: ... - @property - def offset(self) -> int: ... - @property - def crc(self) -> None: ... - @property - def size(self) -> int: ... - @property - def timestamp(self) -> int: ... - - -class DefaultRecordBatchProtocol(Iterator["DefaultRecordProtocol"], Protocol): - CODEC_MASK: ClassVar[int] - CODEC_NONE: ClassVar[int] - CODEC_GZIP: ClassVar[int] - CODEC_SNAPPY: ClassVar[int] - CODEC_LZ4: ClassVar[int] - CODEC_ZSTD: ClassVar[int] - - def __init__(self, buffer: Union[bytes, bytearray, memoryview]) -> None: ... - @property - def base_offset(self) -> int: ... - @property - def magic(self) -> int: ... - @property - def crc(self) -> int: ... - @property - def attributes(self) -> int: ... - @property - def compression_type(self) -> int: ... - @property - def timestamp_type(self) -> int: ... - @property - def is_transactional(self) -> bool: ... - @property - def is_control_batch(self) -> bool: ... - @property - def last_offset_delta(self) -> int: ... - @property - def first_timestamp(self) -> int: ... - @property - def max_timestamp(self) -> int: ... - @property - def producer_id(self) -> int: ... - @property - def producer_epoch(self) -> int: ... - @property - def base_sequence(self) -> int: ... - @property - def next_offset(self) -> int: ... - def validate_crc(self) -> bool: ... - - -@runtime_checkable -class DefaultRecordProtocol(Protocol): - def __init__( - self, - offset: int, - timestamp: int, - timestamp_type: int, - key: Optional[bytes], - value: Optional[bytes], - headers: List[Tuple[str, Optional[bytes]]], - ) -> None: ... - @property - def offset(self) -> int: ... - @property - def timestamp(self) -> int: - """Epoch milliseconds""" - - @property - def timestamp_type(self) -> int: - """CREATE_TIME(0) or APPEND_TIME(1)""" - - @property - def key(self) -> Optional[bytes]: - """Bytes key or None""" - - @property - def value(self) -> Optional[bytes]: - """Bytes value or None""" - - @property - def headers(self) -> List[Tuple[str, Optional[bytes]]]: ... - @property - def checksum(self) -> None: ... - - -class LegacyRecordBatchBuilderProtocol(Protocol): - def __init__( - self, magic: Literal[0, 1], compression_type: int, batch_size: int - ) -> None: ... - def append( - self, - offset: int, - timestamp: Optional[int], - key: Optional[bytes], - value: Optional[bytes], - headers: Any = None, - ) -> Optional[LegacyRecordMetadataProtocol]: ... - def build(self) -> bytearray: - """Compress batch to be ready for send""" - - def size(self) -> int: - """Return current size of data written to buffer""" - - def size_in_bytes( - self, - offset: int, - timestamp: int, - key: Optional[bytes], - value: Optional[bytes], - ) -> int: - """Actual size of message to add""" - - @classmethod - def record_overhead(cls, magic: int) -> int: ... - - -class LegacyRecordMetadataProtocol(Protocol): - def __init__(self, offset: int, crc: int, size: int, timestamp: int) -> None: ... - @property - def offset(self) -> int: ... - @property - def crc(self) -> int: ... - @property - def size(self) -> int: ... - @property - def timestamp(self) -> int: ... - - -class LegacyRecordBatchProtocol(Iterable["LegacyRecordProtocol"], Protocol): - CODEC_MASK: ClassVar[int] - CODEC_GZIP: ClassVar[int] - CODEC_SNAPPY: ClassVar[int] - CODEC_LZ4: ClassVar[int] - - is_control_batch: bool - is_transactional: bool - producer_id: Optional[int] - - def __init__(self, buffer: Union[bytes, bytearray, memoryview], magic: int): ... - @property - def next_offset(self) -> int: ... - def validate_crc(self) -> bool: ... - - -@runtime_checkable -class LegacyRecordProtocol(Protocol): - def __init__( - self, - offset: int, - timestamp: Optional[int], - timestamp_type: Optional[Literal[0, 1]], - key: Optional[bytes], - value: Optional[bytes], - crc: int, - ) -> None: ... - @property - def offset(self) -> int: ... - @property - def timestamp(self) -> Optional[int]: - """Epoch milliseconds""" - - @property - def timestamp_type(self) -> Optional[Literal[0, 1]]: - """CREATE_TIME(0) or APPEND_TIME(1)""" - - @property - def key(self) -> Optional[bytes]: - """Bytes key or None""" - - @property - def value(self) -> Optional[bytes]: - """Bytes value or None""" - - @property - def headers(self) -> List[Never]: ... - @property - def checksum(self) -> int: ... - - -class MemoryRecordsProtocol(Protocol): - def __init__(self, bytes_data: bytes) -> None: ... - def size_in_bytes(self) -> int: ... - def has_next(self) -> bool: ... - def next_batch( - self, - ) -> Optional[Union[DefaultRecordBatchProtocol, LegacyRecordBatchProtocol]]: ... diff --git a/aiokafka/record/default_records.py b/aiokafka/record/default_records.py index a210eb2e..eb385d7d 100644 --- a/aiokafka/record/default_records.py +++ b/aiokafka/record/default_records.py @@ -53,10 +53,21 @@ # * Transactional (4) # * Timestamp Type (3) # * Compression Type (0-2) +from __future__ import annotations import struct import time -from typing import Any, Callable, List, Optional, Sized, Tuple, Type, Union, final +from typing import ( + TYPE_CHECKING, + Any, + Callable, + List, + Optional, + Sized, + Tuple, + Type, + Union, +) from typing_extensions import Self @@ -74,14 +85,20 @@ from aiokafka.errors import CorruptRecordException, UnsupportedCodecError from aiokafka.util import NO_EXTENSIONS -from ._protocols import ( - DefaultRecordBatchBuilderProtocol, - DefaultRecordBatchProtocol, - DefaultRecordMetadataProtocol, - DefaultRecordProtocol, -) from .util import calc_crc32c, decode_varint, encode_varint, size_of_varint +if TYPE_CHECKING: + DefaultRecordBatchBuilder: Union[ + Type[_DefaultRecordBatchBuilderPy], Type[_DefaultRecordBatchBuilderCython] + ] + DefaultRecordMetadata: Union[ + Type[_DefaultRecordMetadataPy], Type[_DefaultRecordMetadataCython] + ] + DefaultRecordBatch: Union[ + Type[_DefaultRecordBatchPy], Type[_DefaultRecordBatchCython] + ] + DefaultRecord: Union[Type[_DefaultRecordPy], Type[_DefaultRecordCython]] + class DefaultRecordBase: __slots__ = () @@ -140,8 +157,7 @@ def _assert_has_codec(self, compression_type: int) -> None: ) -@final -class _DefaultRecordBatchPy(DefaultRecordBase, DefaultRecordBatchProtocol): +class _DefaultRecordBatchPy(DefaultRecordBase): def __init__(self, buffer: Union[bytes, bytearray, memoryview]) -> None: self._buffer = bytearray(buffer) self._header_data: Tuple[ @@ -232,7 +248,7 @@ def _maybe_uncompress(self) -> None: def _read_msg( self, decode_varint: Callable[[bytearray, int], Tuple[int, int]] = decode_varint - ) -> "_DefaultRecordPy": + ) -> _DefaultRecordPy: # Record => # Length => Varint # Attributes => Int8 @@ -316,7 +332,7 @@ def __iter__(self) -> Self: self._maybe_uncompress() return self - def __next__(self) -> "_DefaultRecordPy": + def __next__(self) -> _DefaultRecordPy: if self._next_record_index >= self._num_records: if self._pos != len(self._buffer): raise CorruptRecordException( @@ -343,8 +359,7 @@ def validate_crc(self) -> bool: return crc == verify_crc -@final -class _DefaultRecordPy(DefaultRecordProtocol): +class _DefaultRecordPy: __slots__ = ( "_offset", "_timestamp", @@ -410,10 +425,7 @@ def __repr__(self) -> str: ) -@final -class _DefaultRecordBatchBuilderPy( - DefaultRecordBase, DefaultRecordBatchBuilderProtocol -): +class _DefaultRecordBatchBuilderPy(DefaultRecordBase): # excluding key, value and headers: # 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes MAX_RECORD_OVERHEAD = 21 @@ -476,7 +488,7 @@ def append( bytearray_type: Type[bytearray] = bytearray, len_func: Callable[[Sized], int] = len, zero_len_varint: int = 1, - ) -> Optional["_DefaultRecordMetadataPy"]: + ) -> Optional[_DefaultRecordMetadataPy]: """Write message to messageset buffer with MsgVersion 2""" # Check types if get_type(offset) != type_int: @@ -701,8 +713,7 @@ def base_sequence(self) -> int: return self._base_sequence -@final -class _DefaultRecordMetadataPy(DefaultRecordMetadataProtocol): +class _DefaultRecordMetadataPy: __slots__ = ("_size", "_timestamp", "_offset") def __init__(self, offset: int, size: int, timestamp: int) -> None: @@ -733,11 +744,6 @@ def __repr__(self) -> str: ) -DefaultRecordBatchBuilder: Type[DefaultRecordBatchBuilderProtocol] -DefaultRecordMetadata: Type[DefaultRecordMetadataProtocol] -DefaultRecordBatch: Type[DefaultRecordBatchProtocol] -DefaultRecord: Type[DefaultRecordProtocol] - if NO_EXTENSIONS: DefaultRecordBatchBuilder = _DefaultRecordBatchBuilderPy DefaultRecordMetadata = _DefaultRecordMetadataPy diff --git a/aiokafka/record/legacy_records.py b/aiokafka/record/legacy_records.py index f7e1e804..3336ffac 100644 --- a/aiokafka/record/legacy_records.py +++ b/aiokafka/record/legacy_records.py @@ -3,7 +3,17 @@ import struct import time from binascii import crc32 -from typing import Any, Generator, List, Optional, Tuple, Type, Union, final +from typing import ( + TYPE_CHECKING, + Any, + Generator, + List, + Optional, + Tuple, + Type, + Union, + final, +) from typing_extensions import Literal, Never @@ -19,12 +29,15 @@ from aiokafka.errors import CorruptRecordException, UnsupportedCodecError from aiokafka.util import NO_EXTENSIONS -from ._protocols import ( - LegacyRecordBatchBuilderProtocol, - LegacyRecordBatchProtocol, - LegacyRecordMetadataProtocol, - LegacyRecordProtocol, -) +if TYPE_CHECKING: + LegacyRecordBatchBuilder: Union[ + Type[_LegacyRecordBatchBuilderPy], Type[_LegacyRecordBatchBuilderCython] + ] + LegacyRecordMetadata: Union[ + Type[_LegacyRecordMetadataPy], Type[_LegacyRecordMetadataCython] + ] + LegacyRecordBatch: Union[Type[_LegacyRecordBatchPy], Type[_LegacyRecordBatchCython]] + LegacyRecord: Union[Type[_LegacyRecordPy], Type[_LegacyRecordCython]] NoneType = type(None) @@ -105,7 +118,7 @@ def _assert_has_codec(self, compression_type: int) -> None: @final -class _LegacyRecordBatchPy(LegacyRecordBase, LegacyRecordBatchProtocol): +class _LegacyRecordBatchPy(LegacyRecordBase): is_control_batch: bool = False is_transactional: bool = False producer_id: Optional[int] = None @@ -277,7 +290,7 @@ def __iter__(self) -> Generator[_LegacyRecordPy, None, None]: @final -class _LegacyRecordPy(LegacyRecordProtocol): +class _LegacyRecordPy: __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value", "_crc") def __init__( @@ -337,7 +350,7 @@ def __repr__(self) -> str: @final -class _LegacyRecordBatchBuilderPy(LegacyRecordBase, LegacyRecordBatchBuilderProtocol): +class _LegacyRecordBatchBuilderPy(LegacyRecordBase): _buffer: Optional[bytearray] = None def __init__( @@ -551,7 +564,7 @@ def record_overhead(cls, magic: int) -> int: @final -class _LegacyRecordMetadataPy(LegacyRecordMetadataProtocol): +class _LegacyRecordMetadataPy: __slots__ = ("_crc", "_size", "_timestamp", "_offset") def __init__(self, offset: int, crc: int, size: int, timestamp: int) -> None: @@ -584,11 +597,6 @@ def __repr__(self) -> str: ) -LegacyRecordBatchBuilder: Type[LegacyRecordBatchBuilderProtocol] -LegacyRecordMetadata: Type[LegacyRecordMetadataProtocol] -LegacyRecordBatch: Type[LegacyRecordBatchProtocol] -LegacyRecord: Type[LegacyRecordProtocol] - if NO_EXTENSIONS: LegacyRecordBatchBuilder = _LegacyRecordBatchBuilderPy LegacyRecordMetadata = _LegacyRecordMetadataPy diff --git a/aiokafka/record/memory_records.py b/aiokafka/record/memory_records.py index b618d4a8..5782728f 100644 --- a/aiokafka/record/memory_records.py +++ b/aiokafka/record/memory_records.py @@ -18,24 +18,22 @@ # # So we can iterate over batches just by knowing offsets of Length. Magic is # used to construct the correct class for Batch itself. +from __future__ import annotations import struct -from typing import Optional, Type, Union, final +from typing import TYPE_CHECKING, Optional, Type, Union from aiokafka.errors import CorruptRecordException from aiokafka.util import NO_EXTENSIONS -from ._protocols import ( - DefaultRecordBatchProtocol, - LegacyRecordBatchProtocol, - MemoryRecordsProtocol, -) -from .default_records import DefaultRecordBatch -from .legacy_records import LegacyRecordBatch, _LegacyRecordBatchPy +from .default_records import _DefaultRecordBatchPy +from .legacy_records import _LegacyRecordBatchPy +if TYPE_CHECKING: + MemoryRecords: Union[Type[_MemoryRecordsPy], Type[_MemoryRecordsCython]] -@final -class _MemoryRecordsPy(MemoryRecordsProtocol): + +class _MemoryRecordsPy: LENGTH_OFFSET = struct.calcsize(">q") LOG_OVERHEAD = struct.calcsize(">qi") MAGIC_OFFSET = struct.calcsize(">qii") @@ -87,7 +85,7 @@ def has_next(self) -> bool: # NOTE: same cache for LOAD_FAST as above def next_batch( self, _min_slice: int = MIN_SLICE, _magic_offset: int = MAGIC_OFFSET - ) -> Optional[Union[DefaultRecordBatchProtocol, LegacyRecordBatchProtocol]]: + ) -> Optional[Union[_DefaultRecordBatchPy, _LegacyRecordBatchPy]]: next_slice = self._next_slice if next_slice is None: return None @@ -99,12 +97,10 @@ def next_batch( self._cache_next() magic = next_slice[_magic_offset] if magic >= 2: # pragma: no cover - return DefaultRecordBatch(next_slice) + return _DefaultRecordBatchPy(next_slice) else: - return LegacyRecordBatch(next_slice, magic) - + return _LegacyRecordBatchPy(next_slice, magic) -MemoryRecords: Type[MemoryRecordsProtocol] if NO_EXTENSIONS: MemoryRecords = _MemoryRecordsPy diff --git a/tests/record/test_records.py b/tests/record/test_records.py index 3323d159..b7c23f8b 100644 --- a/tests/record/test_records.py +++ b/tests/record/test_records.py @@ -2,7 +2,6 @@ from aiokafka.errors import CorruptRecordException from aiokafka.record import MemoryRecords -from aiokafka.record._protocols import DefaultRecordProtocol, LegacyRecordProtocol # This is real live data from Kafka 11 broker record_batch_data_v2 = [ @@ -68,7 +67,6 @@ def test_memory_records_v2() -> None: assert batch is not None recs = tuple(batch) assert len(recs) == 1 - assert isinstance(recs[0], DefaultRecordProtocol) assert recs[0].value == b"123" assert recs[0].key is None assert recs[0].timestamp == 1503229838908 @@ -95,7 +93,6 @@ def test_memory_records_v1() -> None: assert batch is not None recs = tuple(batch) assert len(recs) == 1 - assert isinstance(recs[0], LegacyRecordProtocol) assert recs[0].value == b"123" assert recs[0].key is None assert recs[0].timestamp == 1503648000942 @@ -124,7 +121,6 @@ def test_memory_records_v0() -> None: assert batch is not None recs = tuple(batch) assert len(recs) == 1 - assert isinstance(recs[0], LegacyRecordProtocol) assert recs[0].value == b"123" assert recs[0].key is None assert recs[0].timestamp is None