diff --git a/quixstreams/sources/base/manager.py b/quixstreams/sources/base/manager.py index 649478ba2..4d4b8e65c 100644 --- a/quixstreams/sources/base/manager.py +++ b/quixstreams/sources/base/manager.py @@ -5,8 +5,10 @@ from typing import List from quixstreams.logging import LOGGER_NAME, configure_logging -from quixstreams.models import Topic +from quixstreams.models import Topic, TopicAdmin from quixstreams.models.topics import TopicConfig +from quixstreams.rowconsumer import RowConsumer +from quixstreams.rowproducer import RowProducer from quixstreams.state import RecoveryManager, StateStoreManager, StorePartition from quixstreams.state.memory import MemoryStore @@ -27,10 +29,17 @@ class SourceProcess(multiprocessing.Process): Some methods are designed to be used from the parent process, and others from the child process. """ - def __init__(self, source, topic, producer, consumer, topic_manager): + def __init__( + self, + source: BaseSource, + topic: Topic, + producer: RowProducer, + consumer: RowConsumer, + topic_manager: TopicAdmin, + ): super().__init__() self.topic = topic - self.source: BaseSource = source + self.source = source self._exceptions: List[Exception] = [] self._started = False @@ -48,7 +57,7 @@ def __init__(self, source, topic, producer, consumer, topic_manager): self._rpipe, self._wpipe = multiprocessing.Pipe(duplex=False) @property - def started(self): + def started(self) -> bool: return self._started # --- CHILD PROCESS METHODS --- # @@ -124,7 +133,7 @@ def _recover_state(self, source: StatefulSource) -> StorePartition: store_type=MemoryStore, topic_config=TopicConfig( num_partitions=source.store_partitions_count, - replication_factor=1, + replication_factor=self._topic_manager.default_replication_factor, ), ) @@ -240,7 +249,7 @@ def register( producer, consumer, topic_manager, - ): + ) -> SourceProcess: """ Register a new source in the manager. diff --git a/quixstreams/sources/base/source.py b/quixstreams/sources/base/source.py index a56a61c71..383b3d331 100644 --- a/quixstreams/sources/base/source.py +++ b/quixstreams/sources/base/source.py @@ -411,7 +411,7 @@ def assigned_store_partition(self) -> int: return 0 @property - def store_name(self): + def store_name(self) -> str: """ The source store name """