Skip to content

Commit

Permalink
projection options
Browse files Browse the repository at this point in the history
- I was not able to pass projection options when defining projections so it couldn't be used when running projection with console command
- projection options can be defined either in central way or as tagged service
- supplementing the documentation with projection management
  • Loading branch information
unixslayer committed Jul 31, 2020
1 parent 70b6e6d commit f70233e
Show file tree
Hide file tree
Showing 20 changed files with 454 additions and 11 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@
"symfony/dependency-injection": "^3.4 || ^4.4 || ^5.0",
"symfony/http-kernel": "^3.4 || ^4.4 || ^5.0",
"symfony/framework-bundle": "^3.4 || ^4.4 || ^5.0",
"symfony/messenger": "^4.2 || ^4.4 || ^5.0",
"prooph/event-store": "^7.0"
},
"require-dev": {
"prooph/event-sourcing": "^5.0",
"prooph/snapshot-store": "^1.0",
"prooph/pdo-event-store": "^1.0",
"prooph/pdo-event-store": "^1.12",
"phpunit/phpunit": "^7 || ^8",
"symfony/yaml" : "^3.4 || ^4.4 || ^5.0",
"bookdown/bookdown": "1.x-dev as 1.0.0",
Expand Down
137 changes: 136 additions & 1 deletion doc/projection_manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,139 @@ As with the tag the `read_model` is necessary only if the projection implements

Since both ways will produce the same result, it is up to you which of them you choose.

## Running projections
## Projection options

If you run projection you would want to pass options to it. This can be done in two ways.

### Central

Scalar options can be added to projection defined directly at the `projection_manager`:

```yaml
# app/config/config.yml or (flex) config/packages/prooph_event_store.yaml
prooph_event_store:
projection_managers:
acme_projection_manager:
event_store: 'prooph_event_store.pdo_mysql_event_store'
connection: 'pdo.connection'
projections:
todo_projection:
read_model: 'proophessor.projection.read_model.todo'
projection: 'proophessor.projection.todo'
options:
cache_size: 1000
sleep: 100000
persist_block_size: 1000
lock_timeout_ms: 1000
trigger_pcntl_dispatch: false
update_lock_threshold: 0
```

### Tagged service

If you want more complex usage, you can define tagged service which implements `\Prooph\Bundle\EventStore\Projection\ProjectionOptions`.
It should be tagged as `prooph_event_store.projection_options` with `projection_name` attribute pointing to specific projection.

```php
<?php
declare(strict_types=1);
namespace Prooph\ProophessorDo\Projection\Options;
use Prooph\Bundle\EventStore\Projection\ProjectionOptions;
use Prooph\EventStore\Pdo\Projection\GapDetection;
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBag;
final class ToDoProjectionOptions implements ProjectionOptions
{
private ParameterBag $parameterBag;
public function __construct(ParameterBag $parameterBag)
{
$this->parameterBag = $parameterBag;
}
public function options(): array
{
return [
'cache_size' => $this->parameterBag->get('projection.cache_size'),
'sleep' => $this->parameterBag->get('projection.sleep'),
'persist_block_size' => $this->parameterBag->get('projection.persist_block_size'),
'lock_timeout_ms' => $this->parameterBag->get('projection.lock_timeout_ms'),
'trigger_pcntl_dispatch' => $this->parameterBag->get('projection.trigger_pcntl_dispatch'),
'update_lock_threshold' => $this->parameterBag->get('projection.update_lock_threshold'),
'gap_detection' => new GapDetection([0, 5, 5, 10, 15, 25, 40, 65, 105]),
];
}
}
```

```yaml
services:
Prooph\ProophessorDo\Projection\Options\ToDoProjectionOptions:
tags:
- { name: prooph_event_store.projection_options, projection_name: todo_projection }
```

## Manage projections

Running a projection
```
$ bin/console event-store:projection:run [options] [--] <projection-name>

Arguments:
projection-name The name of the Projection

Options:
-o, --run-once Loop the projection only once, then exit
```
Stopping a projection
```
$ bin/console event-store:projection:stop <projection-name>

Arguments:
projection-name The name of the Projection
```
Resetting a projection
```
$ bin/console event-store:projection:reset <projection-name>

Arguments:
projection-name The name of the Projection
```
Showing the current projection state
```
$ bin/console event-store:projection:state <projection-name>

Arguments:
projection-name The name of the Projection
```
Deleting a projection
```
$ bin/console event-store:projection:delete [options] [--] <projection-name>

Arguments:
projection-name The name of the Projection

Options:
-w, --with-emitted-events Delete with emitted events
```
Showing a list of all projection names. Can be filtered.
```
$ bin/console event-store:projection:names [options] [--] [<filter>]

Arguments:
filter Filter by this string

Options:
-r, --regex Enable regex syntax for filter
-l, --limit=LIMIT Limit the result set [default: 20]
-o, --offset=OFFSET Offset for result set [default: 0]
-m, --manager=MANAGER Manager for result set
```
14 changes: 11 additions & 3 deletions src/Command/AbstractProjectionCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,21 @@ abstract class AbstractProjectionCommand extends Command
*/
protected $projectionReadModelLocator;

/**
* @var ContainerInterface
*/
protected $projectionOptionsLocator;

public function __construct(
ContainerInterface $projectionManagerForProjectionsLocator,
ContainerInterface $projectionsLocator,
ContainerInterface $projectionReadModelLocator
ContainerInterface $projectionReadModelLocator,
ContainerInterface $projectionOptionsLocator
) {
$this->projectionManagerForProjectionsLocator = $projectionManagerForProjectionsLocator;
$this->projectionsLocator = $projectionsLocator;
$this->projectionReadModelLocator = $projectionReadModelLocator;
$this->projectionOptionsLocator = $projectionOptionsLocator;

parent::__construct();
}
Expand All @@ -97,18 +104,19 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
throw new RuntimeException(\vsprintf('Projection "%s" not found', \is_array($this->projectionName) ? $this->projectionName : [$this->projectionName]));
}
$this->projection = $this->projectionsLocator->get($this->projectionName);
$projectionOptions = $this->projectionOptionsLocator->has($this->projectionName) ? $this->projectionOptionsLocator->get($this->projectionName)->options() : [];

