Skip to content

Commit

Permalink
Add SQLAlchemy Event Store, Update Invite flow, Value Object
Browse files Browse the repository at this point in the history
This commit includes an example of a "Value Object" named EmailAddress.
It represents a valid email address. Consumers of this object can expect that
the object is valid according to the data it contains such that the consumer
does not have to check if the email is valid.

An implementation of a SQLAlchemy-backed Event Store was also added along with a
unit test using a SQLite in-memory.

The Invite flow spec and code has been updated to include the idea of an invite
that is pending to be sent.
  • Loading branch information
paulespinosa committed Oct 22, 2024
1 parent 4116362 commit 25b24ad
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 95 deletions.
24 changes: 12 additions & 12 deletions backend/app/core/event_store.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from .interfaces import Identity, DomainEvent
from abc import abstractmethod
from typing import Any, Protocol
from dataclasses import dataclass
from datetime import datetime, timezone
import importlib
import json
from typing import Any, Protocol

from .interfaces import Identity, DomainEvent

class AppendOnlyStoreConcurrencyException(Exception):
pass
Expand Down Expand Up @@ -33,37 +33,37 @@ class DomainEventStream:
class InMemoryEventStore:

@dataclass
class EventStoreRow:
class EventStreamEntry:
stream_id: str
stream_version: int
event_data: str
meta_data: dict[str, Any]
stored_at: datetime

def __init__(self):
self.events: dict[int, list[self.EventStoreRow]] = {}
self.events: dict[int, list[self.EventStreamEntry]] = {}

def fetch(self, stream_id: Identity) -> DomainEventStream:
stream = DomainEventStream(version=0, events=[])

for row in self.events.get(stream_id, []):
stream.version = row.stream_version
for stream_entry in self.events.get(stream_id, []):
stream.version = stream_entry.stream_version
stream.events.append(
self._deserialize_event(json.loads(row.event_data)))
self._deserialize_event(json.loads(stream_entry.event_data)))

return stream

def append(self, stream_id: Identity, new_events: list[DomainEvent],
expected_version: int):
rows = self.events.get(stream_id, [])
stream_entries = self.events.get(stream_id, [])

version = len(rows)
version = len(stream_entries)
if version != expected_version:
raise AppendOnlyStoreConcurrencyException(
f"version={version}, expected={expected_version}")

