Skip to content

Commit

Permalink
Docs, expose & document batch_size on subscription & projector.
Browse files Browse the repository at this point in the history
  • Loading branch information
moser committed Oct 25, 2023
1 parent 630efc1 commit a76da8c
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 21 deletions.
24 changes: 19 additions & 5 deletions depeche_db/_aggregated_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,28 @@ def __init__(
store: MessageStore[E],
partitioner: MessagePartitioner[E],
stream_wildcards: List[str],
update_batch_size: Optional[int] = None,
) -> None:
"""
AggregatedStream is a stream that is used to aggregate multiple streams into one.
AggregatedStream aggregates multiple streams into one (partitioned) stream.
Read more about aggregated streams under [Data Model](../concepts/data-model.md#aggregated-streams).
The `update_batch_size` argument can be used to control the batch size of the
update process. Higher numbers will result in fewer database round trips but
also in higher memory usage.
Args:
name: Stream name
name: Stream name, needs to be a valid python identifier
store: Message store
partitioner: Message partitioner
stream_wildcards: List of stream wildcards
update_batch_size: Batch size for updating the stream, defaults to 100
Attributes:
name (str): Stream name
projector (StreamProjector): Stream projector
subscription (SubscriptionFactory): Factory to create subscriptions
subscription (SubscriptionFactory): Factory to create subscriptions on this stream
"""
assert name.isidentifier(), "name must be a valid identifier"
self.name = name
Expand Down Expand Up @@ -107,6 +115,7 @@ def __init__(
stream=self,
partitioner=partitioner,
stream_wildcards=stream_wildcards,
batch_size=update_batch_size,
)

def truncate(self, conn: SAConnection):
Expand Down Expand Up @@ -198,7 +207,8 @@ def get_partition_statistics(
result_limit: Optional[int] = None,
) -> Iterator[StreamPartitionStatistic]:
"""
Get partition statistics for deciding which partitions to read from.
Get partition statistics for deciding which partitions to read from. This
is used by subscriptions.
"""
with self._connection() as conn:
position_limits = position_limits or {-1: -1}
Expand Down Expand Up @@ -259,16 +269,20 @@ def __init__(
stream: AggregatedStream[E],
partitioner: MessagePartitioner[E],
stream_wildcards: List[str],
batch_size: Optional[int] = None,
):
"""
Stream projector is responsible for updating an aggregated stream.
The update process is locked to prevent concurrent updates. Thus, it is
fine to run the projector in multiple processes.
Implements: [RunOnNotification][depeche_db.RunOnNotification]
"""
self.stream = stream
self.stream_wildcards = stream_wildcards
self.partitioner = partitioner
self.batch_size = 100
self.batch_size = batch_size or 100

@property
def notification_channel(self) -> str:
Expand Down
45 changes: 36 additions & 9 deletions depeche_db/_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,33 @@

class AggregatedStreamFactory(Generic[E]):
def __init__(self, store: "MessageStore[E]"):
"""
This factory is accessible on the message store:
store = MessageStore(...)
stream = store.aggregated_stream(
name="stream_name",
partitioner=...,
stream_wildcards=["stream_%"]
)
"""
self._store = store

def __call__(
self,
name: str,
partitioner: "MessagePartitioner[E]",
stream_wildcards: List[str],
update_batch_size: Optional[int] = None,
) -> "AggregatedStream[E]":
"""
Create an aggregated stream.
Args:
name: The name of the stream
name: The name of the stream, needs to be a valid python identifier
partitioner: A partitioner for the stream
stream_wildcards: A list of stream wildcards to be aggregated
stream_wildcards: A list of stream wildcards
update_batch_size: The batch size for updating the stream
"""
from ._aggregated_stream import AggregatedStream

