Skip to content

Commit

Permalink
Add config route option async_switch
Browse files Browse the repository at this point in the history
  • Loading branch information
sandrokeil committed Jun 21, 2017
1 parent f97bfe9 commit 0871828
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 8 deletions.
5 changes: 3 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@
"phpunit/phpunit": "^5.3",
"phpunit/php-invoker": "^1.1.4",
"fabpot/php-cs-fixer": "^1.11",
"bookdown/bookdown": "1.x-dev as 1.0.0",
"tobiju/bookdown-bootswatch-templates": "1.0.x-dev"
"bookdown/bookdown": "^1.0",
"tobiju/bookdown-bootswatch-templates": "1.1.1",
"react/promise": "^2.5"
},
"suggest": {
},
Expand Down
6 changes: 6 additions & 0 deletions doc/configuration_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ prooph_service_bus:
# Service ID of the message factory
message_factory: 'prooph_service_bus.message_factory'
router:
# Service ID of the async message producer e.g. for Amazon AWS SQS
async_switch: 'my_async_message_producer'
# Service ID of the router
type: 'prooph_service_bus.command_bus_router'
# Routing definition constructed as
Expand All @@ -28,6 +30,8 @@ prooph_service_bus:
# Service ID of the message factory
message_factory: 'prooph_service_bus.message_factory'
router:
# Service ID of the async message producer e.g. for Amazon AWS SQS
async_switch: 'my_async_message_producer'
# Service ID of the router
type: 'prooph_service_bus.event_bus_router'
# Routing definition constructed as
Expand All @@ -46,6 +50,8 @@ prooph_service_bus:
# Service ID of the message factory
message_factory: 'prooph_service_bus.message_factory'
router:
# Service ID of the async message producer e.g. for Amazon AWS SQS
async_switch: 'my_async_message_producer'
# Service ID of the router
type: 'prooph_service_bus.query_bus_router'
# Routing definition constructed as
Expand Down
10 changes: 10 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ private function addServiceBusSection(string $type, ArrayNodeDefinition $node)
->end()
->defaultValue('prooph_service_bus.' . $type . '_bus_router')
->end()
->scalarNode('async_switch')
->beforeNormalization()
->ifTrue(function ($v) {
return strpos($v, '@') === 0;
})
->then(function ($v) {
return substr($v, 1);
})
->end()
->end()
->append($routesNode)
->end()
->end()
Expand Down
33 changes: 28 additions & 5 deletions src/DependencyInjection/ProophServiceBusExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,34 @@ private function loadBus(string $type, string $name, array $options, ContainerBu
if (!empty($options['router'])) {
$routerId = sprintf('prooph_service_bus.%s.router', $name);

$routerDefinition = $container->setDefinition(
$routerId,
new DefinitionDecorator($options['router']['type'])
);
$routerDefinition->setArguments([$options['router']['routes'] ?? []]);
if (isset($options['router']['async_switch'])) {
$decoratedRouterId = sprintf('prooph_service_bus.%s.decorated_router', $name);

$routerDefinition = $container->setDefinition(
$decoratedRouterId,
new DefinitionDecorator($options['router']['type'])
);
$routerDefinition->setArguments([$options['router']['routes'] ?? []]);

$container->setDefinition($decoratedRouterId, $routerDefinition);


// replace router definition with async switch message router
$routerDefinition = new DefinitionDecorator('prooph_service_bus.async_switch_message_router');
$routerDefinition->setArguments([
new Reference($decoratedRouterId),
new Reference($options['router']['async_switch']),
]);
$routerDefinition->setPublic(true);
$container->setDefinition($routerId, $routerDefinition);
} else {
$routerDefinition = $container->setDefinition(
$routerId,
new DefinitionDecorator($options['router']['type'])
);
$routerDefinition->setArguments([$options['router']['routes'] ?? []]);
}


$serviceBusDefinition->addMethodCall('utilize', [new Reference($routerId)]);
}
Expand Down
31 changes: 31 additions & 0 deletions src/MessageBusFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php
/**
* prooph (http://getprooph.org/)
*
* @see https://github.com/prooph/service-bus-symfony-bundle for the canonical source repository
* @copyright Copyright (c) 2017 prooph software GmbH (http://prooph-software.com/)
* @license https://github.com/prooph/service-bus-symfony-bundle/blob/master/LICENSE.md New BSD License
*/

declare(strict_types=1);

namespace Prooph\Bundle\ServiceBus;

use Prooph\ServiceBus\MessageBus;
use Symfony\Component\DependencyInjection\ContainerInterface;

class MessageBusFactory
{
public function create(string $class, ContainerInterface $container, array $plugins = []): MessageBus
{
/** @var MessageBus $bus */
$bus = new $class();

foreach ($plugins as $pluginId) {
$plugin = $container->get($pluginId);
$plugin->attachToMessageBus($bus);
}

return $bus;
}
}
1 change: 1 addition & 0 deletions src/Resources/config/service_bus.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
<service id="prooph_service_bus.container_wrapper" class="%prooph_service_bus.container_wrapper.class%" public="false">
<argument type="service" id="service_container" />
</service>
<service id="prooph_service_bus.async_switch_message_router" class="Prooph\ServiceBus\Plugin\Router\AsyncSwitchMessageRouter" />
</services>
</container>
22 changes: 21 additions & 1 deletion test/BundleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
namespace ProophTest\Bundle\ServiceBus;

use PHPUnit_Framework_TestCase as TestCase;
use Prooph\Bundle\ServiceBus\DependencyInjection\Compiler\PluginsPass;
use Prooph\Bundle\ServiceBus\DependencyInjection\Compiler\RoutePass;
use Prooph\Bundle\ServiceBus\ProophServiceBusBundle;
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
use Symfony\Component\DependencyInjection\ContainerBuilder;

class BundleTest extends TestCase
Expand All @@ -29,6 +32,23 @@ public function it_builds_compiler_pass()
$config = $container->getCompilerPassConfig();
$passes = $config->getBeforeOptimizationPasses();

// TODO assert something
$this->assertInstanceOf(PassConfig::class, $config);

$hasPluginPass = false;
$hasRoutePass = false;

foreach ($passes as $pass) {
if ($pass instanceof PluginsPass) {
$hasPluginPass = true;
continue;
}
if ($pass instanceof RoutePass) {
$hasRoutePass = true;
continue;
}
}

$this->assertTrue($hasPluginPass, 'No plugin pass configured');
$this->assertTrue($hasRoutePass, 'No route pass configured');
}
}
22 changes: 22 additions & 0 deletions test/DependencyInjection/AbstractServiceBusExtensionTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Exception\CommandDispatchException;
use Prooph\ServiceBus\Exception\MessageDispatchException;
use Prooph\ServiceBus\Plugin\Router\AsyncSwitchMessageRouter;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use Prooph\ServiceBus\Plugin\Router\QueryRouter;
Expand Down Expand Up @@ -384,6 +385,27 @@ public function it_adds_event_bus_routes_based_on_tags()
self::assertSame($event, $mockListener->lastEvent());
}