if ($this->projection instanceof ReadModelProjection) {
if (! $this->projectionReadModelLocator->has($this->projectionName)) {
throw new RuntimeException(\vsprintf('ReadModel for "%s" not found', \is_array($this->projectionName) ? $this->projectionName : [$this->projectionName]));
}
$this->readModel = $this->projectionReadModelLocator->get($this->projectionName);

$this->projector = $this->projectionManager->createReadModelProjection($this->projectionName, $this->readModel);
$this->projector = $this->projectionManager->createReadModelProjection($this->projectionName, $this->readModel, $projectionOptions);
}

if ($this->projection instanceof Projection) {
$this->projector = $this->projectionManager->createProjection($this->projectionName);
$this->projector = $this->projectionManager->createProjection($this->projectionName, $projectionOptions);
}

if (null === $this->projector) {
Expand Down
69 changes: 69 additions & 0 deletions src/DependencyInjection/Compiler/ProjectionOptionsPass.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

declare(strict_types=1);

namespace Prooph\Bundle\EventStore\DependencyInjection\Compiler;

use Prooph\Bundle\EventStore\DependencyInjection\ProophEventStoreExtension;
use Prooph\Bundle\EventStore\Exception\RuntimeException;
use Prooph\Bundle\EventStore\Projection\ProjectionOptions;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;

final class ProjectionOptionsPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
if (! $container->hasDefinition('prooph_event_store.projection_options_locator')) {
return;
}

$projectionOptionsIds = \array_keys($container->findTaggedServiceIds(ProophEventStoreExtension::TAG_PROJECTION_OPTIONS));
$projectionOptionsLocator = [];

foreach ($projectionOptionsIds as $id) {
$definition = $container->getDefinition($id);

self::assertDefinitionIsAValidClass($id, $definition);

$tags = $definition->getTag(ProophEventStoreExtension::TAG_PROJECTION_OPTIONS);
foreach ($tags as $tag) {
if (! isset($tag['projection_name'])) {
throw new RuntimeException(\sprintf(
'"projection_name" attribute is missing from tag "%s" on service "%s"',
ProophEventStoreExtension::TAG_PROJECTION_OPTIONS,
$id
));
}

$projectionOptionsLocator[$tag['projection_name']] = new Reference($id);
}
}

$locator = $container->getDefinition('prooph_event_store.projection_options_locator');
$locator->replaceArgument(0, \array_merge($locator->getArgument(0), $projectionOptionsLocator));
}

