From 055ff79e928f1463f1d43905309bec287d5070c8 Mon Sep 17 00:00:00 2001 From: Martin Vielsmaier Date: Fri, 25 Oct 2024 11:24:55 +0200 Subject: [PATCH] WiP --- depeche_db/_subscription.py | 2 ++ examples/basic.py | 9 ++++++--- tests/aggregated_stream/test_delete.py | 11 +++++++++++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/depeche_db/_subscription.py b/depeche_db/_subscription.py index 616b922..2b75ee5 100644 --- a/depeche_db/_subscription.py +++ b/depeche_db/_subscription.py @@ -223,6 +223,8 @@ def get_next_messages(self, count: int) -> Iterator[SubscriptionMessage[E]]: count=count, ): if isinstance(message, DeletedAggregatedStreamMessage): + # TODO needs to update the state so that we can continue + # if there are more deleted messages in a row than `count` continue ack = AckOp( name=self.name, diff --git a/examples/basic.py b/examples/basic.py index 511215f..50b6617 100644 --- a/examples/basic.py +++ b/examples/basic.py @@ -6,13 +6,12 @@ import sqlalchemy as _sa from depeche_db import ( - MessageProtocol, MessageStore, ) from depeche_db.tools import PydanticMessageSerializer -class MyEvent(_pydantic.BaseModel, MessageProtocol): +class MyEvent(_pydantic.BaseModel): event_id: _uuid.UUID = _pydantic.Field(default_factory=_uuid.uuid4) num: int happened_at: _dt.datetime = _pydantic.Field(default_factory=_dt.datetime.utcnow) @@ -43,7 +42,7 @@ class EventB(MyEvent): ) stream = f"stream-{_uuid.uuid4()}" -events = [EventA(num=n + 1) for n in range(3)] +events = [EventA(num=n + 1) for n in range(4)] result = message_store.write(stream=stream, message=events[0]) print(result) @@ -65,3 +64,7 @@ class EventB(MyEvent): message_store.synchronize(stream=stream, messages=events, expected_version=3) print("Now we have 4 events") print([e.message.num for e in message_store.read(stream)]) + +message_store.delete(stream=stream, keep_versions_greater_than=-2) +print("Now we have only the last 2 events") +print([e.message.num for e in message_store.read(stream)]) diff --git a/tests/aggregated_stream/test_delete.py b/tests/aggregated_stream/test_delete.py index b8bf5af..24f753b 100644 --- a/tests/aggregated_stream/test_delete.py +++ b/tests/aggregated_stream/test_delete.py @@ -18,3 +18,14 @@ def test_deleted_events_in_stream(db_engine, store_with_events, stream_factory): ), ] assert type(messages[-1]) == LoadedAggregatedStreamMessage + + +def test_case(): + # TODO: + # * Add 1 message to a stream + # * set up aggregated stream and update it + # * add 2 more messages + # * delete all but the last message + # * update aggregated stream + # -> what happened?! + pass