/**
* @test
*/
public function it_creates_a_command_bus_with_async_switch_message_router()
{
$container = $this->loadContainer('command_bus_async');

$config = $container->getDefinition('prooph_service_bus.command_bus_async');

self::assertEquals(CommandBus::class, $config->getClass());

/* @var $commandBus CommandBus */
$commandBus = $container->get('prooph_service_bus.command_bus_async');

self::assertInstanceOf(CommandBus::class, $commandBus);

$router = $container->get('prooph_service_bus.command_bus_async.router');

self::assertInstanceOf(AsyncSwitchMessageRouter::class, $router);
}

private function loadContainer($fixture, CompilerPassInterface $compilerPass = null)
{
$container = $this->getContainer();
Expand Down
16 changes: 16 additions & 0 deletions test/DependencyInjection/Fixture/Model/AsyncMessageProducer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace ProophTest\Bundle\ServiceBus\DependencyInjection\Fixture\Model;

use Prooph\Common\Messaging\Message;
use Prooph\ServiceBus\Async\MessageProducer;
use React\Promise\Deferred;

class AsyncMessageProducer implements MessageProducer
{
public function __invoke(Message $message, Deferred $deferred = null)
{
}
}
22 changes: 22 additions & 0 deletions test/DependencyInjection/Fixture/config/xml/command_bus_async.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" ?>

<srv:container xmlns="http://getprooph.org/schemas/symfony-dic/prooph"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:srv="http://symfony.com/schema/dic/services"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://getprooph.org/schemas/symfony-dic/prooph http://getprooph.org/schemas/symfony-dic/prooph/service_bus-5.1.xsd">

<config>
<command_bus name="command_bus_async" message_factory="prooph_service_bus.message_factory">
<plugin>prooph_service_bus.handle_command_invoke_strategy</plugin>
<router type="prooph_service_bus.command_bus_router" async_switch="async_message_producer">
<route command="Acme\RegisterUser">Acme\RegisterUserHandler</route>
</router>
</command_bus>
</config>

<srv:services>
<srv:service id="Acme\RegisterUserHandler" class="ProophTest\Bundle\ServiceBus\DependencyInjection\Fixture\Model\AcmeRegisterUserHandler" />
<srv:service id="async_message_producer" class="ProophTest\Bundle\ServiceBus\DependencyInjection\Fixture\Model\AsyncMessageProducer" />
</srv:services>
</srv:container>
13 changes: 13 additions & 0 deletions test/DependencyInjection/Fixture/config/yml/command_bus_async.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
prooph_service_bus:
command_buses:
command_bus_async:
router:
async_switch: 'async_message_producer'
routes:
'Acme\RegisterUser': 'Acme\RegisterUserHandler'

services:
'Acme\RegisterUserHandler':
class: ProophTest\Bundle\ServiceBus\DependencyInjection\Fixture\Model\AcmeRegisterUserHandler
'async_message_producer':
class: ProophTest\Bundle\ServiceBus\DependencyInjection\Fixture\Model\AsyncMessageProducer

0 comments on commit 0871828

Please sign in to comment.