Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

projection options #69

Merged
merged 1 commit into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@
"symfony/dependency-injection": "^3.4 || ^4.4 || ^5.0",
"symfony/http-kernel": "^3.4 || ^4.4 || ^5.0",
"symfony/framework-bundle": "^3.4 || ^4.4 || ^5.0",
"symfony/messenger": "^4.2 || ^4.4 || ^5.0",
"prooph/event-store": "^7.0"
},
"require-dev": {
"prooph/event-sourcing": "^5.0",
"prooph/snapshot-store": "^1.0",
"prooph/pdo-event-store": "^1.0",
"prooph/pdo-event-store": "^1.12",
"phpunit/phpunit": "^7 || ^8",
"symfony/yaml" : "^3.4 || ^4.4 || ^5.0",
"bookdown/bookdown": "1.x-dev as 1.0.0",
Expand Down
137 changes: 136 additions & 1 deletion doc/projection_manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,139 @@ As with the tag the `read_model` is necessary only if the projection implements

Since both ways will produce the same result, it is up to you which of them you choose.

## Running projections
## Projection options

If you run projection you would want to pass options to it. This can be done in two ways.

### Central

Scalar options can be added to projection defined directly at the `projection_manager`:

```yaml
# app/config/config.yml or (flex) config/packages/prooph_event_store.yaml
prooph_event_store:
projection_managers:
acme_projection_manager:
event_store: 'prooph_event_store.pdo_mysql_event_store'
connection: 'pdo.connection'
projections:
todo_projection:
read_model: 'proophessor.projection.read_model.todo'
projection: 'proophessor.projection.todo'
options:
cache_size: 1000
sleep: 100000
persist_block_size: 1000
lock_timeout_ms: 1000
trigger_pcntl_dispatch: false
update_lock_threshold: 0
```

### Tagged service

If you want more complex usage, you can define tagged service which implements `\Prooph\Bundle\EventStore\Projection\ProjectionOptions`.
It should be tagged as `prooph_event_store.projection_options` with `projection_name` attribute pointing to specific projection.

```php
<?php

declare(strict_types=1);

namespace Prooph\ProophessorDo\Projection\Options;

use Prooph\Bundle\EventStore\Projection\ProjectionOptions;
use Prooph\EventStore\Pdo\Projection\GapDetection;
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBag;

final class ToDoProjectionOptions implements ProjectionOptions
{
private ParameterBag $parameterBag;

public function __construct(ParameterBag $parameterBag)
{
$this->parameterBag = $parameterBag;
}

public function options(): array
{
return [
'cache_size' => $this->parameterBag->get('projection.cache_size'),
'sleep' => $this->parameterBag->get('projection.sleep'),
'persist_block_size' => $this->parameterBag->get('projection.persist_block_size'),
'lock_timeout_ms' => $this->parameterBag->get('projection.lock_timeout_ms'),
'trigger_pcntl_dispatch' => $this->parameterBag->get('projection.trigger_pcntl_dispatch'),
'update_lock_threshold' => $this->parameterBag->get('projection.update_lock_threshold'),
'gap_detection' => new GapDetection([0, 5, 5, 10, 15, 25, 40, 65, 105]),
];
}
}
```

```yaml
services:
Prooph\ProophessorDo\Projection\Options\ToDoProjectionOptions:
tags:
- { name: prooph_event_store.projection_options, projection_name: todo_projection }
```

## Manage projections

Running a projection
```
$ bin/console event-store:projection:run [options] [--] <projection-name>

Arguments:
projection-name The name of the Projection

Options:
-o, --run-once Loop the projection only once, then exit
```

Stopping a projection
```
$ bin/console event-store:projection:stop <projection-name>

Arguments:
projection-name The name of the Projection
```

Resetting a projection
```
$ bin/console event-store:projection:reset <projection-name>

Arguments:
projection-name The name of the Projection
```

Showing the current projection state
```
$ bin/console event-store:projection:state <projection-name>

Arguments:
projection-name The name of the Projection
```

Deleting a projection
```
$ bin/console event-store:projection:delete [options] [--] <projection-name>

Arguments:
projection-name The name of the Projection

Options:
-w, --with-emitted-events Delete with emitted events
```

Showing a list of all projection names. Can be filtered.
```
$ bin/console event-store:projection:names [options] [--] [<filter>]

Arguments:
filter Filter by this string

Options:
-r, --regex Enable regex syntax for filter
-l, --limit=LIMIT Limit the result set [default: 20]
-o, --offset=OFFSET Offset for result set [default: 0]
-m, --manager=MANAGER Manager for result set
```
14 changes: 11 additions & 3 deletions src/Command/AbstractProjectionCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,21 @@ abstract class AbstractProjectionCommand extends Command
*/
protected $projectionReadModelLocator;

/**
* @var ContainerInterface
*/
protected $projectionOptionsLocator;

public function __construct(
ContainerInterface $projectionManagerForProjectionsLocator,
ContainerInterface $projectionsLocator,
ContainerInterface $projectionReadModelLocator
ContainerInterface $projectionReadModelLocator,
ContainerInterface $projectionOptionsLocator
) {
$this->projectionManagerForProjectionsLocator = $projectionManagerForProjectionsLocator;
$this->projectionsLocator = $projectionsLocator;
$this->projectionReadModelLocator = $projectionReadModelLocator;
$this->projectionOptionsLocator = $projectionOptionsLocator;

parent::__construct();
}
Expand All @@ -97,18 +104,19 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
throw new RuntimeException(\vsprintf('Projection "%s" not found', \is_array($this->projectionName) ? $this->projectionName : [$this->projectionName]));
}
$this->projection = $this->projectionsLocator->get($this->projectionName);
$projectionOptions = $this->projectionOptionsLocator->has($this->projectionName) ? $this->projectionOptionsLocator->get($this->projectionName)->options() : [];

