Skip to content

Commit

Permalink
Add basic tests for event sourcing.
Browse files Browse the repository at this point in the history
  • Loading branch information
moser committed Oct 26, 2023
1 parent bb09ce0 commit a4fe8fc
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 1 deletion.
3 changes: 2 additions & 1 deletion docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# TBD unreleased
# 0.6.1

* Add small event sourcing lib

# 0.6.0

Expand Down
2 changes: 2 additions & 0 deletions tests/_account_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class AccountCreditedEvent(Event):
AccountEvent = Union[AccountRegisteredEvent, AccountCreditedEvent]


# TODO use pydantic serializer
class AccountEventSerializer(MessageSerializer[AccountEvent]):
def serialize(self, message: AccountEvent) -> dict:
return message.model_dump(mode="json")
Expand All @@ -52,6 +53,7 @@ def deserialize(self, message: dict) -> AccountEvent:
return _pydantic.TypeAdapter(AccountEvent).validate_python(message) # type: ignore


# TODO use event_sourceing.EventSourcedAggregateRoot
class AggregateRoot(Generic[E]):
def __init__(self):
self._events: List[E] = []
Expand Down
Empty file.
109 changes: 109 additions & 0 deletions tests/event_sourcing/test_event_sourcing_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import datetime as _dt
import uuid as _uuid
from typing import Union

import pydantic as _pydantic

from depeche_db import MessageStore
from depeche_db import event_sourcing as _es
from depeche_db.tools import PydanticMessageSerializer


class Event(_pydantic.BaseModel):
id: _uuid.UUID = _pydantic.Field(default_factory=_uuid.uuid4)
created_at: _dt.datetime = _pydantic.Field(default_factory=_dt.datetime.utcnow)

def get_message_id(self) -> _uuid.UUID:
return self.id

def get_message_time(self) -> _dt.datetime:
return self.created_at


class FooCreatedEvent(Event):
foo_id: _uuid.UUID
name: str


class FooRenamedEvent(Event):
foo_id: _uuid.UUID
new_name: str


FooEvent = Union[FooCreatedEvent, FooRenamedEvent]


class Foo(_es.EventSourcedAggregateRoot[_uuid.UUID, FooEvent]):
id: _uuid.UUID
name: str

def get_id(self) -> _uuid.UUID | None:
if hasattr(self, "id"):
return self.id
return None

def _apply(self, event: FooEvent) -> None:
if isinstance(event, FooCreatedEvent):
self.id = event.foo_id
self.name = event.name
elif isinstance(event, FooRenamedEvent):
self.name = event.new_name
else:
raise NotImplementedError(f"Event {type(event)} is not supported")

@property
def version(self) -> int:
return self._version

@classmethod
def create(cls, id: _uuid.UUID, name: str) -> "Foo":
foo = cls()
foo.apply(FooCreatedEvent(foo_id=id, name=name))
return foo

def rename(self, new_name: str) -> None:
self.apply(FooRenamedEvent(foo_id=self.id, new_name=new_name))


def test_event_sourced_aggregate():
foo = Foo.create(_uuid.uuid4(), "foo")
assert foo.version == 1
assert foo.name == "foo"
assert len(foo._events) == 1

foo.rename("bar")
assert foo.version == 2
assert foo.name == "bar"
assert len(foo._events) == 2


class FooRepo(_es.EventStoreRepo[FooEvent, Foo, _uuid.UUID]):
def __init__(self, event_store: MessageStore[FooEvent]):
super().__init__(
event_store=event_store,
constructor=Foo,
stream_prefix="foo",
)


def test_repository(identifier, db_engine):
store = MessageStore[FooEvent](
name=identifier(),
engine=db_engine,
serializer=PydanticMessageSerializer(FooEvent),
)
repo = FooRepo(store)

foo = Foo.create(_uuid.uuid4(), "foo")
position = repo.add(foo)
assert position.version == 1

foo.rename("bar")
position = repo.save(foo, expected_version=1)
assert position.version == 2

stored_messages = list(store.read(f"foo-{foo.id}"))
assert [m.message for m in stored_messages] == foo._events

foo2 = repo.get(foo.id)
assert foo2 == foo

0 comments on commit a4fe8fc

Please sign in to comment.