Skip to content

Commit

Permalink
TASK: Remove EventTypesCriterion and TagsCriterion
Browse files Browse the repository at this point in the history
They are now both integrated to `EventTypesAndTagsCriterion`
  • Loading branch information
bwaidelich committed Jul 24, 2024
1 parent 98dcc64 commit 1615cbc
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 186 deletions.
83 changes: 48 additions & 35 deletions src/Helpers/InMemoryEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
use Wwwision\DCBEventStore\Types\AppendCondition;
use Wwwision\DCBEventStore\Types\Event;
use Wwwision\DCBEventStore\Types\EventEnvelope;
use Wwwision\DCBEventStore\Types\EventEnvelopes;
use Wwwision\DCBEventStore\Types\Events;
use Wwwision\DCBEventStore\Types\ReadOptions;
use Wwwision\DCBEventStore\Types\SequenceNumber;
use Wwwision\DCBEventStore\Types\StreamQuery\Criteria\EventTypesAndTagsCriterion;
use Wwwision\DCBEventStore\Types\StreamQuery\Criteria\EventTypesCriterion;
use Wwwision\DCBEventStore\Types\StreamQuery\Criteria\TagsCriterion;
use Wwwision\DCBEventStore\Types\StreamQuery\Criterion;
use Wwwision\DCBEventStore\Types\StreamQuery\CriterionHashes;
use Wwwision\DCBEventStore\Types\StreamQuery\StreamQuery;
Expand All @@ -36,13 +35,11 @@
*/
final class InMemoryEventStore implements EventStore
{
/**
* @var array<array{sequenceNumber:int,recordedAt:DateTimeImmutable,event:Event}>
*/
private array $events = [];
private EventEnvelopes $eventEnvelopes;

private function __construct()
{
$this->eventEnvelopes = EventEnvelopes::none();
}

public static function create(): self
Expand All @@ -52,31 +49,44 @@ public static function create(): self

public function read(StreamQuery $query, ?ReadOptions $options = null): InMemoryEventStream
{
$options ??= ReadOptions::create();
$matchingCriterionHashesBySequenceNumber = [];
$eventEnvelopes = $this->eventEnvelopes;
foreach ($query->criteria as $criterion) {
$onlyLastEvent = $criterion instanceof EventTypesAndTagsCriterion && $criterion->onlyLastEvent;
if ($onlyLastEvent) {
$eventEnvelopes = EventEnvelopes::fromArray(array_reverse(iterator_to_array($eventEnvelopes)));
}
foreach ($eventEnvelopes as $eventEnvelope) {
if (!self::criterionMatchesEvent($criterion, $eventEnvelope->event)) {
continue;
}
$sequenceNumber = $eventEnvelope->sequenceNumber->value;
if (!array_key_exists($sequenceNumber, $matchingCriterionHashesBySequenceNumber)) {
$matchingCriterionHashesBySequenceNumber[$sequenceNumber] = [];
}
$matchingCriterionHashesBySequenceNumber[$sequenceNumber][] = $criterion->hash();
if ($onlyLastEvent) {
continue 2;
}
}
}

$matchingEventEnvelopes = [];
$events = $this->events;
$eventEnvelopes = $this->eventEnvelopes;
$options ??= ReadOptions::create();
if ($options->backwards) {
$events = array_reverse($events);
$eventEnvelopes = EventEnvelopes::fromArray(array_reverse(iterator_to_array($eventEnvelopes)));
}
foreach ($events as $event) {
if ($options->from !== null && (($options->backwards && $event['sequenceNumber'] > $options->from->value) || (!$options->backwards && $event['sequenceNumber'] < $options->from->value))) {
foreach ($eventEnvelopes as $eventEnvelope) {
$sequenceNumber = $eventEnvelope->sequenceNumber->value;
if ($options->from !== null && (($options->backwards && $sequenceNumber > $options->from->value) || (!$options->backwards && $sequenceNumber < $options->from->value))) {
continue;
}
$matchedCriterionHashes = [];
if (!$query->isWildcard()) {
foreach ($query->criteria as $criterion) {
if (array_key_exists($criterion->hash()->value, $matchedCriterionHashes)) {
continue;
}
if (self::criterionMatchesEvent($criterion, $event['event'])) {
$matchedCriterionHashes[$criterion->hash()->value] = true;
}
}
if ($matchedCriterionHashes === []) {
continue;
}
if (!array_key_exists($sequenceNumber, $matchingCriterionHashesBySequenceNumber) && !$query->isWildcard()) {
continue;
}
$matchingEventEnvelopes[] = new EventEnvelope(SequenceNumber::fromInteger($event['sequenceNumber']), $event['recordedAt'], CriterionHashes::fromArray(array_keys($matchedCriterionHashes)), $event['event']);

$matchingEventEnvelopes[] = $eventEnvelope->withCriterionHashes(CriterionHashes::fromArray($matchingCriterionHashesBySequenceNumber[$sequenceNumber] ?? []));
}
return InMemoryEventStream::create(...$matchingEventEnvelopes);
}
Expand All @@ -89,9 +99,7 @@ public function readAll(?ReadOptions $options = null): EventStream
private static function criterionMatchesEvent(Criterion $criterion, Event $event): bool
{
return match ($criterion::class) {
EventTypesAndTagsCriterion::class => $event->tags->containEvery($criterion->tags) && $criterion->eventTypes->contain($event->type),
EventTypesCriterion::class => $criterion->eventTypes->contain($event->type),
TagsCriterion::class => $event->tags->containEvery($criterion->tags),
EventTypesAndTagsCriterion::class => ($criterion->tags === null || $event->tags->containEvery($criterion->tags)) && ($criterion->eventTypes === null || $criterion->eventTypes->contain($event->type)),
default => throw new RuntimeException(sprintf('The criterion type "%s" is not supported by the %s', $criterion::class, self::class), 1700302540),
};
}
Expand All @@ -110,14 +118,19 @@ public function append(Events $events, AppendCondition $condition): void
throw ConditionalAppendFailed::becauseHighestExpectedSequenceNumberDoesNotMatch($condition->expectedHighestSequenceNumber);
}
}
$sequenceNumber = count($this->events);
$sequenceNumber = SequenceNumber::fromInteger(count($this->eventEnvelopes) + 1);
$newEventEnvelopes = EventEnvelopes::none();
foreach ($events as $event) {
$sequenceNumber++;
$this->events[] = [
'sequenceNumber' => $sequenceNumber,
'recordedAt' => new DateTimeImmutable(),
'event' => $event,
];
$newEventEnvelopes = $newEventEnvelopes->append(
new EventEnvelope(
$sequenceNumber,
new DateTimeImmutable(),
CriterionHashes::none(),
$event,
)
);
$sequenceNumber = $sequenceNumber->next();
}
$this->eventEnvelopes = $this->eventEnvelopes->append($newEventEnvelopes);
}
}
5 changes: 5 additions & 0 deletions src/Types/EventEnvelope.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ public function __construct(
public readonly Event $event,
) {
}