if ($this->projection instanceof ReadModelProjection) {
if (! $this->projectionReadModelLocator->has($this->projectionName)) {
throw new RuntimeException(\vsprintf('ReadModel for "%s" not found', \is_array($this->projectionName) ? $this->projectionName : [$this->projectionName]));
}
$this->readModel = $this->projectionReadModelLocator->get($this->projectionName);

$this->projector = $this->projectionManager->createReadModelProjection($this->projectionName, $this->readModel);
$this->projector = $this->projectionManager->createReadModelProjection($this->projectionName, $this->readModel, $projectionOptions);
}

if ($this->projection instanceof Projection) {
$this->projector = $this->projectionManager->createProjection($this->projectionName);
$this->projector = $this->projectionManager->createProjection($this->projectionName, $projectionOptions);
}

if (null === $this->projector) {
Expand Down
69 changes: 69 additions & 0 deletions src/DependencyInjection/Compiler/ProjectionOptionsPass.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

declare(strict_types=1);

namespace Prooph\Bundle\EventStore\DependencyInjection\Compiler;

use Prooph\Bundle\EventStore\DependencyInjection\ProophEventStoreExtension;
use Prooph\Bundle\EventStore\Exception\RuntimeException;
use Prooph\Bundle\EventStore\Projection\ProjectionOptions;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;

final class ProjectionOptionsPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
if (! $container->hasDefinition('prooph_event_store.projection_options_locator')) {
return;
}

$projectionOptionsIds = \array_keys($container->findTaggedServiceIds(ProophEventStoreExtension::TAG_PROJECTION_OPTIONS));
$projectionOptionsLocator = [];

foreach ($projectionOptionsIds as $id) {
$definition = $container->getDefinition($id);

self::assertDefinitionIsAValidClass($id, $definition);

$tags = $definition->getTag(ProophEventStoreExtension::TAG_PROJECTION_OPTIONS);
foreach ($tags as $tag) {
if (! isset($tag['projection_name'])) {
throw new RuntimeException(\sprintf(
'"projection_name" attribute is missing from tag "%s" on service "%s"',
ProophEventStoreExtension::TAG_PROJECTION_OPTIONS,
$id
));
}

$projectionOptionsLocator[$tag['projection_name']] = new Reference($id);
}
}

$locator = $container->getDefinition('prooph_event_store.projection_options_locator');
$locator->replaceArgument(0, \array_merge($locator->getArgument(0), $projectionOptionsLocator));
}

