Skip to content

Commit

Permalink
Updated pipeline contracts to v0.5
Browse files Browse the repository at this point in the history
gplanchat committed Nov 13, 2023
1 parent 3bc1ba7 commit dcdcd18
Showing 8 changed files with 215 additions and 96 deletions.
9 changes: 5 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
"php": "^8.2",
"psr/log": "^3.0",
"php-etl/bucket": "*",
"php-etl/pipeline-contracts": "0.4.*",
"php-etl/pipeline-contracts": "0.5.*",
"php-etl/bucket-contracts": "0.2.*"
},
"require-dev": {
@@ -31,7 +31,7 @@
"rector/rector": "^0.15"
},
"provide": {
"php-etl/pipeline-implementation": "0.3.0"
"php-etl/pipeline-implementation": "0.5.0"
},
"autoload": {
"psr-4": {
@@ -46,12 +46,13 @@
"config": {
"bin-dir": "bin",
"allow-plugins": {
"infection/extension-installer": true
"infection/extension-installer": true,
"php-http/discovery": true
}
},
"extra": {
"branch-alias": {
"dev-main": "0.5.x-dev"
"dev-main": "0.6.x-dev"
}
}
}
72 changes: 63 additions & 9 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/Extractor/IteratorExtractor.php
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@

namespace Kiboko\Component\Pipeline\Extractor;

use Kiboko\Component\Bucket\AcceptanceResultBucket;
use Kiboko\Contract\Pipeline\ExtractorInterface;

class IteratorExtractor implements ExtractorInterface

Check failure on line 10 in src/Extractor/IteratorExtractor.php

GitHub Actions / phpstan

Class Kiboko\Component\Pipeline\Extractor\IteratorExtractor implements generic interface Kiboko\Contract\Pipeline\ExtractorInterface but does not specify its types: Type

Check failure on line 10 in src/Extractor/IteratorExtractor.php

GitHub Actions / phpstan

Class Kiboko\Component\Pipeline\Extractor\IteratorExtractor implements generic interface Kiboko\Contract\Pipeline\ExtractorInterface but does not specify its types: Type

Check failure on line 10 in src/Extractor/IteratorExtractor.php

GitHub Actions / phpstan

Class Kiboko\Component\Pipeline\Extractor\IteratorExtractor implements generic interface Kiboko\Contract\Pipeline\ExtractorInterface but does not specify its types: Type
32 changes: 22 additions & 10 deletions src/Pipeline.php
Original file line number Diff line number Diff line change
@@ -14,6 +14,9 @@
use Kiboko\Contract\Pipeline\RejectionInterface;
use Kiboko\Contract\Pipeline\RunnableInterface;
use Kiboko\Contract\Pipeline\StateInterface;
use Kiboko\Contract\Pipeline\StepCodeInterface;
use Kiboko\Contract\Pipeline\StepRejectionInterface;
use Kiboko\Contract\Pipeline\StepStateInterface;
use Kiboko\Contract\Pipeline\TransformerInterface;
use Kiboko\Contract\Pipeline\TransformingInterface;
use Kiboko\Contract\Pipeline\WalkableInterface;
@@ -23,8 +26,11 @@ class Pipeline implements PipelineInterface, WalkableInterface, RunnableInterfac
private readonly \AppendIterator $source;
private iterable $subject;

public function __construct(private readonly PipelineRunnerInterface $runner, ?\Iterator $source = null)
{
public function __construct(
private readonly PipelineRunnerInterface $runner,
private readonly StateInterface $state,
?\Iterator $source = null
) {
$this->source = new \AppendIterator();
$this->source->append($source ?? new \EmptyIterator());

Check warning on line 35 in src/Pipeline.php

GitHub Actions / infection

Escaped Mutant for Mutator "Coalesce": --- Original +++ New @@ @@ public function __construct(private readonly PipelineRunnerInterface $runner, private readonly StateInterface $state, ?\Iterator $source = null) { $this->source = new \AppendIterator(); - $this->source->append($source ?? new \EmptyIterator()); + $this->source->append(new \EmptyIterator() ?? $source); $this->subject = new \NoRewindIterator($this->source); } public function feed(...$data) : void

Check warning on line 35 in src/Pipeline.php

GitHub Actions / infection

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ public function __construct(private readonly PipelineRunnerInterface $runner, private readonly StateInterface $state, ?\Iterator $source = null) { $this->source = new \AppendIterator(); - $this->source->append($source ?? new \EmptyIterator()); + $this->subject = new \NoRewindIterator($this->source); } public function feed(...$data) : void

@@ -39,14 +45,14 @@ public function feed(...$data): void
private function passThroughCoroutine(): \Generator
{
$line = yield;
while ($line = yield $line) {
}
while ($line = yield $line);
}

public function extract(
StepCodeInterface $stepCode,
ExtractorInterface $extractor,
RejectionInterface $rejection,
StateInterface $state,
StepRejectionInterface $rejection,
StepStateInterface $state,
): ExtractingInterface {
$extract = $extractor->extract();
if (\is_array($extract)) {
@@ -84,9 +90,10 @@ public function extract(
}

public function transform(
StepCodeInterface $stepCode,
TransformerInterface $transformer,
RejectionInterface $rejection,
StateInterface $state,
StepRejectionInterface $rejection,
StepStateInterface $state,
): TransformingInterface {
if ($transformer instanceof FlushableInterface) {
$iterator = new \AppendIterator();
@@ -125,9 +132,10 @@ public function transform(
}

public function load(
StepCodeInterface $stepCode,
LoaderInterface $loader,
RejectionInterface $rejection,
StateInterface $state,
StepRejectionInterface $rejection,
StepStateInterface $state,
): LoadingInterface {
if ($loader instanceof FlushableInterface) {
$iterator = new \AppendIterator();
@@ -168,7 +176,11 @@ public function load(

public function walk(): \Iterator
{
$this->state->initialize();

Check warning on line 179 in src/Pipeline.php

GitHub Actions / infection

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ } public function walk() : \Iterator { - $this->state->initialize(); + yield from $this->subject; $this->state->teardown(); }

yield from $this->subject;

$this->state->teardown();

Check warning on line 183 in src/Pipeline.php

GitHub Actions / infection

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ { $this->state->initialize(); yield from $this->subject; - $this->state->teardown(); + } public function run(int $interval = 1000) : int {
}

public function run(int $interval = 1000): int
12 changes: 4 additions & 8 deletions src/PipelineRunner.php
Original file line number Diff line number Diff line change
@@ -10,6 +10,8 @@
use Kiboko\Contract\Pipeline\PipelineRunnerInterface;
use Kiboko\Contract\Pipeline\RejectionInterface;
use Kiboko\Contract\Pipeline\StateInterface;
use Kiboko\Contract\Pipeline\StepRejectionInterface;
use Kiboko\Contract\Pipeline\StepStateInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;
use Psr\Log\NullLogger;
@@ -23,12 +25,9 @@ public function __construct(private readonly LoggerInterface $logger = new NullL
public function run(
\Iterator $source,
\Generator $coroutine,
RejectionInterface $rejection,
StateInterface $state,
StepRejectionInterface $rejection,
StepStateInterface $state,
): \Iterator {
$state->initialize();
$rejection->initialize();

$wrapper = new GeneratorWrapper();
$wrapper->rewind($source, $coroutine);

@@ -69,8 +68,5 @@ public function run(

$wrapper->next($source);
}

$state->teardown();
$rejection->teardown();
}
}
25 changes: 25 additions & 0 deletions src/StepCode.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Kiboko\Component\Pipeline;

use Kiboko\Contract\Pipeline\StepCodeInterface;

final class StepCode implements StepCodeInterface
{
private function __construct(
private readonly string $reference,
) {
}

public static function fromString(string $reference): self
{
return new self($reference);
}

public function __toString(): string
{
return $this->reference;
}
}
4 changes: 3 additions & 1 deletion tests/unit/PipelineRunnerTest.php
Original file line number Diff line number Diff line change
@@ -9,6 +9,8 @@
use Kiboko\Component\Pipeline\PipelineRunner;
use Kiboko\Contract\Pipeline\NullRejection;
use Kiboko\Contract\Pipeline\NullState;
use Kiboko\Contract\Pipeline\NullStepRejection;
use Kiboko\Contract\Pipeline\NullStepState;
use PHPUnit\Framework\TestResult;
use Psr\Log\NullLogger;

@@ -115,7 +117,7 @@ public function testRun(\Iterator $source, callable $callback, array $expected):
{
$run = new PipelineRunner(new NullLogger());

$it = $run->run($source, $callback(), new NullRejection(), new NullState());
$it = $run->run($source, $callback(), new NullStepRejection(), new NullStepState());

$this->assertIteration(new \ArrayIterator($expected), $it);
}
156 changes: 92 additions & 64 deletions tests/unit/PipelineTest.php
Original file line number Diff line number Diff line change
@@ -7,12 +7,15 @@
use Kiboko\Component\Bucket\AcceptanceResultBucket;
use Kiboko\Component\Pipeline\Pipeline;
use Kiboko\Component\Pipeline\PipelineRunner;
use Kiboko\Component\Pipeline\StepCode;
use Kiboko\Contract\Bucket\ResultBucketInterface;
use Kiboko\Contract\Pipeline\ExtractorInterface;
use Kiboko\Contract\Pipeline\FlushableInterface;
use Kiboko\Contract\Pipeline\LoaderInterface;
use Kiboko\Contract\Pipeline\NullRejection;
use Kiboko\Contract\Pipeline\NullState;
use Kiboko\Contract\Pipeline\NullStepRejection;
use Kiboko\Contract\Pipeline\NullStepState;
use Kiboko\Contract\Pipeline\TransformerInterface;
use Psr\Log\NullLogger;

@@ -23,16 +26,21 @@ final class PipelineTest extends IterableTestCase
{
public function testExtractorWithoutFlush(): void
{
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()));

$pipeline->extract(new class() implements ExtractorInterface {
public function extract(): iterable
{
yield new AcceptanceResultBucket('lorem');
yield new AcceptanceResultBucket('ipsum');
yield new AcceptanceResultBucket('dolor');
}
}, new NullRejection(), new NullState());
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());

$pipeline->extract(
StepCode::fromString('extractor'),
new class() implements ExtractorInterface {
public function extract(): iterable
{
yield new AcceptanceResultBucket('lorem');
yield new AcceptanceResultBucket('ipsum');
yield new AcceptanceResultBucket('dolor');
}
},
new NullStepRejection(),
new NullStepState()
);

$this->assertIteration(
new \ArrayIterator(['lorem', 'ipsum', 'dolor']),
@@ -42,98 +50,118 @@ public function extract(): iterable

public function testTransformerWithoutFlush(): void
{
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()));
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());

$pipeline->feed(['lorem'], ['ipsum'], ['dolor']);

$pipeline->transform(new class() implements TransformerInterface {
public function transform(): \Generator
{
$line = yield;
$line = yield new AcceptanceResultBucket(str_rot13((string) $line));
$line = yield new AcceptanceResultBucket(str_rot13((string) $line));
yield new AcceptanceResultBucket(str_rot13((string) $line));
}
}, new NullRejection(), new NullState());
$pipeline->transform(
StepCode::fromString('transformer'),
new class() implements TransformerInterface {
public function transform(): \Generator
{
$line = yield;
$line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
$line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
}
},
new NullStepRejection(),
new NullStepState()
);

$this->assertIteration(
new \ArrayIterator(['yberz', 'vcfhz', 'qbybe']),
new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe']]),
$pipeline->walk()
);
}

public function testTransformerWithFlush(): void
{
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()));
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());

$pipeline->feed(['lorem'], ['ipsum'], ['dolor']);

$pipeline->transform(new class() implements TransformerInterface, FlushableInterface {
public function transform(): \Generator
{
$line = yield;
$line = yield new AcceptanceResultBucket(str_rot13((string) $line));
$line = yield new AcceptanceResultBucket(str_rot13((string) $line));
yield new AcceptanceResultBucket(str_rot13((string) $line));
}

public function flush(): ResultBucketInterface
{
return new AcceptanceResultBucket(str_rot13('sit amet'));
}
}, new NullRejection(), new NullState());
$pipeline->transform(
StepCode::fromString('transformer'),
new class() implements TransformerInterface, FlushableInterface {
public function transform(): \Generator
{
$line = yield;
$line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
$line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
}

public function flush(): ResultBucketInterface
{
return new AcceptanceResultBucket([str_rot13('sit amet')]);
}
},
new NullStepRejection(),
new NullStepState()
);

$this->assertIteration(
new \ArrayIterator(['yberz', 'vcfhz', 'qbybe', 'fvg nzrg']),
new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe'], ['fvg nzrg']]),
$pipeline->walk()
);
}

public function testLoaderWithoutFlush(): void
{
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()));
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());

$pipeline->feed(['lorem'], ['ipsum'], ['dolor']);

$pipeline->load(new class() implements LoaderInterface {
public function load(): \Generator
{
$line = yield;
$line = yield new AcceptanceResultBucket(str_rot13((string) $line));
$line = yield new AcceptanceResultBucket(str_rot13((string) $line));
yield new AcceptanceResultBucket(str_rot13((string) $line));
}
}, new NullRejection(), new NullState());
$pipeline->load(
StepCode::fromString('loader'),
new class() implements LoaderInterface {
public function load(): \Generator
{
$line = yield;
$line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
$line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
}
},
new NullStepRejection(),
new NullStepState()
);

$this->assertIteration(
new \ArrayIterator(['yberz', 'vcfhz', 'qbybe']),
new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe']]),
$pipeline->walk()
);
}

public function testLoaderWithFlush(): void
{
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()));
$pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());

$pipeline->feed(['lorem'], ['ipsum'], ['dolor']);

$pipeline->load(new class() implements LoaderInterface, FlushableInterface {
public function load(): \Generator
{
$line = yield;
$line = yield new AcceptanceResultBucket(str_rot13((string) $line));
$line = yield new AcceptanceResultBucket(str_rot13((string) $line));
yield new AcceptanceResultBucket(str_rot13((string) $line));
}

public function flush(): ResultBucketInterface
{
return new AcceptanceResultBucket(str_rot13('sit amet'));
}
}, new NullRejection(), new NullState());
$pipeline->load(
StepCode::fromString('loader'),
new class() implements LoaderInterface, FlushableInterface {
public function load(): \Generator
{
$line = yield;
$line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
$line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
}

public function flush(): ResultBucketInterface
{
return new AcceptanceResultBucket([str_rot13('sit amet')]);
}
},
new NullStepRejection(),
new NullStepState()
);

$this->assertIteration(
new \ArrayIterator(['yberz', 'vcfhz', 'qbybe', 'fvg nzrg']),
new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe'], ['fvg nzrg']]),
$pipeline->walk()
);
}

0 comments on commit dcdcd18

Please sign in to comment.