public function withCriterionHashes(CriterionHashes $criterionHashes): self
{
return new self($this->sequenceNumber, $this->recordedAt, $criterionHashes, $this->event);
}
}
98 changes: 98 additions & 0 deletions src/Types/EventEnvelopes.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
<?php

declare(strict_types=1);

namespace Wwwision\DCBEventStore\Types;

use ArrayIterator;
use Closure;
use Countable;
use IteratorAggregate;
use JsonSerializable;
use Traversable;

use function array_map;

/**
* A type-safe set of {@see EventEnvelopes} instances
*
* @implements IteratorAggregate<EventEnvelope>
*/
final class EventEnvelopes implements IteratorAggregate, JsonSerializable, Countable
{
/**
* @var array<int, EventEnvelope>
*/
private readonly array $eventEnvelopes;

private function __construct(EventEnvelope ...$eventEnvelopes)
{
$this->eventEnvelopes = array_values($eventEnvelopes);
}

public static function single(EventEnvelope $eventEnvelope): self
{
return new self($eventEnvelope);
}

/**
* @param EventEnvelope[] $eventEnvelopes
*/
public static function fromArray(array $eventEnvelopes): self
{
return new self(...$eventEnvelopes);
}

public static function none(): self
{
return new self();
}

public function getIterator(): Traversable
{
return new ArrayIterator($this->eventEnvelopes);
}

public function at(int $index): EventEnvelope
{
if (!array_key_exists($index, $this->eventEnvelopes)) {
throw new \InvalidArgumentException(sprintf('no EventEnvelope at index %d', $index), 1719995162);
}
return $this->eventEnvelopes[$index];
}

/**
* @param Closure(EventEnvelope $event): mixed $callback
* @return array<int, mixed>
*/
public function map(Closure $callback): array
{
return array_map($callback, $this->eventEnvelopes);
}

public function filter(Closure $callback): self
{
return self::fromArray(array_filter($this->eventEnvelopes, $callback));
}

public function append(EventEnvelope|self $eventEnvelopes): self
{
if ($eventEnvelopes instanceof EventEnvelope) {
$eventEnvelopes = self::fromArray([$eventEnvelopes]);
}
return self::fromArray([...$this->eventEnvelopes, ...$eventEnvelopes]);
}

public function count(): int
{
return count($this->eventEnvelopes);
}

/**
* @return EventEnvelope[]
*/
public function jsonSerialize(): array
{
return $this->eventEnvelopes;
}
}
66 changes: 61 additions & 5 deletions src/Types/StreamQuery/Criteria/EventTypesAndTagsCriterion.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,79 @@

