Skip to content

Commit

Permalink
refactor: use refs
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek committed Nov 1, 2023
1 parent 999e77c commit 689dd06
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 13 deletions.
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
"autoload": {
"psr-4": {
"Bentools\\ETL\\": "src/"
}
},
"files": [
"src/functions.php"
]
},
"autoload-dev": {
"psr-4": {
Expand Down
43 changes: 31 additions & 12 deletions src/EtlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Bentools\ETL\Internal\ClonableTrait;
use Bentools\ETL\Internal\EtlBuilderTrait;
use Bentools\ETL\Internal\EtlExceptionsTrait;
use Bentools\ETL\Internal\Ref;
use Bentools\ETL\Loader\InMemoryLoader;
use Bentools\ETL\Loader\LoaderInterface;
use Bentools\ETL\Transformer\NullTransformer;
Expand Down Expand Up @@ -62,17 +63,19 @@ public function __construct(
public function process(mixed $source = null, mixed $destination = null): EtlState
{
$state = new EtlState(options: $this->options, source: $source, destination: $destination);
$stateHolder = ref($state);

try {
$this->dispatch(new InitEvent($state));

$iterator = new IteratorIterator($this->extract($state));
$iterator = new IteratorIterator($this->extract($stateHolder));
$iterator->rewind();
$state = unref($stateHolder);
while ($iterator->valid()) {
$extractedItem = $iterator->current();
try {
$transformedItems = $this->transform($extractedItem, $state);
$this->load($transformedItems, $state);
$this->load($transformedItems, $stateHolder);
} catch (SkipRequest) {
} finally {
$iterator->next();
Expand All @@ -81,31 +84,40 @@ public function process(mixed $source = null, mixed $destination = null): EtlSta
} catch (StopRequest) {
}

$output = $this->flush($state, false);
$output = $this->flush($stateHolder, false);

$state = unref($stateHolder);
if (!$state->nbTotalItems) {
$state = $state->withNbTotalItems($state->nbLoadedItems);
$stateHolder->replaceWith($state);
}

$state = $state->withOutput($output);
$stateHolder->replaceWith($state);
$this->dispatch(new EndEvent($state));

gc_collect_cycles();

return $state;
}

private function extract(EtlState &$state): Generator
/**
* @param Ref<EtlState> $stateHolder
*/
private function extract(Ref $stateHolder): Generator
{
$state = unref($stateHolder);
try {
$items = $this->extractor->extract($state);
if (is_countable($items)) {
$state = $state->withNbTotalItems(count($items));
$stateHolder->replaceWith($state);
}
$this->dispatch(new StartEvent($state));
foreach ($items as $key => $value) {
try {
$state = $state->withUpdatedItemKey($key);
$state = unref($stateHolder)->withUpdatedItemKey($key);
$stateHolder->replaceWith($state);
$event = $this->dispatch(new ExtractEvent($state, $value));
yield $event->item;
} catch (SkipRequest) {
Expand All @@ -114,7 +126,7 @@ private function extract(EtlState &$state): Generator
} catch (StopRequest) {
return;
} catch (Throwable $exception) {
$this->throwExtractException($exception, $state);
$this->throwExtractException($exception, unref($stateHolder));
}
}

Expand All @@ -137,27 +149,34 @@ private function transform(mixed $item, EtlState $state): array
}

/**
* @param list<mixed> $items
* @param list<mixed> $items
* @param Ref<EtlState> $stateHolder
*/
private function load(array $items, EtlState &$state): void
private function load(array $items, Ref $stateHolder): void
{
$state = unref($stateHolder);
try {
foreach ($items as $item) {
$this->loader->load($item, $state);
$state = $state->withIncrementedNbLoadedItems();
$stateHolder->replaceWith($state);
$this->dispatch(new LoadEvent($state, $item));
}
} catch (SkipRequest|StopRequest $e) {
throw $e;
} catch (Throwable $e) {
$this->throwLoadException($e, $state);
$this->throwLoadException($e, unref($stateHolder));
}

$this->flush($state, true);
$this->flush($stateHolder, true);
}

private function flush(EtlState &$state, bool $isPartial): mixed
/**
* @param Ref<EtlState> $stateHolder
*/
private function flush(Ref $stateHolder, bool $isPartial): mixed
{
$state = unref($stateHolder);
if ($isPartial && !$state->shouldFlush()) {
return null;
}
Expand All @@ -174,7 +193,7 @@ private function flush(EtlState &$state, bool $isPartial): mixed
$this->throwFlushException($e, $state);
}
$this->dispatch(new FlushEvent($state, $isPartial, $output));
$state = $state->withClearedFlush();
$stateHolder->replaceWith($state->withClearedFlush());

return $output;
}
Expand Down
49 changes: 49 additions & 0 deletions src/Internal/Ref.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

namespace Bentools\ETL\Internal;

/**
* @template T
*
* @internal
*/
final class Ref
{
/**
* @param T $value
*/
private function __construct(
public mixed $value,
) {
}

/**
* @param T $value
*
* @return self<T>
*/
public function replaceWith(mixed $value): self
{
$this->value = $value;

return $this;
}

/**
* @param T $value
*
* @return self<T>
*/
public static function create(mixed $value): self
{
static $prototype;
$prototype ??= new self(null);

$ref = clone $prototype;
$ref->value = $value;

return $ref;
}
}
31 changes: 31 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Bentools\ETL;

use Bentools\ETL\Internal\Ref;

/**
* @template T
*
* @param T $value
*
* @return Ref<T>
*/
function ref(mixed $value): Ref
{
return Ref::create($value);
}

/**
* @template T
*
* @param Ref<T> $ref
*
* @return T
*/
function unref(Ref $ref): mixed
{
return $ref->value;
}

0 comments on commit 689dd06

Please sign in to comment.