Skip to content

Commit

Permalink
Feat: Only 1 final flush by default (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek authored Oct 30, 2023
1 parent 356108d commit 4ccf009
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 16 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,17 @@ You can stop the workflow at any time.

Use the `$state->stop()` method from the `EtlState` object as soon as your business logic requires it.

Early flush
-----------
Flush frequency and early flushes
---------------------------------

You can define the flush frequency (defaults to 1) and optionally flush earlier than expected at any time:
By default, the `flush()` method of your loader is invoked only once, at the end of the ETL,
meaning the loader will likely keep all loaded items in memory before dumping them to their final destination.

Feel free to set a flush frequency (the `flushEvery` argument of `EtlConfiguration`) that fits your needs
and optionally trigger an early flush at any time during the ETL process:

```php
$etl = (new EtlExecutor(options: new EtlConfiguration(flushEvery: 10)))
$etl = (new EtlExecutor(options: new EtlConfiguration(flushEvery: 10)))
->onLoad(
function (LoadEvent $event) {
if (/* whatever reason */) {
Expand Down
18 changes: 17 additions & 1 deletion src/EtlConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@

namespace Bentools\ETL;

use InvalidArgumentException;

use function is_float;
use function sprintf;

use const INF;

/**
 * Immutable options controlling how often the loader is flushed during an ETL run.
 */
final readonly class EtlConfiguration
{
    /**
     * Number of loaded items between two flushes, or INF when no periodic flush is wanted.
     */
    public float|int $flushFrequency;

    /**
     * @param float|int $flushEvery a strictly positive integer, or INF (the default)
     *
     * @throws InvalidArgumentException when $flushEvery is a float other than INF, or is lower than 1
     */
    public function __construct(
        float|int $flushEvery = INF,
    ) {
        // INF is the only float accepted: the union type exists solely to allow that sentinel.
        if (INF !== $flushEvery && is_float($flushEvery)) {
            throw new InvalidArgumentException('Expected \\INF or int, float given.');
        }
        if ($flushEvery < 1) {
            throw new InvalidArgumentException(sprintf('Expected positive integer > 0, got %d', $flushEvery));
        }
        $this->flushFrequency = $flushEvery;
    }
}
14 changes: 9 additions & 5 deletions src/EtlState.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function __construct(
public readonly ?DateTimeImmutable $endedAt = null,
public readonly mixed $output = null,
private readonly int $nbLoadedItemsSinceLastFlush = 0,
private bool $flush = false,
private bool $earlyFlush = false,
public array $context = [],
) {
}
Expand All @@ -39,7 +39,7 @@ public function __construct(
*/
public function flush(): void
{
    // Only records the request: the flag is read by shouldFlush() and reset by withClearedFlush().
    // The former `$flush` property was renamed to `$earlyFlush`, so it must not be written anymore.
    $this->earlyFlush = true;
}

/**
Expand All @@ -60,8 +60,12 @@ public function stop(): void

/**
 * Tells whether the loader should be flushed now.
 *
 * Returns true when an early flush was requested through flush(), or when the number of
 * items loaded since the last flush is a multiple of the configured flush frequency.
 * With a frequency of INF, only an explicit early-flush request yields true.
 */
public function shouldFlush(): bool
{
    // An explicit request must be honoured whatever the configured frequency;
    // checking INF first would silently discard it under the default configuration.
    if ($this->earlyFlush) {
        return true;
    }

    // INF means "no periodic flush"; it must also be ruled out before the modulo below.
    if (INF === $this->options->flushFrequency) {
        return false;
    }

    return 0 === ($this->nbLoadedItemsSinceLastFlush % $this->options->flushFrequency);
}

public function getDuration(): float
Expand Down Expand Up @@ -116,7 +120,7 @@ public function withOutput(mixed $output): self
public function withClearedFlush(): self
{
    // Resets both flush triggers: the early-flush request and the loaded-items counter.
    // The stale 'flush' key is gone because that property was renamed to 'earlyFlush'.
    // NOTE(review): clone() is defined outside this hunk; presumably it returns a copy
    // with the given properties overridden — confirm.
    return $this->clone([
        'earlyFlush' => false,
        'nbLoadedItemsSinceLastFlush' => 0,
    ]);
}
Expand Down
16 changes: 16 additions & 0 deletions tests/Unit/EtlConfigurationTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Tests\Unit;

use Bentools\ETL\EtlConfiguration;
use InvalidArgumentException;

// Any float other than INF must be rejected by the constructor.
it('denies float values', function () {
    expect(fn () => new EtlConfiguration(flushEvery: 2.1))
        ->toThrow(InvalidArgumentException::class);
});

// Values lower than 1 must be rejected by the constructor.
it('denies negative values', function () {
    expect(fn () => new EtlConfiguration(flushEvery: -10))
        ->toThrow(InvalidArgumentException::class);
});
21 changes: 15 additions & 6 deletions tests/Unit/Loader/JSONLoaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,29 @@

namespace BenTools\ETL\Tests\Unit\Loader;

use Bentools\ETL\EtlConfiguration;
use Bentools\ETL\EtlExecutor;
use Bentools\ETL\Loader\JSONLoader;
use SplFileObject;

use function dataset;
use function dirname;
use function expect;
use function implode;
use function sys_get_temp_dir;
use function uniqid;

it('loads items to a JSON file', function () {
use const INF;

// Configurations shared by the tests below (referenced through ->with('config')):
// one using INF as flush frequency, one flushing every 3 loaded items.
dataset('config', [
new EtlConfiguration(flushEvery: INF),
new EtlConfiguration(flushEvery: 3),
]);

it('loads items to a JSON file', function (EtlConfiguration $options) {
$cities = require dirname(__DIR__, 2).'/data/10-biggest-cities.php';
$destination = 'file://'.sys_get_temp_dir().'/'.uniqid('10-biggest-cities_').'.json';
$executor = new EtlExecutor(loader: new JSONLoader($destination));
$executor = new EtlExecutor(loader: new JSONLoader($destination), options: $options);
$output = $executor->process($cities)->output;
expect($output)->toBe($destination);

Expand All @@ -27,15 +36,15 @@
$expectedContent = implode('', [...new SplFileObject(dirname(__DIR__, 2).'/data/10-biggest-cities.json', 'r')]);

expect($writtenContent)->toBe($expectedContent);
});
})->with('config');

// Runs once per entry of the 'config' dataset; the produced JSON must be the same
// whichever flush frequency is configured.
it('loads items to a JSON string', function (EtlConfiguration $options) {
    $cities = require dirname(__DIR__, 2).'/data/10-biggest-cities.php';
    // No destination is given to the loader: the JSON is read back from the process output.
    $executor = new EtlExecutor(loader: new JSONLoader(), options: $options);
    $output = $executor->process($cities)->output;

    // @phpstan-ignore-next-line
    $expectedContent = implode('', [...new SplFileObject(dirname(__DIR__, 2).'/data/10-biggest-cities.json', 'r')]);

    expect($output)->toBe($expectedContent);
})->with('config');

0 comments on commit 4ccf009

Please sign in to comment.