namespace Wwwision\DCBEventStore\Types\StreamQuery\Criteria;

use InvalidArgumentException;
use Wwwision\DCBEventStore\Types\EventType;
use Wwwision\DCBEventStore\Types\EventTypes;
use Wwwision\DCBEventStore\Types\StreamQuery\Criterion;
use Wwwision\DCBEventStore\Types\StreamQuery\CriterionHash;
use Wwwision\DCBEventStore\Types\Tag;
use Wwwision\DCBEventStore\Types\Tags;

final class EventTypesAndTagsCriterion implements Criterion
{
private readonly CriterionHash $hash;

public function __construct(
public readonly EventTypes $eventTypes,
public readonly Tags $tags,
private function __construct(
public readonly EventTypes|null $eventTypes,
public readonly Tags|null $tags,
public readonly bool $onlyLastEvent,
) {
$this->hash = CriterionHash::fromParts(
substr(substr(self::class, 0, -9), strrpos(self::class, '\\') + 1),
implode(',', $this->eventTypes->toStringArray()),
implode(',', $this->tags->toSimpleArray()),
implode(',', $eventTypes?->toStringArray() ?? []),
implode(',', $tags?->toSimpleArray() ?? []),
$onlyLastEvent ? 'onlyLastEvent' : '',
);
}

/**
* @param EventTypes|array<string|EventType>|string|null $eventTypes
* @param Tags|array<string|Tag>|string|null $tags
*/
public static function create(
EventTypes|array|string|null $eventTypes = null,
Tags|array|string|null $tags = null,
bool|null $onlyLastEvent = null,
): self {
if (is_string($eventTypes)) {
$eventTypes = EventTypes::fromStrings($eventTypes);
} elseif (is_array($eventTypes)) {
$eventTypes = EventTypes::fromArray($eventTypes);
}
if (is_string($tags)) {
$tags = Tags::create(Tag::parse($tags));
} elseif (is_array($tags)) {
$tags = Tags::fromArray($tags);
}
if ($eventTypes === null && $tags === null) {
throw new InvalidArgumentException('one of eventTypes or tags must not be null!', 1716131425);
}
return new self($eventTypes, $tags, $onlyLastEvent ?? false);
}

/**
* @param EventTypes|array<string|EventType>|string|null $eventTypes
* @param Tags|array<string|Tag>|string|null $tags
*/
public function with(
EventTypes|array|string|null $eventTypes = null,
Tags|array|string|null $tags = null,
bool|null $onlyLastEvent = null,
): self {
if (is_string($eventTypes)) {
$eventTypes = EventTypes::fromStrings($eventTypes);
} elseif (is_array($eventTypes)) {
$eventTypes = EventTypes::fromArray($eventTypes);
}
if (is_string($tags)) {
$tags = Tags::create(Tag::parse($tags));
} elseif (is_array($tags)) {
$tags = Tags::fromArray($tags);
}
return new self(
$eventTypes ?? $this->eventTypes,
$tags ?? $this->tags,
$onlyLastEvent ?? $this->onlyLastEvent,
);
}

Expand Down
28 changes: 0 additions & 28 deletions src/Types/StreamQuery/Criteria/EventTypesCriterion.php

This file was deleted.

28 changes: 0 additions & 28 deletions src/Types/StreamQuery/Criteria/TagsCriterion.php

This file was deleted.

Loading

0 comments on commit 1615cbc

Please sign in to comment.