Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add typing to aiokafka/record/* #1001

Merged
merged 10 commits into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ FORMATTED_AREAS=\
aiokafka/structs.py \
aiokafka/util.py \
aiokafka/protocol/ \
aiokafka/record/ \
tests/test_codec.py \
tests/test_helpers.py
tests/test_helpers.py \
tests/record/

.PHONY: setup
setup:
Expand Down
7 changes: 4 additions & 3 deletions aiokafka/record/_crc32c.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""

import array
from typing import Iterable

# fmt: off
CRC_TABLE = (
Expand Down Expand Up @@ -97,7 +98,7 @@
_MASK = 0xFFFFFFFF


def crc_update(crc, data):
def crc_update(crc: int, data: Iterable[int]) -> int:
"""Update CRC-32C checksum with data.
Args:
crc: 32-bit checksum to update as long.
Expand All @@ -116,7 +117,7 @@ def crc_update(crc, data):
return crc ^ _MASK


def crc_finalize(crc):
def crc_finalize(crc: int) -> int:
"""Finalize CRC-32C checksum.
This function should be called as last step of crc calculation.
Args:
Expand All @@ -127,7 +128,7 @@ def crc_finalize(crc):
return crc & _MASK


def crc(data):
def crc(data: Iterable[int]) -> int:
"""Compute CRC-32C checksum of the data.
Args:
data: byte array, string or iterable over bytes.
Expand Down
8 changes: 8 additions & 0 deletions aiokafka/record/_crecords/cutil.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import Callable

from typing_extensions import Buffer

def crc32c_cython(data: Buffer) -> int: ...
def decode_varint_cython(buffer: bytearray, pos: int) -> tuple[int, int]: ...
def encode_varint_cython(value: int, write: Callable[[int], None]) -> int: ...
def size_of_varint_cython(value: int) -> int: ...
4 changes: 2 additions & 2 deletions aiokafka/record/_crecords/default_records.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ cdef class DefaultRecord:

cdef:
readonly int64_t offset
int64_t timestamp
char timestamp_type
readonly int64_t timestamp
readonly char timestamp_type
readonly object key
readonly object value
readonly object headers
Expand Down
148 changes: 148 additions & 0 deletions aiokafka/record/_crecords/default_records.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from typing import ClassVar, final

from typing_extensions import Literal, Self

from aiokafka.record._protocols import (
DefaultRecordBatchBuilderProtocol,
DefaultRecordBatchProtocol,
DefaultRecordMetadataProtocol,
DefaultRecordProtocol,
)
from aiokafka.record._types import (
CodecGzipT,
CodecLz4T,
CodecMaskT,
CodecNoneT,
CodecSnappyT,
CodecZstdT,
DefaultCompressionTypeT,
)

@final
class DefaultRecord(DefaultRecordProtocol):
def __init__(
self,
offset: int,
timestamp: int,
timestamp_type: int,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> None: ...
@property
def offset(self) -> int: ...
@property
def timestamp(self) -> int: ...
@property
def timestamp_type(self) -> int: ...
@property
def key(self) -> bytes | None: ...
@property
def value(self) -> bytes | None: ...
@property
def headers(self) -> list[tuple[str, bytes | None]]: ...
@property
def checksum(self) -> None: ...

@final
class DefaultRecordBatch(DefaultRecordBatchProtocol):
CODEC_MASK: ClassVar[CodecMaskT]
CODEC_NONE: ClassVar[CodecNoneT]
CODEC_GZIP: ClassVar[CodecGzipT]
CODEC_SNAPPY: ClassVar[CodecSnappyT]
CODEC_LZ4: ClassVar[CodecLz4T]
CODEC_ZSTD: ClassVar[CodecZstdT]

def __init__(self, buffer: bytes): ...
@property
def compression_type(self) -> int: ...
@property
def is_transactional(self) -> bool: ...
@property
def is_control_batch(self) -> bool: ...
@property
def next_offset(self) -> int: ...
def __iter__(self) -> Self: ...
def __next__(self) -> DefaultRecord: ...
def validate_crc(self) -> bool: ...
@property
def base_offset(self) -> int: ...
@property
def magic(self) -> int: ...
@property
def crc(self) -> int: ...
@property
def attributes(self) -> int: ...
@property
def last_offset_delta(self) -> int: ...
@property
def first_timestamp(self) -> int: ...
@property
def max_timestamp(self) -> int: ...
@property
def producer_id(self) -> int: ...
@property
def producer_epoch(self) -> int: ...
@property
def base_sequence(self) -> int: ...
@property
def timestamp_type(self) -> Literal[0, 1]: ...

@final
class DefaultRecordBatchBuilder(DefaultRecordBatchBuilderProtocol):
producer_id: int
producer_epoch: int
base_sequence: int
def __init__(
self,
magic: int,
compression_type: DefaultCompressionTypeT,
is_transactional: int,
producer_id: int,
producer_epoch: int,
base_sequence: int,
batch_size: int,
) -> None: ...
def set_producer_state(
self, producer_id: int, producer_epoch: int, base_sequence: int
) -> None: ...
def append(
self,
offset: int,
timestamp: int | None,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> DefaultRecordMetadata: ...
def build(self) -> bytearray: ...
def size(self) -> int: ...
def size_in_bytes(
self,
offset: int,
timestamp: int,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> int: ...
@classmethod
def size_of(
cls,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> int: ...
@classmethod
def estimate_size_in_bytes(
cls,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> int: ...

@final
class DefaultRecordMetadata(DefaultRecordMetadataProtocol):
offset: int
size: int
timestamp: int
crc: None
def __init__(self, offset: int, size: int, timestamp: int): ...
14 changes: 0 additions & 14 deletions aiokafka/record/_crecords/default_records.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -448,20 +448,6 @@ cdef class DefaultRecord:
record.headers = headers
return record

@property
def timestamp(self):
if self.timestamp != -1:
return self.timestamp
else:
return None

@property
def timestamp_type(self):
if self.timestamp != -1:
return self.timestamp_type
else:
return None

def __repr__(self):
return (
"DefaultRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
Expand Down
95 changes: 95 additions & 0 deletions aiokafka/record/_crecords/legacy_records.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from typing import Any, ClassVar, Generator, final

from typing_extensions import Buffer, Literal, Never

from aiokafka.record._protocols import (
LegacyRecordBatchBuilderProtocol,
LegacyRecordBatchProtocol,
LegacyRecordMetadataProtocol,
LegacyRecordProtocol,
)
from aiokafka.record._types import (
CodecGzipT,
CodecLz4T,
CodecMaskT,
CodecSnappyT,
LegacyCompressionTypeT,
)

@final
class LegacyRecord(LegacyRecordProtocol):
def __init__(
self,
offset: int,
timestamp: int,
attributes: int,
key: bytes | None,
value: bytes | None,
crc: int,
) -> None: ...
@property
def offset(self) -> int: ...
@property
def key(self) -> bytes | None: ...
@property
def value(self) -> bytes | None: ...
@property
def headers(self) -> list[Never]: ...
ods marked this conversation as resolved.
Show resolved Hide resolved
@property
def timestamp(self) -> int | None: ...
@property
def timestamp_type(self) -> Literal[0, 1] | None: ...
@property
def checksum(self) -> int: ...

@final
class LegacyRecordBatch(LegacyRecordBatchProtocol):
RECORD_OVERHEAD_V0: ClassVar[int]
RECORD_OVERHEAD_V1: ClassVar[int]
CODEC_MASK: ClassVar[CodecMaskT]
CODEC_GZIP: ClassVar[CodecGzipT]
CODEC_SNAPPY: ClassVar[CodecSnappyT]
CODEC_LZ4: ClassVar[CodecLz4T]

is_control_batch: bool
is_transactional: bool
producer_id: int | None
def __init__(self, buffer: Buffer, magic: int) -> None: ...
@property
def next_offset(self) -> int: ...
def validate_crc(self) -> bool: ...
def __iter__(self) -> Generator[LegacyRecord, None, None]: ...

@final
class LegacyRecordBatchBuilder(LegacyRecordBatchBuilderProtocol):
CODEC_MASK: ClassVar[CodecMaskT]
CODEC_GZIP: ClassVar[CodecGzipT]
CODEC_SNAPPY: ClassVar[CodecSnappyT]
CODEC_LZ4: ClassVar[CodecLz4T]

def __init__(
self, magic: int, compression_type: LegacyCompressionTypeT, batch_size: int
) -> None: ...
def append(
self,
offset: int,
timestamp: int | None,
key: bytes | None,
value: bytes | None,
headers: Any = None,
) -> LegacyRecordMetadata: ...
def size(self) -> int: ...
def size_in_bytes(
self, offset: Any, timestamp: Any, key: Buffer | None, value: Buffer | None
) -> int: ...
@staticmethod
def record_overhead(magic: int) -> int: ...
def build(self) -> bytearray: ...

@final
class LegacyRecordMetadata(LegacyRecordMetadataProtocol):
offset: int
crc: int
size: int
timestamp: int
def __init__(self, offset: int, crc: int, size: int, timestamp: int) -> None: ...
13 changes: 13 additions & 0 deletions aiokafka/record/_crecords/memory_records.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import final

from aiokafka.record._protocols import MemoryRecordsProtocol

from .default_records import DefaultRecordBatch
from .legacy_records import LegacyRecordBatch

@final
class MemoryRecords(MemoryRecordsProtocol):
def __init__(self, bytes_data: bytes) -> None: ...
def size_in_bytes(self) -> int: ...
def has_next(self) -> bool: ...
def next_batch(self) -> DefaultRecordBatch | LegacyRecordBatch | None: ...
Loading
Loading