diff --git a/src/Dto/TimestreamReaderDto.php b/src/Dto/TimestreamReaderDto.php index 6baa629..8aa9be6 100644 --- a/src/Dto/TimestreamReaderDto.php +++ b/src/Dto/TimestreamReaderDto.php @@ -6,6 +6,10 @@ final class TimestreamReaderDto extends AbstractTimestreamDto { + private ?int $maxRows = null; + + private string $nextTokenToContinueReading = ''; + public function __construct(protected Builder $builder, string $forTable = null) { $this->database = config('timestream.database'); @@ -37,8 +41,43 @@ protected function getQueryString(): string public function toArray(): array { - return [ + $params = [ 'QueryString' => $this->getQueryString(), ]; + + if ($this->maxRows) { + $params['MaxRows'] = $this->maxRows; + } + + // we can pass an initial next token to proceed previous queries + if ($this->nextTokenToContinueReading !== '') { + $params['NextToken'] = $this->nextTokenToContinueReading; + } + + return $params; + } + + /** + * @param int|null $maxRows + * + * @return TimestreamReaderDto + */ + public function setMaximumRowLimit(?int $maxRows): TimestreamReaderDto + { + $this->maxRows = $maxRows; + + return $this; + } + + /** + * @param string $nextTokenToContinueReading + * + * @return TimestreamReaderDto + */ + public function setNextTokenToContinueReading(string $nextTokenToContinueReading): TimestreamReaderDto + { + $this->nextTokenToContinueReading = $nextTokenToContinueReading; + + return $this; } } diff --git a/src/TimestreamService.php b/src/TimestreamService.php index d5b4d92..6694761 100644 --- a/src/TimestreamService.php +++ b/src/TimestreamService.php @@ -22,8 +22,18 @@ class TimestreamService { public TimestreamQueryClient $reader; + /** + * @return string|null + */ + public function getNextToken(): ?string + { + return $this->nextToken; + } + public TimestreamWriteClient $writer; + private ?string $nextToken = null; + public function __construct(TimestreamManager $manager) { $this->reader = $manager->getReader(); @@ -71,14 +81,14 @@ private function ingest(array $payload): \Aws\Result public function query(TimestreamReaderDto $timestreamReader): Collection { - return $this->runQuery($timestreamReader); + $params = $timestreamReader->toArray(); + return $this->runQuery($params, $params['MaxRows'] ?? PHP_INT_MAX); } - private function runQuery(TimestreamReaderDto $timestreamReader, string $nextToken = null): Collection + private function runQuery($params, int $rowsLeft): Collection { - $params = $timestreamReader->toArray(); - if ($nextToken) { - $params['NextToken'] = $nextToken; + if ($rowsLeft <= 0) { + return collect(); } try { @@ -87,8 +97,13 @@ private function runQuery(TimestreamReaderDto $timestreamReader, string $nextTok } $result = $this->reader->query($params); - if ($token = $result->get('NextToken')) { - return $this->runQuery($timestreamReader, $token); + $this->nextToken = $result->get('NextToken'); + if ($this->nextToken !== null) { + $parsedRows = $this->parseQueryResult($result); + $rowsLeft -= $parsedRows->count(); + $params['NextToken'] = $this->nextToken; + // we fetch everything recursively until the limit has been reached or there is no more data + return $this->runQuery($params, $rowsLeft)->merge($parsedRows); } } catch (TimestreamQueryException $e) { throw new FailTimestreamQueryException($e, $params);