/**
* @param string $serviceId The id of the service that is verified
* @param Definition $definition The Definition of the service that is verified
*
* @throws RuntimeException if the service does not implement ProjectionOptions.
*/
private static function assertDefinitionIsAValidClass(string $serviceId, Definition $definition): void
{
/** @var object $definitionClass */
$definitionClass = $definition->getClass();
$reflection = new \ReflectionClass($definitionClass);

if (! $reflection->implementsInterface(ProjectionOptions::class)) {
throw new RuntimeException(\sprintf(
'Tagged service "%s" must implement "%s"',
$serviceId,
ProjectionOptions::class
));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static function addServicesToLocator(
array $serviceMap
): void {
$definition = $container->getDefinition($locatorId);
$definition->replaceArgument(0, \array_merge($serviceMap, $definition->getArgument(0)));
$definition->replaceArgument(0, \array_merge($definition->getArgument(0), $serviceMap));
}

/**
Expand Down
7 changes: 7 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ public function addProjectionManagerSection(ArrayNodeDefinition $node): void
->then($removeFirstCharacter)
->end()
->end()
->arrayNode('options')
->canBeUnset()
->addDefaultsIfNotSet()
->treatFalseLike([])
->treatNullLike([])
->ignoreExtraKeys(false)
->end()
->end();

$node
Expand Down
28 changes: 24 additions & 4 deletions src/DependencyInjection/ProophEventStoreExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
final class ProophEventStoreExtension extends Extension
{
public const TAG_PROJECTION = 'prooph_event_store.projection';
public const TAG_PROJECTION_OPTIONS = 'prooph_event_store.projection_options';

public function getNamespace(): string
{
Expand Down Expand Up @@ -60,17 +61,20 @@ private static function loadProjectionManagers(array $config, ContainerBuilder $
$projectionManagerForProjectionsLocator = [];
$projectionsLocator = [];
$readModelsLocator = [];
$projectionOptionsLocator = [];

foreach ($config['projection_managers'] as $projectionManagerName => $projectionManagerConfig) {
$projectionManagerId = "prooph_event_store.projection_manager.$projectionManagerName";
self::defineProjectionManager($container, $projectionManagerId, $projectionManagerConfig);

[$projectionManagerForProjectionsLocator, $projectionsLocator, $readModelsLocator] = self::collectProjectionsForLocators(
[$projectionManagerForProjectionsLocator, $projectionsLocator, $readModelsLocator, $projectionOptionsLocator] = self::collectProjectionsForLocators(
$container,
$projectionManagerConfig['projections'],
$projectionManagerId,
$projectionManagerForProjectionsLocator,
$projectionsLocator,
$readModelsLocator
$readModelsLocator,
$projectionOptionsLocator
);

$projectionManagers[$projectionManagerName] = "prooph_event_store.$projectionManagerName";
Expand All @@ -83,6 +87,7 @@ private static function loadProjectionManagers(array $config, ContainerBuilder $
self::defineServiceLocator($container, 'prooph_event_store.projection_manager_for_projections_locator', $projectionManagerForProjectionsLocator);
self::defineServiceLocator($container, 'prooph_event_store.projection_read_models_locator', $readModelsLocator);
self::defineServiceLocator($container, 'prooph_event_store.projections_locator', $projectionsLocator);
self::defineServiceLocator($container, 'prooph_event_store.projection_options_locator', $projectionOptionsLocator);
}

private static function defineProjectionManager(ContainerBuilder $container, string $serviceId, array $config): void
Expand All @@ -108,11 +113,13 @@ private static function defineServiceLocator(ContainerBuilder $container, string
}

private static function collectProjectionsForLocators(
ContainerBuilder $container,
array $projections,
string $projectionManagerId,
array $projectionManagerForProjectionsLocator,
array $projectionsLocator,
array $readModelsLocator
array $readModelsLocator,
array $projectionOptionsLocator
): array {
foreach ($projections as $projectionName => $projectionConfig) {
if (isset($projectionConfig['read_model'])) {
Expand All @@ -121,9 +128,22 @@ private static function collectProjectionsForLocators(

$projectionsLocator[$projectionName] = new Reference($projectionConfig['projection']);
$projectionManagerForProjectionsLocator[$projectionName] = new Reference($projectionManagerId);

$projectionOptionsId = \sprintf('prooph_event_store.projection_options.%s', $projectionName);
self::defineProjectionOptions($container, $projectionOptionsId, $projectionConfig['options']);
$projectionOptionsLocator[$projectionName] = new Reference($projectionOptionsId);
}

return [$projectionManagerForProjectionsLocator, $projectionsLocator, $readModelsLocator];
return [$projectionManagerForProjectionsLocator, $projectionsLocator, $readModelsLocator, $projectionOptionsLocator];
}

private static function defineProjectionOptions(ContainerBuilder $container, string $serviceId, array $projectionOptions): void
{
$definition = new ChildDefinition('prooph_event_store.projection_options');
$definition->setFactory([new Reference('prooph_event_store.projection_options_factory'), 'createProjectionOptions']);
$definition->setArguments([$projectionOptions]);

$container->setDefinition($serviceId, $definition);
}

/**
Expand Down
25 changes: 25 additions & 0 deletions src/Projection/Options/ProjectionOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Prooph\Bundle\EventStore\Projection\Options;

use Prooph\Bundle\EventStore\Projection\ProjectionOptions as ProjectionOptionsInterface;

final class ProjectionOptions implements ProjectionOptionsInterface
{
/**
* @var array
*/
private $options;

public function __construct(array $options)
{
$this->options = $options;
}

public function options(): array
{
return $this->options;
}
}
Loading