rows.extend([
self.EventStoreRow(
stream_entries.extend([
self.EventStreamEntry(
stream_id=str(stream_id),
stream_version=version + inc,
event_data=json.dumps(e.to_dict(), default=str),
Expand All @@ -72,7 +72,7 @@ def append(self, stream_id: Identity, new_events: list[DomainEvent],
) for inc, e in enumerate(new_events, start=1)
])

self.events[stream_id] = rows
self.events[stream_id] = stream_entries

def _deserialize_event(self, event_data):
"""Convert a dictionary back to the correct event class."""
Expand Down
117 changes: 117 additions & 0 deletions backend/app/core/sa_event_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""This module implements a SQLAlchemy-backed Event Store."""
from datetime import datetime, timezone
import importlib
import uuid

from sqlalchemy import String, Integer, DateTime, JSON, func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, mapped_column, Mapped

from app.core.db import Base
from app.core.interfaces import Identity
from .event_store import (AppendOnlyStoreConcurrencyException, DomainEvent,
DomainEventStream)


class EventStreamEntry(Base):
"""SQLAlchemy model representing a row entry in the event_streams table."""

__tablename__ = 'event_streams'

# Primary key: composite (stream_id, stream_version)
stream_id: Mapped[str] = mapped_column(String(36),
primary_key=True,
default=lambda: str(uuid.uuid4()))
stream_version: Mapped[int] = mapped_column(Integer, primary_key=True)

# Event data and meta data columns
event_data: Mapped[dict] = mapped_column(JSON, nullable=False)
meta_data: Mapped[dict] = mapped_column(JSON, nullable=True)

# Timestamp of when the event was stored
stored_at: Mapped[datetime] = mapped_column(DateTime(timezone=True),
nullable=False,
default=func.now)

def __repr__(self):
"""Representation of this object as a string."""
return (
f"<EventStreamEntry(stream_id={self.stream_id}, stream_version={self.stream_version}, "
f"stored_at={self.stored_at})>")


class SqlAlchemyEventStore:
"""Implementation of an Event Store backed by SQLAlchemy."""

def __init__(self, session: Session):
"""Instantiate the Event Store using a SQLAlchemy session."""
if session is None:
raise ValueError("A Session is required to construct this Event Store.")

self.session = session

def fetch(self, stream_id: Identity) -> DomainEventStream:
"""Fetch the event stream for the given stream."""
stream = DomainEventStream(version=0, events=[])

stream_entries = self.session.execute(
select(EventStreamEntry.stream_version,
EventStreamEntry.event_data)).all()

for stream_version, event_data in stream_entries:
stream.version = stream_version
stream.events.append(
self._deserialize_event(event_data))

return stream

def append(self, stream_id: Identity, new_events: list[DomainEvent],
expected_version: int):
"""Append list of events for the given stream.
An AppendOnlyStoreConcurrencyException is raised when the given
expected version is not the last version found in the database for
the given stream. This means that another process has already
updated the stream's events.
"""
statement = select(func.max(
EventStreamEntry.stream_version)).filter_by(
stream_id=str(stream_id))
version = self.session.scalars(statement).one_or_none()

if version is None:
version = 0
if version != expected_version:
raise AppendOnlyStoreConcurrencyException(
f"version={version}, expected={expected_version}, stream_id={stream_id}"
)

stream_entries = [
EventStreamEntry(
stream_id=str(stream_id),
stream_version=version + inc,
event_data=e.to_dict(),
meta_data={},
stored_at=datetime.now(tz=timezone.utc),
) for inc, e in enumerate(new_events, start=1)
]

self.session.add_all(stream_entries)
try:
self.session.commit()
except IntegrityError:
self.session.rollback()
raise ValueError(
"Failed to append events due to database integrity error (likely a version conflict)."
)

def _deserialize_event(self, event_data):
"""Convert a dictionary back to the correct event class."""
fully_qualified_type = event_data["type"]
module_name, class_name = fully_qualified_type.rsplit(".", 1)

# Dynamically import the module and get the class
module = importlib.import_module(module_name)
event_class = getattr(module, class_name)

return event_class.from_dict(event_data["data"])
26 changes: 19 additions & 7 deletions backend/app/modules/access/invite/contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,44 @@ class SendInviteCommand(DomainCommand):
full_name: str
email: str
invitee_role: UserRoleEnum
inviter_id: UserId
inviter_id: str
inviter_role: UserRoleEnum
sent_at: datetime


@dataclass
class InviteSentDomainEvent(DomainEvent):
class SendInviteRequestedDomainEvent(DomainEvent):
"""An Invite domain event."""

full_name: str
email: str
full_name: str
invitee_role: UserRoleEnum
inviter_id: UserId
inviter_id: str
inviter_role: UserRoleEnum
sent_at: datetime
expire_at: datetime

@classmethod
def from_dict(cls, data: dict[str, Any]):
"""Deserialize from dict to the correct event class."""
sent_at = datetime.fromisoformat(data['sent_at'])
expire_at = datetime.fromisoformat(data['expire_at'])
return cls(full_name=data['full_name'],
email=data['email'],
invitee_role=data['invitee_role'],
inviter_id=data['inviter_id'],
inviter_role=data['inviter_role']
sent_at=sent_at)
inviter_role=data['inviter_role'],
sent_at=sent_at,
expire_at=expire_at)


@dataclass
class InviteSentDomainEvent(DomainEvent):
"""An Invite domain event."""

email: str
full_name: str
expire_at: datetime


@dataclass
Expand All @@ -71,7 +83,7 @@ class InviteAlreadySentException(Exception):
pass


class UninvitedException(Exception):
class NotInvitedException(Exception):
"""An invite was accepted without an invitation."""

pass
55 changes: 37 additions & 18 deletions backend/app/modules/access/invite/invite_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from ..schemas import UserRoleEnum
from .contracts import (
InviteId,
SendInviteRequestedDomainEvent,
InviteSentDomainEvent,
InviteAcceptedDomainEvent,
InviteAlreadySentException,
UninvitedException,
NotInvitedException,
)

from collections.abc import Callable
Expand All @@ -19,11 +20,13 @@ class InviteState:
email: InviteId = None
full_name: str = None
invitee_role: UserRoleEnum = None
inviter_id: UserId = None
inviter_id: str = None
inviter_role: UserRoleEnum = None
sent_at: datetime = None
expire_at: datetime = None
accepted_at: datetime = None
pending_send_invite: bool = False
invited: bool = False

def __init__(self, domain_events: list[DomainEvent]):
"""Initialize state from given events."""
Expand All @@ -34,13 +37,20 @@ def mutate(self, domain_event: DomainEvent):
"""Update the state based on the domain event."""
getattr(self, 'when_' + domain_event.__class__.__name__)(domain_event)

def when_InviteSentDomainEvent(self, event: InviteSentDomainEvent):
def when_SendInviteRequestedDomainEvent(
self, event: SendInviteRequestedDomainEvent):
"""Update the state of an Invite."""
self.full_name = event.full_name
self.email = event.email
self.full_name = event.full_name
self.invitee_role = event.invitee_role
self.sent_by = event.sent_by
self.inviter_id = event.inviter_id
self.inviter_role = event.inviter_role
self.sent_at = event.sent_at

self.pending_send_invite = True

def when_InviteSentDomainEvent(self, event: InviteSentDomainEvent):
"""Update the state of an Invite."""
self.invited = True

def when_InviteAcceptedDomainEvent(self, event: InviteAcceptedDomainEvent):
Expand All @@ -65,26 +75,35 @@ def changes(self):
"""See a view into the events that cause state changes."""
return self._changes

def send_invite(self, full_name: str, email: str, invitee_role: str,
sent_by: str, sent_at: datetime, token_gen: Callable[[str],
str]):
def send_invite(self, full_name: str, email: str,
invitee_role: UserRoleEnum, inviter_id: str,
inviter_role: UserRoleEnum, sent_at: datetime,
expire_policy: Callable[[datetime], datetime]):
"""Send an invite to the given recipient."""
if self._state.invited:
if self._state.pending_send_invite or self._state.invited:
raise InviteAlreadySentException(email)

e = InviteSentDomainEvent(full_name=full_name,
email=email,
invitee_role=invitee_role,
sent_by=sent_by,
sent_at=sent_at,
token=token_gen(email))
e = SendInviteRequestedDomainEvent(email=email,
full_name=full_name,
invitee_role=invitee_role,
inviter_id=inviter_id,
inviter_role=inviter_role,
sent_at=sent_at,
expire_at=expire_policy(sent_at))
self._apply(e)

def process_sent_invite(self, email: str):
"""Process a sent invite."""
e = InviteSentDomainEvent(email, self._state.full_name,
self._state.expire_at)

self._apply(e)

def accept_invite(self, email: str, token: str):
def accept_invite(self, email: str, accepted_at: datetime):
"""Accept an invite."""
if not self._state.invited:
raise UninvitedException(f"{email} was not invited.")
raise NotInvitedException(f"{email} was not invited.")

e = InviteAcceptedDomainEvent(email=email, token=token)
e = InviteAcceptedDomainEvent(email=email, accepted_at=accepted_at)

self._apply(e)
Loading

0 comments on commit 25b24ad

Please sign in to comment.