Skip to content

Commit

Permalink
Aggregated stream read returns datastructure.
Browse files Browse the repository at this point in the history
  • Loading branch information
moser committed Oct 10, 2023
1 parent 3ae7103 commit b82cb39
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 16 deletions.
12 changes: 8 additions & 4 deletions depeche_db/_aggregated_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sqlalchemy_utils import UUIDType as _UUIDType

from ._interfaces import (
AggregatedStreamMessage,
MessagePartitioner,
MessageProtocol,
StreamPartitionStatistic,
Expand Down Expand Up @@ -122,14 +123,17 @@ def add(
)
)

def read(self, conn: _sa.Connection, partition: int) -> Iterator[_uuid.UUID]:
def read(
self, conn: _sa.Connection, partition: int
) -> Iterator[AggregatedStreamMessage]:
for row in conn.execute(
_sa.select(self._table.c.message_id)
_sa.select(self._table.c.message_id, self._table.c.position)
.where(self._table.c.partition == partition)
.order_by(self._table.c.position)
):
# TODO result object that has message_id, position and possibly partition?!
yield row.message_id
yield AggregatedStreamMessage(
message_id=row.message_id, position=row.position, partition=partition
)

@_contextlib.contextmanager
def _connection(self):
Expand Down
7 changes: 7 additions & 0 deletions depeche_db/_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ class StreamPartitionStatistic:
next_message_occurred_at: _dt.datetime


@_dc.dataclass
class AggregatedStreamMessage:
partition: int
position: int
message_id: _uuid.UUID


@_dc.dataclass
class SubscriptionState:
positions: dict[int, int]
Expand Down
6 changes: 3 additions & 3 deletions examples/docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ def get_partition(self, message: StoredMessage[EventA | EventB]) -> int:
)
link_stream.projector.update_full()

first_id = next(link_stream.read(conn=db_engine.connect(), partition=0))
print(first_id)
first_on_partition0 = next(link_stream.read(conn=db_engine.connect(), partition=0))
print(first_on_partition0.message_id)
# 4680cbaf-977e-43a4-afcb-f88e92043e9c (this is the message ID of the first message in partition 0)

with message_store.reader() as reader:
print(reader.get_message_by_id(first_id))
print(reader.get_message_by_id(first_on_partition0.message_id))
# StoredMessage(
# message_id=UUID("4680cbaf-977e-43a4-afcb-f88e92043e9c"),
# stream="aggregate-me-0",
Expand Down
9 changes: 4 additions & 5 deletions tests/aggregated_stream/test_aggregated_stream.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from depeche_db import (
StreamPartitionStatistic,
)
from depeche_db import StreamPartitionStatistic


def test_stream_statisitics(db_engine, store_with_events, stream_factory):
Expand All @@ -9,12 +7,13 @@ def test_stream_statisitics(db_engine, store_with_events, stream_factory):
subject.projector.update_full()

with db_engine.connect() as conn:
assert list(subject.read(conn, partition=1)) == [
assert [msg.message_id for msg in subject.read(conn, partition=1)] == [
evt.event_id for evt in account.events
]
assert list(subject.read(conn, partition=2)) == [
assert [msg.message_id for msg in subject.read(conn, partition=2)] == [
evt.event_id for evt in account2.events
]

assert list(
subject.get_partition_statistics(position_limits={1: 1}, result_limit=1)
)[0] == StreamPartitionStatistic(
Expand Down
8 changes: 4 additions & 4 deletions tests/aggregated_stream/test_stream_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ def test_stream_projector_locking(db_engine, store_factory, stream_factory):

def assert_stream_projection(stream, db_engine, account, account2):
with db_engine.connect() as conn:
assert set(stream.read(conn, partition=1)).union(
set(stream.read(conn, partition=2))
assert {msg.message_id for msg in stream.read(conn, partition=1)}.union(
{msg.message_id for msg in stream.read(conn, partition=2)}
) == {event.event_id for event in account.events + account2.events}
assert list(stream.read(conn, partition=1)) == [
assert [msg.message_id for msg in stream.read(conn, partition=1)] == [
event.event_id for event in account.events
]
assert list(stream.read(conn, partition=2)) == [
assert [msg.message_id for msg in stream.read(conn, partition=2)] == [
event.event_id for event in account2.events
]

0 comments on commit b82cb39

Please sign in to comment.