Skip to content

Commit

Permalink
WiP
Browse files Browse the repository at this point in the history
  • Loading branch information
moser committed Oct 25, 2024
1 parent 7720122 commit 055ff79
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
2 changes: 2 additions & 0 deletions depeche_db/_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ def get_next_messages(self, count: int) -> Iterator[SubscriptionMessage[E]]:
count=count,
):
if isinstance(message, DeletedAggregatedStreamMessage):
# TODO needs to update the state so that we can continue
# if there are more deleted messages in a row than `count`
continue
ack = AckOp(
name=self.name,
Expand Down
9 changes: 6 additions & 3 deletions examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
import sqlalchemy as _sa

from depeche_db import (
MessageProtocol,
MessageStore,
)
from depeche_db.tools import PydanticMessageSerializer


class MyEvent(_pydantic.BaseModel, MessageProtocol):
class MyEvent(_pydantic.BaseModel):
event_id: _uuid.UUID = _pydantic.Field(default_factory=_uuid.uuid4)
num: int
happened_at: _dt.datetime = _pydantic.Field(default_factory=_dt.datetime.utcnow)
Expand Down Expand Up @@ -43,7 +42,7 @@ class EventB(MyEvent):
)

stream = f"stream-{_uuid.uuid4()}"
events = [EventA(num=n + 1) for n in range(3)]
events = [EventA(num=n + 1) for n in range(4)]

result = message_store.write(stream=stream, message=events[0])
print(result)
Expand All @@ -65,3 +64,7 @@ class EventB(MyEvent):
message_store.synchronize(stream=stream, messages=events, expected_version=3)
print("Now we have 4 events")
print([e.message.num for e in message_store.read(stream)])

message_store.delete(stream=stream, keep_versions_greater_than=-2)
print("Now we have only the last 2 events")
print([e.message.num for e in message_store.read(stream)])
11 changes: 11 additions & 0 deletions tests/aggregated_stream/test_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,14 @@ def test_deleted_events_in_stream(db_engine, store_with_events, stream_factory):
),
]
assert type(messages[-1]) == LoadedAggregatedStreamMessage


def test_case():
# TODO:
# * Add 1 message to a stream
# * set up aggregated stream and update it
# * add 2 more messages
# * delete all but the last message
# * update aggregated stream
# -> what happened?!
pass

0 comments on commit 055ff79

Please sign in to comment.