Expand All @@ -54,12 +66,26 @@ def __call__(

class SubscriptionFactory(Generic[E]):
def __init__(self, stream: "AggregatedStream[E]"):
"""
This factory is accessible on the aggregated stream:
stream : AggregatedStream = ...
subscription = stream.subscription(
name="subscription_name",
handlers=...,
call_middleware=...,
error_handler=...,
state_provider=...,
lock_provider=...,
)
"""
self._stream = stream

def __call__(
self,
name: str,
handlers: MessageHandlerRegisterProtocol[E] = None,
handlers: Optional[MessageHandlerRegisterProtocol[E]] = None,
batch_size: Optional[int] = None,
call_middleware: Optional[CallMiddleware] = None,
error_handler: Optional[SubscriptionErrorHandler] = None,
state_provider: Optional[SubscriptionStateProvider] = None,
Expand All @@ -69,12 +95,13 @@ def __call__(
Create a subscription.
Args:
name: The name of the subscription
handlers: Handlers to be called when a message is received
call_middleware: A middleware to be called before the handlers
error_handler: A handler for errors raised by the handlers
state_provider: A provider for the subscription state
lock_provider: A provider for the subscription locks
name: The name of the subscription, needs to be a valid python identifier
handlers: Handlers to be called when a message is received, defaults to an empty register
batch_size: Number of messages to read at once, defaults to 10, read more [here][depeche_db.SubscriptionRunner]
call_middleware: A middleware to customize the call to the handlers
error_handler: A handler for errors raised by the handlers, defaults to a handler that exits the subscription
state_provider: Provider for the subscription state, defaults to a PostgreSQL provider
lock_provider: Provider for the locks, defaults to a PostgreSQL provider
"""
from ._message_handler import MessageHandlerRegister
from ._subscription import Subscription, SubscriptionMessageHandler
Expand Down
3 changes: 3 additions & 0 deletions depeche_db/_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ def call(
"""
Calls a handler with a given message.
The type of the message depends on the type annotation of the handler function.
See [MessageHandlerRegister][depeche_db.MessageHandlerRegister] for more details.
Args:
handler: Handler
message: Message to be passed to the handler
Expand Down
2 changes: 1 addition & 1 deletion depeche_db/_message_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def synchronize(
Optimistic concurrency control must be used to ensure that the stream is
not modified by another process between reading the last message and
writing the new message. You have to give `expected_version`. If the
stream has been modified, a `ValueError` will be raised.
stream has been modified, an `OptimisticConcurrencyError` will be raised.
You can give a connection to use for the write as `conn`. If you don't
give a connection, a new connection will be created and the write will
Expand Down
37 changes: 31 additions & 6 deletions depeche_db/_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,21 @@


class ExitSubscriptionErrorHandler(SubscriptionErrorHandler):
    """
    Error handler that exits the subscription whenever an error is reported.
    """

    def handle_error(
        self,
        error: Exception,
        message: SubscriptionMessage[E],
    ) -> ErrorAction:
        """
        Return `ErrorAction.EXIT` unconditionally.

        Args:
            error: Exception handed to the error handler (not inspected)
            message: Message handed in alongside the error (not inspected)
        """
        return ErrorAction.EXIT


class LogAndIgnoreSubscriptionErrorHandler(SubscriptionErrorHandler):
"""
Log the error and ignore the message
"""

def __init__(self, subscription_name: str):
self._logger = _logging.getLogger(
f"depeche_db.subscription.{subscription_name}"
Expand All @@ -55,6 +63,7 @@ def __init__(
name: str,
stream: AggregatedStream[E],
message_handler: "SubscriptionMessageHandler[E]",
batch_size: Optional[int] = None,
state_provider: Optional[SubscriptionStateProvider] = None,
lock_provider: Optional[LockProvider] = None,
# TODO start at time
Expand All @@ -63,12 +72,15 @@ def __init__(
"""
A subscription is a way to read messages from an aggregated stream.
Read more about the subscription in the [concepts section](../concepts/subscriptions.md).
Args:
name: Name of the subscription
message_handler: Handler for the messages
name: Name of the subscription, needs to be a valid python identifier
stream: Stream to read from
state_provider: Provider for the subscription state
lock_provider: Provider for the locks
message_handler: Handler for the messages
batch_size: Number of messages to read at once, defaults to 10, read more [here][depeche_db.SubscriptionRunner]
state_provider: Provider for the subscription state, defaults to a PostgreSQL provider
lock_provider: Provider for the locks, defaults to a PostgreSQL provider
"""
assert name.isidentifier(), "Group name must be a valid identifier"
self.name = name
Expand All @@ -82,6 +94,7 @@ def __init__(
self.runner = SubscriptionRunner(
subscription=self,
message_handler=message_handler,
batch_size=batch_size,
)

def get_next_messages(self, count: int) -> Iterator[SubscriptionMessage[E]]:
Expand Down Expand Up @@ -155,7 +168,7 @@ def __init__(
Args:
handler_register: The handler register to use
error_handler: The error handler to use
error_handler: A handler for errors raised by the handlers, defaults to a handler that exits the subscription
call_middleware: The middleware to call before calling the handler
"""
self._register = handler_register
Expand Down Expand Up @@ -214,18 +227,30 @@ def __init__(
self,
subscription: Subscription[E],
message_handler: SubscriptionMessageHandler,
batch_size: Optional[int] = None,
):
"""
Handles messages from a subscription using a handler
The `batch_size` argument controls how many messages to handle in each
batch. If not provided, the default is 10. A larger batch size will
result in fewer round trips to the database, but will also make it more
likely that messages from _different partitions_ will be processed out of
the order defined by their `global_position` on the message store.
A batch size of 1 will ensure that messages are processed in order
with respect to their `global_position`.
Messages in the same partition will always be processed in order.
Implements: [RunOnNotification][depeche_db.RunOnNotification]
Args:
subscription: The subscription to handle
message_handler: The handler to use
batch_size: The number of messages to handle in each batch, defaults to 10
"""
self._subscription = subscription
self._batch_size = 100
self._batch_size = batch_size or 10
self._keep_running = True
self._handler = message_handler

Expand Down
3 changes: 3 additions & 0 deletions docs/api/aggregated_stream.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
::: depeche_db.AggregatedStreamFactory
---
::: depeche_db.AggregatedStream
---
::: depeche_db.StreamProjector
---
::: depeche_db.MessagePartitioner
options:
show_signature_annotations: true
4 changes: 4 additions & 0 deletions docs/api/datastructures.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
::: depeche_db.MessageProtocol
---
::: depeche_db.MessagePosition
---
::: depeche_db.StoredMessage
---
::: depeche_db.SubscriptionMessage
---
::: depeche_db.SubscriptionState
1 change: 1 addition & 0 deletions docs/api/exceptions.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
::: depeche_db.MessageNotFound
---
::: depeche_db.OptimisticConcurrencyError
1 change: 1 addition & 0 deletions docs/api/executor.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
::: depeche_db.Executor
---
::: depeche_db.RunOnNotification
options:
show_signature_annotations: true
2 changes: 2 additions & 0 deletions docs/api/message_handlers.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
::: depeche_db.MessageHandlerRegisterProtocol
---
::: depeche_db.MessageHandlerRegister
---
::: depeche_db.MessageHandler
2 changes: 2 additions & 0 deletions docs/api/message_store.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
::: depeche_db.MessageStore
---
::: depeche_db.MessageStoreReader
---
::: depeche_db.MessageSerializer
options:
show_signature_annotations: true
12 changes: 12 additions & 0 deletions docs/api/subscription.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
::: depeche_db.SubscriptionFactory
---
::: depeche_db.Subscription
---
::: depeche_db.CallMiddleware
options:
show_signature_annotations: false
---
::: depeche_db.SubscriptionErrorHandler
options:
show_signature_annotations: true
---
::: depeche_db.ErrorAction
---
::: depeche_db.ExitSubscriptionErrorHandler
---
::: depeche_db.LogAndIgnoreSubscriptionErrorHandler
---
::: depeche_db.SubscriptionMessageHandler
---
::: depeche_db.SubscriptionRunner
---
::: depeche_db.LockProvider
options:
show_signature_annotations: true
---
::: depeche_db.SubscriptionStateProvider
options:
show_signature_annotations: true

0 comments on commit a76da8c

Please sign in to comment.