Skip to content

Commit

Permalink
add typing to aiokafka/record/*
Browse files Browse the repository at this point in the history
  • Loading branch information
dimastbk committed Apr 21, 2024
1 parent 1862620 commit 094cb23
Show file tree
Hide file tree
Showing 12 changed files with 723 additions and 156 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ FORMATTED_AREAS=\
aiokafka/structs.py \
aiokafka/util.py \
aiokafka/protocol/ \
aiokafka/record/ \
tests/test_codec.py \
tests/test_helpers.py

Expand Down
9 changes: 6 additions & 3 deletions aiokafka/record/_crc32c.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""

import array
from typing import Iterable, Union

# fmt: off
CRC_TABLE = (
Expand Down Expand Up @@ -97,7 +98,9 @@
_MASK = 0xFFFFFFFF


def crc_update(crc, data):
def crc_update(
crc: int, data: Union["array.array[int]", bytes, bytearray, Iterable[int]]
) -> int:
"""Update CRC-32C checksum with data.
Args:
crc: 32-bit checksum to update as long.
Expand All @@ -116,7 +119,7 @@ def crc_update(crc, data):
return crc ^ _MASK


def crc_finalize(crc):
def crc_finalize(crc: int) -> int:
"""Finalize CRC-32C checksum.
This function should be called as last step of crc calculation.
Args:
Expand All @@ -127,7 +130,7 @@ def crc_finalize(crc):
return crc & _MASK


def crc(data):
def crc(data: Union["array.array[int]", bytes, bytearray, Iterable[int]]) -> int:
"""Compute CRC-32C checksum of the data.
Args:
data: byte array, string or iterable over bytes.
Expand Down
8 changes: 8 additions & 0 deletions aiokafka/record/_crecords/cutil.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import Callable

from typing_extensions import Buffer

def crc32c_cython(data: Buffer) -> int: ...
def decode_varint_cython(buffer: bytearray, pos: int) -> tuple[int, int]: ...
def encode_varint_cython(value: int, write: Callable[[int], None]) -> int: ...
def size_of_varint_cython(value: int) -> int: ...
105 changes: 105 additions & 0 deletions aiokafka/record/_crecords/default_records.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from typing import Sequence, final

from typing_extensions import Self

from aiokafka.record._protocols import (
DefaultRecordBatchBuilderProtocol,
DefaultRecordBatchProtocol,
DefaultRecordMetadataProtocol,
DefaultRecordProtocol,
)

@final
class DefaultRecord(DefaultRecordProtocol):
offset: int
key: bytes | None
value: bytes | None
headers: Sequence[tuple[str, bytes | None]]
checksum: None

def __init__(
self,
offset: int,
timestamp: int,
timestamp_type: int,
key: bytes | None,
value: bytes | None,
headers: Sequence[tuple[str, bytes | None]],
) -> None: ...
@property
def timestamp(self) -> int | None: ...
@property
def timestamp_type(self) -> int | None: ...

@final
class DefaultRecordBatch(DefaultRecordBatchProtocol):
def __init__(self, buffer: bytes): ...
@property
def compression_type(self) -> int: ...
@property
def is_transactional(self) -> bool: ...
@property
def is_control_batch(self) -> bool: ...
@property
def next_offset(self) -> int: ...
def __iter__(self) -> Self: ...
def __next__(self) -> DefaultRecord: ...
def validate_crc(self) -> bool: ...

@final
class DefaultRecordBatchBuilder(DefaultRecordBatchBuilderProtocol):
producer_id: int
producer_epoch: int
base_sequence: int
def __init__(
self,
magic: int,
compression_type: int,
is_transactional: int,
producer_id: int,
producer_epoch: int,
base_sequence: int,
batch_size: int,
) -> None: ...
def set_producer_state(
self, producer_id: int, producer_epoch: int, base_sequence: int
) -> None: ...
def append(
self,
offset: int,
timestamp: int | None,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> DefaultRecordMetadata: ...
def build(self) -> bytearray: ...
def size(self) -> int: ...
def size_in_bytes(
self,
offset: int,
timestamp: int,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> int: ...
@classmethod
def size_of(
cls,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> int: ...
@classmethod
def estimate_size_in_bytes(
cls,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> int: ...

@final
class DefaultRecordMetadata(DefaultRecordMetadataProtocol):
offset: int
size: int
timestamp: int
def __init__(self, offset: int, size: int, timestamp: int): ...
70 changes: 70 additions & 0 deletions aiokafka/record/_crecords/legacy_records.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Any, Generator, final

from typing_extensions import Literal, Never

from aiokafka.record._protocols import (
LegacyRecordBatchBuilderProtocol,
LegacyRecordBatchProtocol,
LegacyRecordMetadataProtocol,
LegacyRecordProtocol,
)

@final
class LegacyRecord(LegacyRecordProtocol):
offset: int
attributes: int
key: bytes | None
value: bytes | None
crc: int
def __init__(
self,
offset: int,
timestamp: int,
attributes: int,
key: bytes | None,
value: bytes | None,
crc: int,
) -> None: ...
@property
def headers(self) -> list[Never]: ...
@property
def timestamp(self) -> int | None: ...
@property
def timestamp_type(self) -> Literal[0, 1] | None: ...
@property
def checksum(self) -> int: ...

@final
class LegacyRecordBatch(LegacyRecordBatchProtocol):
is_control_batch: bool
is_transactional: bool
producer_id: int | None
def __init__(self, buffer, magic: int) -> None: ...
@property
def next_offset(self) -> int: ...
def validate_crc(self) -> bool: ...
def __iter__(self) -> Generator[LegacyRecord, None, None]: ...

@final
class LegacyRecordBatchBuilder(LegacyRecordBatchBuilderProtocol):
def __init__(self, magic: int, compression_type: int, batch_size: int) -> None: ...
def append(
self,
offset: int,
timestamp: int | None,
key: bytes | None,
value: bytes | None,
headers: Any = None,
) -> LegacyRecordMetadata: ...
def size(self) -> int: ...
@staticmethod
def record_overhead(magic: int) -> int: ...
def build(self) -> bytearray: ...

@final
class LegacyRecordMetadata(LegacyRecordMetadataProtocol):
offset: int
crc: int
size: int
timestamp: int
def __init__(self, offset: int, crc: int, size: int, timestamp: int) -> None: ...
13 changes: 13 additions & 0 deletions aiokafka/record/_crecords/memory_records.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import final

from aiokafka.record._protocols import MemoryRecordsProtocol

from .default_records import DefaultRecordBatch
from .legacy_records import LegacyRecordBatch

@final
class MemoryRecords(MemoryRecordsProtocol):
def __init__(self, bytes_data: bytes) -> None: ...
def size_in_bytes(self) -> int: ...
def has_next(self) -> bool: ...
def next_batch(self) -> DefaultRecordBatch | LegacyRecordBatch | None: ...
Loading

0 comments on commit 094cb23

Please sign in to comment.