Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatch EVENT_APPEND_TO and EVENT_CREATE in transactional event store when not inside transaction #38

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/EventPublisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\AbstractPlugin;
use Prooph\EventStore\TransactionalActionEventEmitterEventStore;
use Prooph\ServiceBus\EventBus;
Expand Down Expand Up @@ -42,7 +43,7 @@ public function attachToEventStore(ActionEventEmitterEventStore $eventStore): vo
function (ActionEvent $event) use ($eventStore): void {
$recordedEvents = $event->getParam('streamEvents', new \ArrayIterator());

if (! $eventStore instanceof TransactionalActionEventEmitterEventStore) {
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamNotFound', false)
|| $event->getParam('concurrencyException', false)
) {
Expand All @@ -64,7 +65,7 @@ function (ActionEvent $event) use ($eventStore): void {
$stream = $event->getParam('stream');
$recordedEvents = $stream->streamEvents();

if (! $eventStore instanceof TransactionalActionEventEmitterEventStore) {
if (! $this->inTransaction($eventStore)) {
if ($event->getParam('streamExistsAlready', false)) {
return;
}
Expand Down Expand Up @@ -99,4 +100,10 @@ function (ActionEvent $event): void {
);
}
}

private function inTransaction(EventStore $eventStore): bool
{
return $eventStore instanceof TransactionalActionEventEmitterEventStore
&& $eventStore->inTransaction();
}
}
55 changes: 39 additions & 16 deletions tests/EventPublisherTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,34 @@ protected function setUp(): void
$this->eventStore = new TransactionalActionEventEmitterEventStore(new InMemoryEventStore(), new ProophActionEventEmitter());
}

/**
* @test
*/
public function it_publishes_all_created_and_appended_events_if_not_inside_transaction(): void
{
[$event1, $event2, $event3, $event4] = $this->setupStubEvents();

$eventBus = $this->prophesize(EventBus::class);

$eventPublisher = new EventPublisher($eventBus->reveal());

$eventPublisher->attachToEventStore($this->eventStore);

$eventBus->dispatch($event1)->shouldBeCalled();
$eventBus->dispatch($event2)->shouldBeCalled();
$eventBus->dispatch($event3)->shouldBeCalled();
$eventBus->dispatch($event4)->shouldBeCalled();

$this->eventStore->create(new Stream(new StreamName('test'), new \ArrayIterator([$event1, $event2])));
$this->eventStore->appendTo(new StreamName('test'), new \ArrayIterator([$event3, $event4]));
}

/**
* @test
*/
public function it_publishes_all_created_and_appended_events(): void
{
$event1 = $this->prophesize(Message::class)->reveal();
$event2 = $this->prophesize(Message::class)->reveal();
$event3 = $this->prophesize(Message::class)->reveal();
$event4 = $this->prophesize(Message::class)->reveal();
[$event1, $event2, $event3, $event4] = $this->setupStubEvents();

$eventBus = $this->prophesize(EventBus::class);

Expand All @@ -71,10 +90,7 @@ public function it_publishes_all_created_and_appended_events(): void
*/
public function it_publishes_correctly_when_event_store_implements_can_control_transaction(): void
{
$event1 = $this->prophesize(Message::class)->reveal();
$event2 = $this->prophesize(Message::class)->reveal();
$event3 = $this->prophesize(Message::class)->reveal();
$event4 = $this->prophesize(Message::class)->reveal();
[$event1, $event2, $event3, $event4] = $this->setupStubEvents();

$eventBus = $this->prophesize(EventBus::class);

Expand All @@ -98,10 +114,7 @@ public function it_publishes_correctly_when_event_store_implements_can_control_t
*/
public function it_does_not_publish_when_event_store_rolls_back(): void
{
$event1 = $this->prophesize(Message::class)->reveal();
$event2 = $this->prophesize(Message::class)->reveal();
$event3 = $this->prophesize(Message::class)->reveal();
$event4 = $this->prophesize(Message::class)->reveal();
[$event1, $event2, $event3, $event4] = $this->setupStubEvents();

$eventBus = $this->prophesize(EventBus::class);

Expand All @@ -125,10 +138,7 @@ public function it_does_not_publish_when_event_store_rolls_back(): void
*/
public function it_does_not_publish_when_non_transactional_event_store_throws_exception(): void
{
$event1 = $this->prophesize(Message::class)->reveal();
$event2 = $this->prophesize(Message::class)->reveal();
$event3 = $this->prophesize(Message::class)->reveal();
$event4 = $this->prophesize(Message::class)->reveal();
[$event1, $event2, $event3, $event4] = $this->setupStubEvents();

$eventStore = $this->prophesize(EventStore::class);
$eventStore->create(new Stream(new StreamName('test'), new \ArrayIterator([$event1, $event2])))->willThrow(StreamExistsAlready::with(new StreamName('test')))->shouldBeCalled();
Expand Down Expand Up @@ -166,4 +176,17 @@ public function it_does_not_publish_when_non_transactional_event_store_throws_ex
// ignore
}
}

/**
* @return Message[]
*/
private function setupStubEvents(): array
{
$event1 = $this->prophesize(Message::class)->reveal();
$event2 = $this->prophesize(Message::class)->reveal();
$event3 = $this->prophesize(Message::class)->reveal();
$event4 = $this->prophesize(Message::class)->reveal();

return [$event1, $event2, $event3, $event4];
}
}