/**
* @param string $serviceId The id of the service that is verified
* @param Definition $definition The Definition of the service that is verified
*
* @throws RuntimeException if the service does not implement ProjectionOptions.
*/
private static function assertDefinitionIsAValidClass(string $serviceId, Definition $definition): void
{
/** @var object $definitionClass */
$definitionClass = $definition->getClass();
$reflection = new \ReflectionClass($definitionClass);

if (! $reflection->implementsInterface(ProjectionOptions::class)) {
throw new RuntimeException(\sprintf(
'Tagged service "%s" must implement "%s"',
$serviceId,
ProjectionOptions::class
));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static function addServicesToLocator(
array $serviceMap
): void {
$definition = $container->getDefinition($locatorId);
$definition->replaceArgument(0, \array_merge($serviceMap, $definition->getArgument(0)));
$definition->replaceArgument(0, \array_merge($definition->getArgument(0), $serviceMap));
}

/**
Expand Down
7 changes: 7 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ public function addProjectionManagerSection(ArrayNodeDefinition $node): void
->then($removeFirstCharacter)
->end()
->end()
->arrayNode('options')
->canBeUnset()
->addDefaultsIfNotSet()
->treatFalseLike([])
->treatNullLike([])
->ignoreExtraKeys(false)
->end()
->end();

$node
Expand Down
28 changes: 24 additions & 4 deletions src/DependencyInjection/ProophEventStoreExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
final class ProophEventStoreExtension extends Extension
{
public const TAG_PROJECTION = 'prooph_event_store.projection';
public const TAG_PROJECTION_OPTIONS = 'prooph_event_store.projection_options';

public function getNamespace(): string
{
Expand Down Expand Up @@ -60,17 +61,20 @@ private static function loadProjectionManagers(array $config, ContainerBuilder $
$projectionManagerForProjectionsLocator = [];
$projectionsLocator = [];
$readModelsLocator = [];
$projectionOptionsLocator = [];

foreach ($config['projection_managers'] as $projectionManagerName => $projectionManagerConfig) {
$projectionManagerId = "prooph_event_store.projection_manager.$projectionManagerName";
self::defineProjectionManager($container, $projectionManagerId, $projectionManagerConfig);

[$projectionManagerForProjectionsLocator, $projectionsLocator, $readModelsLocator] = self::collectProjectionsForLocators(
[$projectionManagerForProjectionsLocator, $projectionsLocator, $readModelsLocator, $projectionOptionsLocator] = self::collectProjectionsForLocators(
$container,
$projectionManagerConfig['projections'],
$projectionManagerId,
$projectionManagerForProjectionsLocator,
$projectionsLocator,
$readModelsLocator
$readModelsLocator,
$projectionOptionsLocator
);

$projectionManagers[$projectionManagerName] = "prooph_event_store.$projectionManagerName";
Expand All @@ -83,6 +87,7 @@ private static function loadProjectionManagers(array $config, ContainerBuilder $
self::defineServiceLocator($container, 'prooph_event_store.projection_manager_for_projections_locator', $projectionManagerForProjectionsLocator);
self::defineServiceLocator($container, 'prooph_event_store.projection_read_models_locator', $readModelsLocator);
self::defineServiceLocator($container, 'prooph_event_store.projections_locator', $projectionsLocator);
self::defineServiceLocator($container, 'prooph_event_store.projection_options_locator', $projectionOptionsLocator);
}

private static function defineProjectionManager(ContainerBuilder $container, string $serviceId, array $config): void
Expand All @@ -108,11 +113,13 @@ private static function defineServiceLocator(ContainerBuilder $container, string
}

private static function collectProjectionsForLocators(
ContainerBuilder $container,
array $projections,
string $projectionManagerId,
array $projectionManagerForProjectionsLocator,
array $projectionsLocator,
array $readModelsLocator
array $readModelsLocator,
array $projectionOptionsLocator
): array {
foreach ($projections as $projectionName => $projectionConfig) {
if (isset($projectionConfig['read_model'])) {
Expand All @@ -121,9 +128,22 @@ private static function collectProjectionsForLocators(

$projectionsLocator[$projectionName] = new Reference($projectionConfig['projection']);
$projectionManagerForProjectionsLocator[$projectionName] = new Reference($projectionManagerId);

$projectionOptionsId = \sprintf('prooph_event_store.projection_options.%s', $projectionName);
self::defineProjectionOptions($container, $projectionOptionsId, $projectionConfig['options']);
$projectionOptionsLocator[$projectionName] = new Reference($projectionOptionsId);
}

return [$projectionManagerForProjectionsLocator, $projectionsLocator, $readModelsLocator];
return [$projectionManagerForProjectionsLocator, $projectionsLocator, $readModelsLocator, $projectionOptionsLocator];
}

private static function defineProjectionOptions(ContainerBuilder $container, string $serviceId, array $projectionOptions): void
{
$definition = new ChildDefinition('prooph_event_store.projection_options');
$definition->setFactory([new Reference('prooph_event_store.projection_options_factory'), 'createProjectionOptions']);
$definition->setArguments([$projectionOptions]);

$container->setDefinition($serviceId, $definition);
}

/**
Expand Down
25 changes: 25 additions & 0 deletions src/Projection/Options/ProjectionOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Prooph\Bundle\EventStore\Projection\Options;

use Prooph\Bundle\EventStore\Projection\ProjectionOptions as ProjectionOptionsInterface;

final class ProjectionOptions implements ProjectionOptionsInterface
{
/**
* @var array
*/
private $options;

public function __construct(array $options)
{
$this->options = $options;
}

public function options(): array
{
return $this->options;
}
}
Loading

0 comments on commit f70233e

Please sign in to comment.