Skip to content

Commit

Permalink
Fix recursive fetching with NextToken and allow to proceed previous q…
Browse files Browse the repository at this point in the history
…ueries
  • Loading branch information
Timo Hund committed Aug 4, 2023
1 parent eb34605 commit cd036c5
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 8 deletions.
41 changes: 40 additions & 1 deletion src/Dto/TimestreamReaderDto.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

final class TimestreamReaderDto extends AbstractTimestreamDto
{
private ?int $maxRows = null;

private string $nextTokenToContinueReading = '';

public function __construct(protected Builder $builder, string $forTable = null)
{
$this->database = config('timestream.database');
Expand Down Expand Up @@ -37,8 +41,43 @@ protected function getQueryString(): string

public function toArray(): array
{
return [
$params = [
'QueryString' => $this->getQueryString(),
];

if ($this->maxRows) {
$params['MaxRows'] = $this->maxRows;
}

// we can pass an initial next token to proceed previous queries
if ($this->nextTokenToContinueReading !== '') {
$params['NextToken'] = $this->nextTokenToContinueReading;
}

return $params;
}

/**
* @param int|null $maxRows
*
* @return TimestreamReaderDto
*/
public function setMaximumRowLimit(?int $maxRows): TimestreamReaderDto
{
$this->maxRows = $maxRows;

return $this;
}

/**
* @param string $nextTokenToContinueReading
*
* @return TimestreamReaderDto
*/
public function setNextTokenToContinueReading(string $nextTokenToContinueReading): TimestreamReaderDto
{
$this->nextTokenToContinueReading = $nextTokenToContinueReading;

return $this;
}
}
29 changes: 22 additions & 7 deletions src/TimestreamService.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,18 @@ class TimestreamService
{
public TimestreamQueryClient $reader;

/**
* @return string|null
*/
public function getNextToken(): ?string
{
return $this->nextToken;
}

public TimestreamWriteClient $writer;

private ?string $nextToken = null;

public function __construct(TimestreamManager $manager)
{
$this->reader = $manager->getReader();
Expand Down Expand Up @@ -71,14 +81,14 @@ private function ingest(array $payload): \Aws\Result

public function query(TimestreamReaderDto $timestreamReader): Collection
{
return $this->runQuery($timestreamReader);
$params = $timestreamReader->toArray();
return $this->runQuery($params, $params['MaxRows'] ?? PHP_INT_MAX);
}

private function runQuery(TimestreamReaderDto $timestreamReader, string $nextToken = null): Collection
private function runQuery($params, int $rowsLeft): Collection
{
$params = $timestreamReader->toArray();
if ($nextToken) {
$params['NextToken'] = $nextToken;
if ($rowsLeft <= 0) {
return collect();
}

try {
Expand All @@ -87,8 +97,13 @@ private function runQuery(TimestreamReaderDto $timestreamReader, string $nextTok
}

$result = $this->reader->query($params);
if ($token = $result->get('NextToken')) {
return $this->runQuery($timestreamReader, $token);
$this->nextToken = $result->get('NextToken');
if ($this->nextToken !== null) {
$parsedRows = $this->parseQueryResult($result);
$rowsLeft -= $parsedRows->count();
$params['NextToken'] = $this->nextToken;
// we fetch everything recursively until the limit has been reached or there is no more data
return $this->runQuery($params, $rowsLeft)->merge($parsedRows);
}
} catch (TimestreamQueryException $e) {
throw new FailTimestreamQueryException($e, $params);
Expand Down

0 comments on commit cd036c5

Please sign in to comment.