Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Improve chaining #30

Merged
merged 1 commit into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,44 @@ $report = withRecipe(new LoggerRecipe($logger))
->process(['foo', 'bar']);
```

Chaining extractors / transformers / loaders
-------------------------------------------

Instead of replacing existing extractors / transformers / loaders inside your `EtlExecutor`,
you can decorate them by using the `chain` function:

```php
use BenTools\ETL\EtlExecutor;
use ArrayObject;

use function BenTools\ETL\chain;
use function implode;
use function str_split;
use function strtoupper;

$a = new ArrayObject();
$executor = (new EtlExecutor())
->extractFrom(fn () => yield 'foo')
->transformWith(fn (string $value) => strtoupper($value))
->loadInto(fn (string $value) => $a->append($value));

$b = new ArrayObject();
$executor = $executor
->extractFrom(
chain($executor->extractor)->with(fn () => ['bar'])
)
->transformWith(
chain($executor->transformer)->with(fn (string $value) => implode('-', str_split($value)))
)
->loadInto(
chain($executor->loader)->with(fn (string $value) => $b->append($value))
);

$executor->process();
var_dump([...$a]); // ['F-O-O', 'B-A-R']
var_dump([...$b]); // ['F-O-O', 'B-A-R']

```

Contribute
----------
Expand Down
8 changes: 8 additions & 0 deletions src/Extractor/ChainExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,12 @@ public function extract(EtlState $state): iterable
}
}
}

public static function from(ExtractorInterface $extractor): self
{
return match ($extractor instanceof self) {
true => $extractor,
false => new self($extractor),
};
}
}
32 changes: 11 additions & 21 deletions src/Internal/EtlBuilderTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
use BenTools\ETL\Transformer\ChainTransformer;
use BenTools\ETL\Transformer\TransformerInterface;

use function count;

/**
* @internal
*
Expand All @@ -30,8 +28,10 @@ trait EtlBuilderTrait
*/
use EtlEventListenersTrait;

public function extractFrom(ExtractorInterface|callable $extractor, ExtractorInterface|callable ...$extractors): self
{
public function extractFrom(
ExtractorInterface|callable $extractor,
ExtractorInterface|callable ...$extractors
): self {
$extractors = [$extractor, ...$extractors];

foreach ($extractors as $e => $_extractor) {
Expand All @@ -40,15 +40,13 @@ public function extractFrom(ExtractorInterface|callable $extractor, ExtractorInt
}
}

if (count($extractors) > 1) {
return $this->cloneWith(['extractor' => new ChainExtractor(...$extractors)]);
}

return $this->cloneWith(['extractor' => $extractors[0]]);
return $this->cloneWith(['extractor' => new ChainExtractor(...$extractors)]);
}

public function transformWith(TransformerInterface|callable $transformer, TransformerInterface|callable ...$transformers): self
{
public function transformWith(
TransformerInterface|callable $transformer,
TransformerInterface|callable ...$transformers
): self {
$transformers = [$transformer, ...$transformers];

foreach ($transformers as $t => $_transformer) {
Expand All @@ -57,11 +55,7 @@ public function transformWith(TransformerInterface|callable $transformer, Transf
}
}

if (count($transformers) > 1) {
return $this->cloneWith(['transformer' => new ChainTransformer(...$transformers)]);
}

return $this->cloneWith(['transformer' => $transformers[0]]);
return $this->cloneWith(['transformer' => new ChainTransformer(...$transformers)]);
}

public function loadInto(LoaderInterface|callable $loader, LoaderInterface|callable ...$loaders): self
Expand All @@ -74,11 +68,7 @@ public function loadInto(LoaderInterface|callable $loader, LoaderInterface|calla
}
}

if (count($loaders) > 1) {
return $this->cloneWith(['loader' => new ChainLoader(...$loaders)]);
}

return $this->cloneWith(['loader' => $loaders[0]]);
return $this->cloneWith(['loader' => new ChainLoader(...$loaders)]);
}

public function withOptions(EtlConfiguration $configuration): self
Expand Down
8 changes: 8 additions & 0 deletions src/Loader/ChainLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ public function flush(bool $isPartial, EtlState $state): mixed

return $output ?? null;
}

public static function from(LoaderInterface $loader): self
{
return match ($loader instanceof self) {
true => $loader,
false => new self($loader),
};
}
}
8 changes: 8 additions & 0 deletions src/Transformer/ChainTransformer.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ public function doTransform(mixed $item, EtlState $state): mixed

return $item;
}

public static function from(TransformerInterface $transformer): self
{
return match ($transformer instanceof self) {
true => $transformer,
false => new self($transformer),
};
}
}
38 changes: 26 additions & 12 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace BenTools\ETL;

use BenTools\ETL\Extractor\ChainExtractor;
use BenTools\ETL\Extractor\ExtractorInterface;
use BenTools\ETL\Internal\Ref;
use BenTools\ETL\Loader\ChainLoader;
use BenTools\ETL\Loader\LoaderInterface;
use BenTools\ETL\Recipe\Recipe;
use BenTools\ETL\Transformer\ChainTransformer;
use BenTools\ETL\Transformer\TransformerInterface;

use function array_fill_keys;
Expand All @@ -16,13 +19,13 @@
use function func_get_args;

/**
* @internal
*
* @param list<string> $keys
* @param array<string, mixed> $values
* @param array<string, mixed> ...$extraValues
*
* @return array<string, mixed>
*
* @internal
*/
function array_fill_from(array $keys, array $values, array ...$extraValues): array
{
Expand All @@ -33,27 +36,27 @@ function array_fill_from(array $keys, array $values, array ...$extraValues): arr
}

/**
* @internal
*
* @template T
*
* @param T $value
*
* @return Ref<T>
*
* @internal
*
* @template T
*/
function ref(mixed $value): Ref
{
return Ref::create($value);
}

/**
* @internal
*
* @template T
*
* @param Ref<T> $ref
*
* @return T
*
* @internal
*
* @template T
*/
function unref(Ref $ref): mixed
{
Expand All @@ -65,8 +68,10 @@ function extractFrom(ExtractorInterface|callable $extractor, ExtractorInterface|
return (new EtlExecutor())->extractFrom(...func_get_args());
}

function transformWith(TransformerInterface|callable $transformer, TransformerInterface|callable ...$transformers): EtlExecutor
{
function transformWith(
TransformerInterface|callable $transformer,
TransformerInterface|callable ...$transformers
): EtlExecutor {
return (new EtlExecutor())->transformWith(...func_get_args());
}

Expand All @@ -79,3 +84,12 @@ function withRecipe(Recipe|callable $recipe): EtlExecutor
{
return (new EtlExecutor())->withRecipe(...func_get_args());
}

function chain(ExtractorInterface|TransformerInterface|LoaderInterface $service,
): ChainExtractor|ChainTransformer|ChainLoader {
return match (true) {
$service instanceof ExtractorInterface => ChainExtractor::from($service),
$service instanceof TransformerInterface => ChainTransformer::from($service),
$service instanceof LoaderInterface => ChainLoader::from($service),
};
}
13 changes: 7 additions & 6 deletions tests/Unit/Extractor/ChainExtractorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
namespace BenTools\ETL\Tests\Unit\Extractor;

use BenTools\ETL\EtlExecutor;
use BenTools\ETL\Extractor\ChainExtractor;
use BenTools\ETL\Extractor\CallableExtractor;

use function BenTools\ETL\chain;
use function BenTools\ETL\extractFrom;
use function expect;

it('chains extractors', function () {
// Given
$extractor = (new ChainExtractor(
fn () => 'banana',
fn () => yield from ['apple', 'strawberry'],
))->with(fn () => ['raspberry', 'peach']);
$executor = (new EtlExecutor($extractor));
$executor = new EtlExecutor(new CallableExtractor(fn () => 'banana'));
$executor = $executor->extractFrom(chain($executor->extractor)
->with(fn () => yield from ['apple', 'strawberry'])
->with(fn () => ['raspberry', 'peach']))
;

// When
$report = $executor->process();
Expand Down
54 changes: 29 additions & 25 deletions tests/Unit/Loader/ChainLoaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,50 @@
use ArrayObject;
use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
use BenTools\ETL\Loader\ChainLoader;
use BenTools\ETL\Loader\CallableLoader;
use BenTools\ETL\Loader\ConditionalLoaderInterface;

use function BenTools\ETL\chain;
use function expect;

it('chains loaders', function () {
// Background
$a = new ArrayObject();
$b = new ArrayObject();
$c = new ArrayObject();
$loader = (new ChainLoader(

$executor = new EtlExecutor(loader: new CallableLoader(
fn (string $item) => $a[] = $item, // @phpstan-ignore-line
fn (string $item) => $b[] = $item, // @phpstan-ignore-line
))
->with(
new class() implements ConditionalLoaderInterface {
public function supports(mixed $item, EtlState $state): bool
{
return 'foo' !== $item;
}

public function load(mixed $item, EtlState $state): void
{
$state->context[__CLASS__][] = $item;
}

public function flush(bool $isPartial, EtlState $state): mixed
{
foreach ($state->context[__CLASS__] as $item) {
$state->context['storage'][] = $item;
));
$executor = $executor->loadInto(
chain($executor->loader)
->with(fn (string $item) => $b[] = $item) // @phpstan-ignore-line
->with(
new class() implements ConditionalLoaderInterface {
public function supports(mixed $item, EtlState $state): bool
{
return 'foo' !== $item;
}

public function load(mixed $item, EtlState $state): void
{
$state->context[__CLASS__][] = $item;
}

return $state->context['storage'];
}
},
);
public function flush(bool $isPartial, EtlState $state): mixed
{
foreach ($state->context[__CLASS__] as $item) {
$state->context['storage'][] = $item;
}

return $state->context['storage'];
}
},
)
);

// Given
$input = ['foo', 'bar'];
$executor = new EtlExecutor(loader: $loader);

// When
$executor->process($input, context: ['storage' => $c]);
Expand Down
Loading
Loading