Skip to content

Commit

Permalink
fix types
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin-quix committed Nov 15, 2024
1 parent ac29007 commit aa786ce
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
21 changes: 15 additions & 6 deletions quixstreams/sources/base/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from typing import List

from quixstreams.logging import LOGGER_NAME, configure_logging
from quixstreams.models import Topic
from quixstreams.models import Topic, TopicAdmin
from quixstreams.models.topics import TopicConfig
from quixstreams.rowconsumer import RowConsumer
from quixstreams.rowproducer import RowProducer
from quixstreams.state import RecoveryManager, StateStoreManager, StorePartition
from quixstreams.state.memory import MemoryStore

Expand All @@ -27,10 +29,17 @@ class SourceProcess(multiprocessing.Process):
Some methods are designed to be used from the parent process, and others from the child process.
"""

def __init__(self, source, topic, producer, consumer, topic_manager):
def __init__(
self,
source: BaseSource,
topic: Topic,
producer: RowProducer,
consumer: RowConsumer,
topic_manager: TopicAdmin,
):
super().__init__()
self.topic = topic
self.source: BaseSource = source
self.source = source

self._exceptions: List[Exception] = []
self._started = False
Expand All @@ -48,7 +57,7 @@ def __init__(self, source, topic, producer, consumer, topic_manager):
self._rpipe, self._wpipe = multiprocessing.Pipe(duplex=False)

@property
def started(self):
def started(self) -> bool:
return self._started

# --- CHILD PROCESS METHODS --- #
Expand Down Expand Up @@ -124,7 +133,7 @@ def _recover_state(self, source: StatefulSource) -> StorePartition:
store_type=MemoryStore,
topic_config=TopicConfig(
num_partitions=source.store_partitions_count,
replication_factor=1,
replication_factor=self._topic_manager.default_replication_factor,
),
)

Expand Down Expand Up @@ -240,7 +249,7 @@ def register(
producer,
consumer,
topic_manager,
):
) -> SourceProcess:
"""
Register a new source in the manager.
Expand Down
2 changes: 1 addition & 1 deletion quixstreams/sources/base/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def assigned_store_partition(self) -> int:
return 0

@property
def store_name(self):
def store_name(self) -> str:
"""
The source store name
"""
Expand Down

0 comments on commit aa786ce

Please sign in to comment.