Skip to content

Commit

Permalink
Feat: Improve chaining (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek authored Nov 13, 2023
1 parent 74b2b72 commit b5c9d9f
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 80 deletions.
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,44 @@ $report = withRecipe(new LoggerRecipe($logger))
->process(['foo', 'bar']);
```

Chaining extractors / transformers / loaders
-------------------------------------------

Instead of replacing existing extractors / transformers / loaders inside your `EtlExecutor`,
you can decorate them by using the `chain` function:

```php
use BenTools\ETL\EtlExecutor;
use ArrayObject;

use function BenTools\ETL\chain;
use function implode;
use function str_split;
use function strtoupper;

$a = new ArrayObject();
$executor = (new EtlExecutor())
->extractFrom(fn () => yield 'foo')
->transformWith(fn (string $value) => strtoupper($value))
->loadInto(fn (string $value) => $a->append($value));

$b = new ArrayObject();
$executor = $executor
->extractFrom(
chain($executor->extractor)->with(fn () => ['bar'])
)
->transformWith(
chain($executor->transformer)->with(fn (string $value) => implode('-', str_split($value)))
)
->loadInto(
chain($executor->loader)->with(fn (string $value) => $b->append($value))
);

$executor->process();
var_dump([...$a]); // ['F-O-O', 'B-A-R']
var_dump([...$b]); // ['F-O-O', 'B-A-R']

```

Contribute
----------
Expand Down
8 changes: 8 additions & 0 deletions src/Extractor/ChainExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,12 @@ public function extract(EtlState $state): iterable
}
}
}

public static function from(ExtractorInterface $extractor): self
{
return match ($extractor instanceof self) {
true => $extractor,
false => new self($extractor),
};
}
}
32 changes: 11 additions & 21 deletions src/Internal/EtlBuilderTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
use BenTools\ETL\Transformer\ChainTransformer;
use BenTools\ETL\Transformer\TransformerInterface;

use function count;

/**
* @internal
*
Expand All @@ -30,8 +28,10 @@ trait EtlBuilderTrait
*/
use EtlEventListenersTrait;

public function extractFrom(ExtractorInterface|callable $extractor, ExtractorInterface|callable ...$extractors): self
{
public function extractFrom(
ExtractorInterface|callable $extractor,
ExtractorInterface|callable ...$extractors
): self {
$extractors = [$extractor, ...$extractors];

foreach ($extractors as $e => $_extractor) {
Expand All @@ -40,15 +40,13 @@ public function extractFrom(ExtractorInterface|callable $extractor, ExtractorInt
}
}

if (count($extractors) > 1) {
return $this->cloneWith(['extractor' => new ChainExtractor(...$extractors)]);
}

return $this->cloneWith(['extractor' => $extractors[0]]);
return $this->cloneWith(['extractor' => new ChainExtractor(...$extractors)]);
}

public function transformWith(TransformerInterface|callable $transformer, TransformerInterface|callable ...$transformers): self
{
public function transformWith(
TransformerInterface|callable $transformer,
TransformerInterface|callable ...$transformers
): self {
$transformers = [$transformer, ...$transformers];

foreach ($transformers as $t => $_transformer) {
Expand All @@ -57,11 +55,7 @@ public function transformWith(TransformerInterface|callable $transformer, Transf
}
}

if (count($transformers) > 1) {
return $this->cloneWith(['transformer' => new ChainTransformer(...$transformers)]);
}

return $this->cloneWith(['transformer' => $transformers[0]]);
return $this->cloneWith(['transformer' => new ChainTransformer(...$transformers)]);
}

public function loadInto(LoaderInterface|callable $loader, LoaderInterface|callable ...$loaders): self
Expand All @@ -74,11 +68,7 @@ public function loadInto(LoaderInterface|callable $loader, LoaderInterface|calla
}
}

if (count($loaders) > 1) {
return $this->cloneWith(['loader' => new ChainLoader(...$loaders)]);
}

return $this->cloneWith(['loader' => $loaders[0]]);
return $this->cloneWith(['loader' => new ChainLoader(...$loaders)]);
}

public function withOptions(EtlConfiguration $configuration): self
Expand Down
8 changes: 8 additions & 0 deletions src/Loader/ChainLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ public function flush(bool $isPartial, EtlState $state): mixed

return $output ?? null;
}

public static function from(LoaderInterface $loader): self
{
return match ($loader instanceof self) {
true => $loader,
false => new self($loader),
};
}
}
8 changes: 8 additions & 0 deletions src/Transformer/ChainTransformer.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ public function doTransform(mixed $item, EtlState $state): mixed

return $item;
}

public static function from(TransformerInterface $transformer): self
{
return match ($transformer instanceof self) {
true => $transformer,
false => new self($transformer),
};
}
}
38 changes: 26 additions & 12 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace BenTools\ETL;

use BenTools\ETL\Extractor\ChainExtractor;
use BenTools\ETL\Extractor\ExtractorInterface;
use BenTools\ETL\Internal\Ref;
use BenTools\ETL\Loader\ChainLoader;
use BenTools\ETL\Loader\LoaderInterface;
use BenTools\ETL\Recipe\Recipe;
use BenTools\ETL\Transformer\ChainTransformer;
use BenTools\ETL\Transformer\TransformerInterface;

use function array_fill_keys;
Expand All @@ -16,13 +19,13 @@
use function func_get_args;

/**
* @internal
*
* @param list<string> $keys
* @param array<string, mixed> $values
* @param array<string, mixed> ...$extraValues
*
* @return array<string, mixed>
*
* @internal
*/
function array_fill_from(array $keys, array $values, array ...$extraValues): array
{
Expand All @@ -33,27 +36,27 @@ function array_fill_from(array $keys, array $values, array ...$extraValues): arr
}

/**
* @internal
*
* @template T
*
* @param T $value
*
* @return Ref<T>
*
* @internal
*
* @template T
*/
function ref(mixed $value): Ref
{
return Ref::create($value);
}

/**
* @internal
*
* @template T
*
* @param Ref<T> $ref
*
* @return T
*
* @internal
*
* @template T
*/
function unref(Ref $ref): mixed
{
Expand All @@ -65,8 +68,10 @@ function extractFrom(ExtractorInterface|callable $extractor, ExtractorInterface|
return (new EtlExecutor())->extractFrom(...func_get_args());
}

function transformWith(TransformerInterface|callable $transformer, TransformerInterface|callable ...$transformers): EtlExecutor
{
function transformWith(
TransformerInterface|callable $transformer,
TransformerInterface|callable ...$transformers
): EtlExecutor {
return (new EtlExecutor())->transformWith(...func_get_args());
}

Expand All @@ -79,3 +84,12 @@ function withRecipe(Recipe|callable $recipe): EtlExecutor
{
return (new EtlExecutor())->withRecipe(...func_get_args());
}

function chain(ExtractorInterface|TransformerInterface|LoaderInterface $service,
): ChainExtractor|ChainTransformer|ChainLoader {
return match (true) {
$service instanceof ExtractorInterface => ChainExtractor::from($service),
$service instanceof TransformerInterface => ChainTransformer::from($service),
$service instanceof LoaderInterface => ChainLoader::from($service),
};
}
13 changes: 7 additions & 6 deletions tests/Unit/Extractor/ChainExtractorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
namespace BenTools\ETL\Tests\Unit\Extractor;

use BenTools\ETL\EtlExecutor;
use BenTools\ETL\Extractor\ChainExtractor;
use BenTools\ETL\Extractor\CallableExtractor;

use function BenTools\ETL\chain;
use function BenTools\ETL\extractFrom;
use function expect;

it('chains extractors', function () {
// Given
$extractor = (new ChainExtractor(
fn () => 'banana',
fn () => yield from ['apple', 'strawberry'],
))->with(fn () => ['raspberry', 'peach']);
$executor = (new EtlExecutor($extractor));
$executor = new EtlExecutor(new CallableExtractor(fn () => 'banana'));
$executor = $executor->extractFrom(chain($executor->extractor)
->with(fn () => yield from ['apple', 'strawberry'])
->with(fn () => ['raspberry', 'peach']))
;

// When
$report = $executor->process();
Expand Down
54 changes: 29 additions & 25 deletions tests/Unit/Loader/ChainLoaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,50 @@
use ArrayObject;
use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
use BenTools\ETL\Loader\ChainLoader;
use BenTools\ETL\Loader\CallableLoader;
use BenTools\ETL\Loader\ConditionalLoaderInterface;

use function BenTools\ETL\chain;
use function expect;

it('chains loaders', function () {
// Background
$a = new ArrayObject();
$b = new ArrayObject();
$c = new ArrayObject();
$loader = (new ChainLoader(

$executor = new EtlExecutor(loader: new CallableLoader(
fn (string $item) => $a[] = $item, // @phpstan-ignore-line
fn (string $item) => $b[] = $item, // @phpstan-ignore-line
))
->with(
new class() implements ConditionalLoaderInterface {
public function supports(mixed $item, EtlState $state): bool
{
return 'foo' !== $item;
}

public function load(mixed $item, EtlState $state): void
{
$state->context[__CLASS__][] = $item;
}

public function flush(bool $isPartial, EtlState $state): mixed
{
foreach ($state->context[__CLASS__] as $item) {
$state->context['storage'][] = $item;
));
$executor = $executor->loadInto(
chain($executor->loader)
->with(fn (string $item) => $b[] = $item) // @phpstan-ignore-line
->with(
new class() implements ConditionalLoaderInterface {
public function supports(mixed $item, EtlState $state): bool
{
return 'foo' !== $item;
}

public function load(mixed $item, EtlState $state): void
{
$state->context[__CLASS__][] = $item;
}

return $state->context['storage'];
}
},
);
public function flush(bool $isPartial, EtlState $state): mixed
{
foreach ($state->context[__CLASS__] as $item) {
$state->context['storage'][] = $item;
}

return $state->context['storage'];
}
},
)
);

// Given
$input = ['foo', 'bar'];
$executor = new EtlExecutor(loader: $loader);

// When
$executor->process($input, context: ['storage' => $c]);
Expand Down
Loading

0 comments on commit b5c9d9f

Please sign in to comment.