-
Notifications
You must be signed in to change notification settings - Fork 26
/
DoctrineJsonGameRepository.php
126 lines (103 loc) · 4.04 KB
/
DoctrineJsonGameRepository.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
<?php
declare(strict_types=1);
namespace Gaming\ConnectFour\Port\Adapter\Persistence\Repository;
use Closure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Types\Types;
use Gaming\Common\Domain\Exception\ConcurrencyException;
use Gaming\Common\EventStore\DomainEvents;
use Gaming\Common\EventStore\EventStore;
use Gaming\Common\Normalizer\Normalizer;
use Gaming\Common\Sharding\Shards;
use Gaming\ConnectFour\Application\Game\Query\Model\Game\Game as GameQueryModel;
use Gaming\ConnectFour\Application\Game\Query\Model\Game\GameFinder;
use Gaming\ConnectFour\Domain\Game\Exception\GameNotFoundException;
use Gaming\ConnectFour\Domain\Game\Game;
use Gaming\ConnectFour\Domain\Game\GameId;
use Gaming\ConnectFour\Domain\Game\Games;
final class DoctrineJsonGameRepository implements Games, GameFinder
{
/**
* @param Shards<string> $shards
*/
public function __construct(
private readonly Connection $connection,
private readonly string $tableName,
private readonly EventStore $eventStore,
private readonly Normalizer $normalizer,
private readonly Shards $shards
) {
}
public function nextIdentity(): GameId
{
return GameId::generate();
}
public function add(Game $game): void
{
$this->switchShard($game->id());
$this->connection->transactional(function () use ($game) {
$id = $game->id()->toString();
$domainEvents = (new DomainEvents($id))->append(...$game->flushDomainEvents());
$this->connection->insert(
$this->tableName,
['id' => $id, 'aggregate' => $this->normalizeGame($game), 'version' => $domainEvents->streamVersion()],
['id' => 'uuid', 'aggregate' => Types::JSON, 'version' => Types::INTEGER]
);
$this->eventStore->append(...$domainEvents->flush());
});
}
public function update(GameId $gameId, Closure $operation): void
{
$this->switchShard($gameId);
$this->connection->transactional(function () use ($gameId, $operation) {
$id = $gameId->toString();
$row = $this->connection->fetchAssociative(
'SELECT * FROM ' . $this->tableName . ' g WHERE g.id = ?',
[$id],
['uuid']
) ?: throw new GameNotFoundException();
$game = $this->denormalizeGame($row['aggregate']);
$operation($game);
$domainEvents = (new DomainEvents($id, $row['version']))->append(...$game->flushDomainEvents());
if ($row['version'] === $domainEvents->streamVersion()) {
return;
}
$this->connection->update(
$this->tableName,
['aggregate' => $this->normalizeGame($game), 'version' => $domainEvents->streamVersion()],
['id' => $id, 'version' => $row['version']],
['id' => 'uuid', 'aggregate' => Types::JSON, 'version' => Types::INTEGER]
) ?: throw new ConcurrencyException();
$this->eventStore->append(...$domainEvents->flush());
});
}
public function find(GameId $gameId): GameQueryModel
{
$this->switchShard($gameId);
$domainEvents = $this->eventStore->byStreamId(
$gameId->toString()
) ?: throw new GameNotFoundException();
$game = new GameQueryModel();
foreach ($domainEvents as $domainEvent) {
$game->apply($domainEvent->content);
}
return $game;
}
private function switchShard(GameId $gameId): void
{
$this->connection->executeStatement(
'USE ' . $this->connection->quoteIdentifier($this->shards->lookup($gameId->toString()))
);
}
private function normalizeGame(Game $game): mixed
{
return $this->normalizer->normalize($game, Game::class);
}
private function denormalizeGame(mixed $game): Game
{
return $this->normalizer->denormalize(
json_decode($game, true, 512, JSON_THROW_ON_ERROR),
Game::class
);
}
}