Skip to content

Commit

Permalink
MAJOR Rewrite (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
bwaidelich committed Aug 9, 2023
1 parent dc92735 commit 5b1b225
Show file tree
Hide file tree
Showing 45 changed files with 844 additions and 745 deletions.
41 changes: 10 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,38 @@ The purpose of this package is to explore the idea, find potential pitfalls and

Dynamic Consistency Boundary (aka DCB) allow to enforce hard constraints in Event-Sourced systems without having to rely on individual Event Streams.
This facilitates focussing on the _behavior_ of the Domain Model rather than on its rigid structure. It also allows for simpler architecture and potential
performance improvements as multiple aggregates can act on the same events without requiring synchronization.
performance improvements as multiple projections can act on the same events without requiring synchronization.

Read all about this interesting approach in the blog post mentioned above or watch Saras talk on [YouTube](https://www.youtube.com/watch?v=DhhxKoOpJe0&t=150s) (Italian with English subtitles).
This package models the example of this presentation (with a few deviations) using the [wwwision/dcb-eventstore](https://github.com/bwaidelich/dcb-eventstore) package and the [wwwision/dcb-eventstore-doctrine](https://github.com/bwaidelich/dcb-eventstore-doctrine) database adapter.

### Important Classes / Concepts

* [Commands](src/Command) are just a concept of this example package. They implement the [Command Marker Interface](src/Command/Command.php)
* [Commands](src%2FCommands) are just a concept of this example package. They implement the [Command Marker Interface](src%2FCommands%2FCommand.php)
* The [CommandHandler](src/CommandHandler.php) is the central authority, handling and verifying incoming Commands
* ...it uses the [AggregateLoader](https://github.com/bwaidelich/dcb-eventstore/blob/main/src/Aggregate/AggregateLoader.php) to interact with all involved Aggregates¹
* The [Aggregates](src/Model/Aggregate) are surprisingly small because they focus on a single responsibility (e.g. instead of a "CourseAggregate" there are three aggregates [CourseExistenceAggregate](src/Model/Aggregate/CourseExistenceAggregate.php), [CourseTitleAggregate](src/Model/Aggregate/CourseTitleAggregate.php) and [CourseCapacityAggregate](src/Model/Aggregate/CourseCapacityAggregate))
* ...Aggregates record [Events](src/Event) that are serialized with the [EventNormalizer](src/Event/Normalizer/EventNormalizer.php)
* This package contains no Read Model (e.g. projections) yet
* ...it uses in-memory [Projections](src%2FProjections%2FProjection.php) to enforce hard constraints
* The [Projections](src%2FProjections%2FProjection.php) are surprisingly small because they focus on a single responsibility (e.g. instead of a "CourseAggregate" there are three projections [CourseExistenceProjection.php](src%2FProjections%2FCourseExistenceProjection.php), [CourseTitleProjection](src%2FProjections%2FCourseTitleProjection.php) and [CourseCapacityProjection.php](src%2FProjections%2FCourseCapacityProjection.php))
* The [EventAppender](src%2FEventAppender.php) allows for easy publishing of [Events](src%2FEvents) that are serialized with the [EventNormalizer](src%2FEventNormalizer.php)
* This package contains no Read Model (i.e. classic projections) yet

### Considerations / Findings

I always had the feeling, that the focus on Event Streams is a distraction to Domain-driven design. So I was very happy to come across this concept.
So far I didn't have the chance to test it in a real world scenario, but it makes a lot of sense to me and IMO this example shows, that the approach
really works out in practice (in spite of some minor caveats in the current implementation¹).

#### Some further thoughts

* The signature of the [EventStore::append()](https://github.com/bwaidelich/dcb-eventstore/blob/main/src/EventStore.php#L36) method, is still a bit cumbersome and implicit
* A `$lastEventId` parameter of `null` has a special meaning (Maybe a union type would be better suited here)
* Instead of working with Event *IDs* it might make sense to use the global "sequence number" instead (in this implementation I have to expose that anyways) as that's easier to work with and to debug (as it gives expected vs actual value a clear order)
* The [StreamQuery](https://github.com/bwaidelich/dcb-eventstore/blob/main/src/Model/StreamQuery.php) has too many states because Domain Ids and Event Types can either be:
* a) `null` => fallback matching all events
* b) A set of values, matching events with at least one overlap
* c) "none" => an empty set matching no events (required for the initial event and for tests)
* I don't like that Aggregates have to specify the event types explicitly ([example](https://github.com/bwaidelich/dcb-example/blob/main/src/Model/Aggregate/StudentSubscriptionsAggregate.php#L73)) – This is a potential source of bugs
* Maybe the "projection" logic can be reworked such that affected event types can be extracted from it
* A lot of complexity comes from having to reconstitute multiple Aggregates at once¹
* Maybe it makes more sense to have nested Aggregates, i.e. the [StudentSubscriptionsAggregate](https://github.com/bwaidelich/dcb-example/blob/main/src/Model/Aggregate/StudentSubscriptionsAggregate.php) could create the depending aggregates ([CourseExistenceAggregate](https://github.com/bwaidelich/dcb-example/blob/main/src/Model/Aggregate/CourseExistenceAggregate.php), [StudentExistenceAggregate](https://github.com/bwaidelich/dcb-example/blob/main/src/Model/Aggregate/StudentExistenceAggregate.php) and [CourseCapacityAggregate](https://github.com/bwaidelich/dcb-example/blob/main/src/Model/Aggregate/CourseCapacityAggregate.php) and do the composition that the [AggregateLoader](https://github.com/bwaidelich/dcb-eventstore/blob/main/src/Aggregate/AggregateLoader.php) does currently...
really works out in practice (in spite of some minor caveats in the current implementation).

## Usage

Install via [composer](https://getcomposer.org):

```shell
composer create-project wwwision/dcb-example
composer create-project wwwision/dcb-example-courses
```

Now you should be able to run the [example script](index.php) via

```shell
php dcb-example/index.php
php dcb-example-courses/index.php
```

And you should get ...no output at all. That's because the example script currently satisfy all constraints.
Expand Down Expand Up @@ -89,11 +75,4 @@ Most of the implementation of these packages are based on the great groundwork d
## Contributions

I'm really curious to get feedback on this one.
Feel free to start/join a [discussion](https://github.com/bwaidelich/dcb-example/discussions), [issues](https://github.com/bwaidelich/dcb-example/issues) or [Pull requests](https://github.com/bwaidelich/dcb-example/pulls).

-----

¹ The purpose of the [AggregateLoader](https://github.com/bwaidelich/dcb-eventstore/blob/main/src/Aggregate/AggregateLoader.php)
is to allow interaction with multiple Aggregates without having to fetch multiple Event Streams.
It is currently one of the weakest links in this implementation because it adds some hidden complexity – I hope, that I
can rework this at some point
Feel free to start/join a [discussion](https://github.com/bwaidelich/dcb-example/discussions), [issues](https://github.com/bwaidelich/dcb-example/issues) or [Pull requests](https://github.com/bwaidelich/dcb-example/pulls).
4 changes: 3 additions & 1 deletion behat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ default:
suites:
default:
paths: ['%paths.base%/tests/Behat']
contexts: [Wwwision\DCBExample\Tests\Behat\Bootstrap\FeatureContext]
contexts:
- Wwwision\DCBExample\Tests\Behat\Bootstrap\FeatureContext:
eventStoreDsn: 'pdo-pgsql://bwaidelich@127.0.0.1:5432/dcb'
formatters:
pretty: false
progress: true
215 changes: 215 additions & 0 deletions bench.php.bkp
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
<?php
declare(strict_types=1);

use Doctrine\DBAL\DriverManager;
use Spatie\Async\Pool;
use Wwwision\DCBEventStore\Exceptions\ConditionalAppendFailed;
use Wwwision\DCBExample\Exception\ConstraintException;
use Wwwision\DCBEventStore\Types\DomainIds;
use Wwwision\DCBEventStore\Types\EventData;
use Wwwision\DCBEventStore\Types\EventEnvelope;
use Wwwision\DCBEventStore\Types\EventId;
use Wwwision\DCBEventStore\Types\Events;
use Wwwision\DCBEventStore\Types\EventType;
use Wwwision\DCBEventStore\Types\EventTypes;
use Wwwision\DCBEventStore\Types\StreamQuery\StreamQuery;
use Wwwision\DCBEventStoreDoctrine\DoctrineEventStore;
use Wwwision\DCBExample\Commands\Command;
use Wwwision\DCBExample\Commands\CreateCourse;
use Wwwision\DCBExample\Commands\RegisterStudent;
use Wwwision\DCBExample\Commands\RenameCourse;
use Wwwision\DCBExample\Commands\SubscribeStudentToCourse;
use Wwwision\DCBExample\Commands\UnsubscribeStudentFromCourse;
use Wwwision\DCBExample\Commands\UpdateCourseCapacity;
use Wwwision\DCBExample\CommandHandler;
use Wwwision\DCBExample\Model\CourseCapacity;
use Wwwision\DCBExample\Model\CourseId;
use Wwwision\DCBExample\Model\CourseTitle;
use Wwwision\DCBExample\Model\StudentId;

require __DIR__ . '/vendor/autoload.php';

$dsn = 'pdo-mysql://root:NwR2pEdKK@.UoUtHebFgH!br@127.0.0.1:3306/test?charset=utf8mb4';
//$dsn = 'pdo-pgsql://bwaidelich@127.0.0.1:5432/dcb';
//$dsn = 'pdo-sqlite:///events.sqlite';
$connection = DriverManager::getConnection(['url' => $dsn]);
$eventStore = DoctrineEventStore::create($connection, 'dcb_events');
$eventStore->setup();
echo 'resetting db...' . PHP_EOL;
$connection->executeStatement('TRUNCATE TABLE dcb_events');
//$connection->executeStatement('TRUNCATE TABLE dcb_events RESTART IDENTITY');
//$connection->executeStatement('DELETE FROM dcb_events');
echo 'done' . PHP_EOL;

//$eventStore->append(Events::single(EventId::create(), EventType::fromString('SomeEvent'), EventData::fromString('test'), DomainIds::single('foo', 'bar')), StreamQuery::matchingIdsAndTypes(DomainIds::single('foo', 'bar'), EventTypes::single('SomeType')), null);
////exit;
////
//foreach ($eventStore->stream(StreamQuery::matchingAny()) as $eventEnvelope) {
// var_dump($eventEnvelope);
//}
//echo PHP_EOL . 'done' . PHP_EOL;
//exit;
//

echo 'appending events...' . PHP_EOL;

$process = function () use ($dsn) {

$rand = static fn (int $percentage) => random_int(1, 100) < $percentage;
$connection = DriverManager::getConnection(['url' => $dsn]);
$eventStore = DoctrineEventStore::create($connection, 'dcb_events');
/** @var {@see CommandHandler} is the central authority to handle {@see Command}s */
$commandHandler = new CommandHandler($eventStore);
$constraintViolations = 0;

$handle = function (Command $command) use ($commandHandler, &$constraintViolations) {
//echo $command::class . PHP_EOL;
try {
$commandHandler->handle($command);
} catch (ConditionalAppendFailed|ConstraintException $e) {
$constraintViolations ++;
//echo 'CONSTRAINT EXCEPTION: ' . $e->getMessage() . PHP_EOL;
}
};

for ($i = 0; $i < 100; $i++) {

$courseId = CourseId::fromString('c' . random_int(1, 5));
$studentId = StudentId::fromString('s' . random_int(1, 5));
$capacity = CourseCapacity::fromInteger(random_int(1, 10));


if ($rand(80)) {
$handle(new CreateCourse($courseId, $capacity, CourseTitle::fromString((string)getmypid())));
}
if ($rand(40)) {
$handle(new RenameCourse($courseId, CourseTitle::fromString('Course renamed ' . md5(random_bytes(5)))));
}
if ($rand(80)) {
$handle(new RegisterStudent($studentId));
}

if ($rand(50)) {
$handle(new SubscribeStudentToCourse($courseId, $studentId));
}

if ($rand(20)) {
$handle(new UpdateCourseCapacity($courseId, $capacity));
}

if ($rand(10)) {
$handle(new UnsubscribeStudentFromCourse($courseId, $studentId));
}

//usleep(random_int(100, 10000));
}
return ['constraintViolations' => $constraintViolations];
};
//
//$process();
//die('DONE');

$pool = Pool::create();
for ($i = 0; $i < 20; $i ++) {
$pool->add($process)->then(function ($output) {
//echo 'OUTPUT:' . PHP_EOL;
//var_dump($output);
})->catch(function ($exception) {
echo 'EXCEPTION:' . PHP_EOL;
var_dump($exception);
})->timeout(function () {
echo 'TIMEOUT' . PHP_EOL;
});
}
$pool->wait();

echo 'done' . PHP_EOL;
echo 'checking inconsistencies...' . PHP_EOL;

$courses = [];
$students = [];

function fail(EventEnvelope $eventEnvelope, string $message, ...$args) {
throw new RuntimeException(sprintf('ERROR at sequence number ' . $eventEnvelope->sequenceNumber->value . ': ' . $message, ...$args));
}

$numberOfEvents = 0;

foreach ($eventStore->streamAll() as $eventEnvelope) {
$payload = json_decode($eventEnvelope->event->data->value, true, 512, JSON_THROW_ON_ERROR);
$courseId = $payload['courseId'] ?? null;
$studentId = $payload['studentId'] ?? null;
$numberOfEvents ++;

switch ($eventEnvelope->event->type->value) {
case 'CourseCreated':
if (isset($courses[$courseId])) {
fail($eventEnvelope, 'Course "%s" already exists', $courseId);
}
$courses[$courseId] = [
'title' => $payload['courseTitle'],
'capacity' => $payload['initialCapacity'],
'subscriptions' => 0,
];
break;
case 'CourseRenamed':
if (!isset($courses[$courseId])) {
fail($eventEnvelope, 'Course "%s" does not exist', $courseId);
}
if ($courses[$courseId]['title'] === $payload['newCourseTitle']) {
fail($eventEnvelope, 'Course title of "%s" did not change', $courseId);
}
$courses[$courseId]['title'] = $payload['newCourseTitle'];
break;
case 'StudentRegistered':
if (isset($students[$studentId])) {
fail($eventEnvelope, 'Student "%s" already exists', $studentId);
}
$students[$studentId] = [
'courses' => []
];
break;
case 'StudentSubscribedToCourse':
if (!isset($courses[$courseId])) {
fail($eventEnvelope, 'Course "%s" does not exist', $courseId);
}
if (!isset($students[$studentId])) {
fail($eventEnvelope, 'Student "%s" does not exist', $studentId);
}
if (in_array($courseId, $students[$studentId]['courses'], true)) {
fail($eventEnvelope, 'Student "%s" already subscribed to course "%s"', $studentId, $courseId);
}
if ($courses[$courseId]['subscriptions'] >= $courses[$courseId]['capacity']) {
fail($eventEnvelope, 'Course "%s" capacity exceeded', $courseId);
}
$courses[$courseId]['subscriptions'] ++;
$students[$studentId]['courses'] = [...$students[$studentId]['courses'], $courseId];
break;
case 'CourseCapacityChanged':
if (!isset($courses[$courseId])) {
fail($eventEnvelope, 'Course "%s" does not exist', $courseId);
}
if ($courses[$courseId]['subscriptions'] > $payload['newCapacity']) {
fail($eventEnvelope, 'Course "%s" capacity cannot be changed because it already has more subscriptions', $courseId);
}
$courses[$courseId]['capacity'] = $payload['newCapacity'];
break;

case 'StudentUnsubscribedFromCourse':
if (!isset($courses[$courseId])) {
fail($eventEnvelope, 'Course "%s" does not exist', $courseId);
}
if (!isset($students[$studentId])) {
fail($eventEnvelope, 'Student "%s" does not exist', $studentId);
}
if (!in_array($courseId, $students[$studentId]['courses'], true)) {
fail($eventEnvelope, 'Student "%s" is not subscribed to course "%s"', $studentId, $courseId);
}
$courses[$courseId]['subscriptions'] --;
$students[$studentId]['courses'] = array_filter($students[$studentId]['courses'], static fn($c) => $c !== $courseId);
break;
default:
fail($eventEnvelope, 'Unexpected event type "%s"', $eventEnvelope->event->type->value);
}
}
printf('Checked %d events', $numberOfEvents);
11 changes: 4 additions & 7 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "wwwision/dcb-example",
"name": "wwwision/dcb-example-courses",
"description": "Simple example for the Dynamic Consistency Boundary pattern described by Sara Pellegrini",
"type": "project",
"license": "MIT",
Expand Down Expand Up @@ -28,15 +28,14 @@
"php": ">=8.2",
"ramsey/uuid": "^4.7",
"webmozart/assert": "^1.11",
"wwwision/dcb-eventstore": "^2.0",
"wwwision/dcb-eventstore-doctrine": "^2.0",
"spatie/async": "^1.5"
"wwwision/dcb-eventstore": "^2",
"wwwision/dcb-eventstore-doctrine": "^2"
},
"require-dev": {
"roave/security-advisories": "dev-latest",
"phpstan/phpstan": "^1.10",
"squizlabs/php_codesniffer": "^4.0.x-dev",
"phpunit/phpunit": "^10.1",
"phpunit/phpunit": "^10.2",
"behat/behat": "^3.13"
},
"autoload": {
Expand All @@ -53,12 +52,10 @@
"test-phpstan": "phpstan",
"test-cs": "phpcs --colors --standard=PSR12 --exclude=Generic.Files.LineLength src",
"test-cs:fix": "phpcbf --colors --standard=PSR12 --exclude=Generic.Files.LineLength src",
"test-unit": "phpunit tests",
"test-behat": "behat",
"test": [
"@test-phpstan",
"@test-cs",
"@test-unit",
"@test-behat"
]
}
Expand Down
26 changes: 13 additions & 13 deletions index.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
declare(strict_types=1);

use Doctrine\DBAL\DriverManager;
use Wwwision\DCBEventStore\EventStore;
use Wwwision\DCBEventStoreDoctrine\DoctrineEventStore;
use Wwwision\DCBExample\Command\Command;
use Wwwision\DCBExample\Command\CreateCourse;
use Wwwision\DCBExample\Command\RegisterStudent;
use Wwwision\DCBExample\Command\RenameCourse;
use Wwwision\DCBExample\Command\SubscribeStudentToCourse;
use Wwwision\DCBExample\Command\UnsubscribeStudentFromCourse;
use Wwwision\DCBExample\Command\UpdateCourseCapacity;
use Wwwision\DCBExample\CommandHandler;
use Wwwision\DCBExample\Model\CourseCapacity;
use Wwwision\DCBExample\Model\CourseId;
use Wwwision\DCBExample\Model\CourseTitle;
use Wwwision\DCBExample\Model\StudentId;
use Wwwision\DCBEventStore\EventStore;
use Wwwision\DCBExample\Commands\Command;
use Wwwision\DCBExample\Commands\CreateCourse;
use Wwwision\DCBExample\Commands\RegisterStudent;
use Wwwision\DCBExample\Commands\RenameCourse;
use Wwwision\DCBExample\Commands\SubscribeStudentToCourse;
use Wwwision\DCBExample\Commands\UnsubscribeStudentFromCourse;
use Wwwision\DCBExample\Commands\UpdateCourseCapacity;
use Wwwision\DCBExample\Types\CourseCapacity;
use Wwwision\DCBExample\Types\CourseId;
use Wwwision\DCBExample\Types\CourseTitle;
use Wwwision\DCBExample\Types\StudentId;

require __DIR__ . '/vendor/autoload.php';

Expand All @@ -25,7 +25,7 @@
/** The second parameter is the table name to store the events in **/
$eventStore = DoctrineEventStore::create($connection, 'dcb_events');

/** The {@see EventStore::setup() method is used to make sure that the Event Store backend is set up (i.e. required tables are created and their schema up-to-date) **/
/** The {@see EventStore::setup()} method is used to make sure that the Events Store backend is set up (i.e. required tables are created and their schema up-to-date) **/
$eventStore->setup();

/** @var {@see CommandHandler} is the central authority to handle {@see Command}s */
Expand Down
Loading

0 comments on commit 5b1b225

Please sign in to comment.