Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing features #24

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 92 additions & 14 deletions src/Builder/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,43 @@
use Closure;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Illuminate\Support\Stringable;
use NorbyBaru\AwsTimestream\Concerns\BuildersConcern;
use NorbyBaru\AwsTimestream\Contract\QueryBuilderContract;

abstract class Builder implements QueryBuilderContract
{
use BuildersConcern;

public const JOIN_TYPE_LEFT = 'LEFT';
public const JOIN_TYPE_RIGHT = 'RIGHT';
public const JOIN_TYPE_INNER = 'INNER';
public const JOIN_TYPE_FULL = 'FULL';
public const JOIN_TYPE_LEFT_OUTER = 'LEFT OUTER';
public const JOIN_TYPE_RIGHT_OUTER = 'RIGHT OUTER';
public const JOIN_TYPE_FULL_OUTER = 'FULL OUTER';

public const JOIN_TYPES = [
self::JOIN_TYPE_LEFT,
self::JOIN_TYPE_RIGHT,
self::JOIN_TYPE_INNER,
self::JOIN_TYPE_FULL,
self::JOIN_TYPE_LEFT_OUTER,
self::JOIN_TYPE_RIGHT_OUTER,
self::JOIN_TYPE_FULL_OUTER,
];

protected string $database = '';
protected string $table = '';
protected string $fromQuery = '';
protected string $whereQuery = '';
protected string $havingQuery = '';
protected string $selectStatement = '';
protected string $orderByQuery = '';
protected string $groupByQuery = '';
protected string $limitByQuery = '';
protected array $withQueries = [];
protected array $joinQueries = [];

public function selectRaw(string $statement): self
{
Expand All @@ -44,12 +65,35 @@ public function from(string $database, string $table, string $alias = null): sel
$this->fromQuery = 'FROM "' . $database . '"."' . $table . '"';

if ($alias) {
$this->fromQuery = Str::of($this->fromQuery)->append(" {$alias}");
$this->fromQuery = Str::of($this->fromQuery)->append(" AS {$alias}");
}

return $this;
}

public function join(string $database, string $table, string $type = 'LEFT', ?string $alias = null, ?string $on = null): Builder
{
if (!in_array($type, self::JOIN_TYPES)) {
throw new \InvalidArgumentException(sprintf('Invalid join type %s', $type));
}
$joinQuery = $type . ' JOIN "' . $database . '"."' . $table . '"';
if ($alias) {
$joinQuery = Str::of($joinQuery)->append(" AS {$alias}");
}

if ($on) {
$joinQuery = Str::of($joinQuery)->append(" ON {$on}");
}
$this->joinQueries = array_merge($this->joinQueries, [$joinQuery]);

return $this;
}

public function leftJoin(string $database, string $table, string $alias = null, string $on = null): self
{
return $this->join($database, $table, self::JOIN_TYPE_LEFT, $alias, $on);
}

public function fromRaw(string $statement): self
{
$this->fromQuery = $statement;
Expand All @@ -72,44 +116,63 @@ public function groupBy($args): self
return $this;
}

public function whereRaw(string $statement): self
{
$this->whereQuery = $statement;

return $this;
}

public function where(string $column, $value, string $operator = '=', string $boolean = 'and', bool $ago = false): self
{
$query = Str::of($this->whereQuery);
$this->whereQuery = $this->modifyQueryPart('WHERE', $query, $column, $value, $operator, $boolean, $ago);

return $this;
}

protected function modifyQueryPart(
string $sqlPart,
Stringable $query,
string $column,
$value,
string $operator = '=',
string $boolean = 'and',
bool $ago = false
): Stringable {
if (!in_array($sqlPart, ['WHERE', 'HAVING'])) {
throw new \InvalidArgumentException(sprintf('Invalid sql part %s', $sqlPart));
}
$value = $value instanceof Closure
// If the value is a Closure, it means the developer is performing an entire
? '(' . call_user_func($value) . ')'
: $value;

if ($query->length() == 0) {
$whereQuery = $query->append(
sprintf('WHERE %s %s %s', $column, $operator, $value)
$queryPart = $query->append(
sprintf($sqlPart . ' %s %s %s', $column, $operator, $value)
);

if ($ago) {
$whereQuery = $query->append(
sprintf('WHERE %s %s ago(%s)', $column, $operator, $value)
$queryPart = $query->append(
sprintf($sqlPart . ' %s %s ago(%s)', $column, $operator, $value)
);
}

$this->whereQuery = $whereQuery;

return $this;
return $queryPart;
}

$whereQuery = $query->append(
$queryPart = $query->append(
sprintf(' %s %s %s %s', mb_strtoupper($boolean), $column, $operator, $value)
);

if ($ago) {
$whereQuery = $query->append(
$queryPart = $query->append(
sprintf(' %s %s %s ago(%s)', mb_strtoupper($boolean), $column, $operator, $value)
);
}

$this->whereQuery = $whereQuery;

return $this;
return $queryPart;
}

public function whereAgo(string $column, $value, string $operator = '=', string $boolean = 'and'): self
Expand Down Expand Up @@ -251,9 +314,24 @@ public function whereNotNull(string|array $columns, $boolean = 'and'): self
return $this->whereNull($columns, $boolean, true);
}

public function havingRaw(string $statement): self
{
$this->havingQuery = $statement;

return $this;
}

public function having(string $column, $value, string $operator = '=', string $boolean = 'and', bool $ago = false): self
{
$query = Str::of($this->havingQuery);
$this->havingQuery = $this->modifyQueryPart('HAVING', $query, $column, $value, $operator, $boolean, $ago);

return $this;
}

public function limitBy(int $limit): self
{
$this->limitByQuery = sprintf('LIMIT %s ', $limit);
$this->limitByQuery = sprintf('LIMIT %s', $limit);

return $this;
}
Expand Down
23 changes: 23 additions & 0 deletions src/Concerns/BuildersConcern.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public function getOrderByQuery(): string
return $this->orderByQuery;
}

public function getHavingQuery(): string
{
return $this->havingQuery;
}

public function getGroupByQuery(): string
{
return $this->groupByQuery;
Expand All @@ -67,6 +72,11 @@ public function getWithQueries(): array
return $this->withQueries;
}

public function getJoinQueries(): array
{
return $this->joinQueries;
}

public function getQueryString(): string
{
if ($this->getWithQueries()) {
Expand All @@ -84,6 +94,13 @@ public function getQueryString(): string
->append($this->getFromQuery());
}

if ($this->getJoinQueries()) {
$joinQueries = implode(' ', $this->getJoinQueries());
$queryString = $queryString
->append(' ')
->append($joinQueries);
}

if ($this->getWhereQuery()) {
$queryString = $queryString
->append(' ')
Expand All @@ -96,6 +113,12 @@ public function getQueryString(): string
->append($this->getGroupByQuery());
}

if ($this->getHavingQuery()) {
$queryString = $queryString
->append(' ')
->append($this->getHavingQuery());
}

if ($this->getOrderByQuery()) {
$queryString = $queryString
->append(' ')
Expand Down
41 changes: 40 additions & 1 deletion src/Dto/TimestreamReaderDto.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

final class TimestreamReaderDto extends AbstractTimestreamDto
{
private ?int $maxRows = null;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the maxRows necessary? I still don't see how much useful this one is! can give an example for it?

My understanding is we can just rely on the nextToken. If the query should paginate then they can make use of Limit in the query to limit max results per page I think


private string $nextTokenToContinueReading = '';
Copy link
Owner

@norbybaru norbybaru Sep 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't seem to find where this is being used as well!

In the loop you are always calling runQuery($params) which will have the nextToken already available in $params


public function __construct(protected Builder $builder, string $forTable = null)
{
$this->database = config('timestream.database');
Expand Down Expand Up @@ -37,8 +41,43 @@ protected function getQueryString(): string

public function toArray(): array
{
return [
$params = [
'QueryString' => $this->getQueryString(),
];

if ($this->maxRows) {
$params['MaxRows'] = $this->maxRows;
}

// we can pass an initial next token to proceed previous queries
if ($this->nextTokenToContinueReading !== '') {
$params['NextToken'] = $this->nextTokenToContinueReading;
}

return $params;
}

/**
* @param int|null $maxRows
*
* @return TimestreamReaderDto
*/
public function setMaximumRowLimit(?int $maxRows): TimestreamReaderDto
{
$this->maxRows = $maxRows;

return $this;
}

/**
* @param string $nextTokenToContinueReading
*
* @return TimestreamReaderDto
*/
public function setNextTokenToContinueReading(string $nextTokenToContinueReading): TimestreamReaderDto
{
$this->nextTokenToContinueReading = $nextTokenToContinueReading;

return $this;
}
}
30 changes: 23 additions & 7 deletions src/TimestreamService.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,18 @@ class TimestreamService
{
public TimestreamQueryClient $reader;

/**
* @return string|null
*/
public function getNextToken(): ?string
{
return $this->nextToken;
}

public TimestreamWriteClient $writer;

private ?string $nextToken = null;

public function __construct(TimestreamManager $manager)
{
$this->reader = $manager->getReader();
Expand Down Expand Up @@ -71,14 +81,15 @@ private function ingest(array $payload): \Aws\Result

public function query(TimestreamReaderDto $timestreamReader): Collection
{
return $this->runQuery($timestreamReader);
$params = $timestreamReader->toArray();

return $this->runQuery($params, $params['MaxRows'] ?? PHP_INT_MAX);
}

private function runQuery(TimestreamReaderDto $timestreamReader, string $nextToken = null): Collection
private function runQuery($params, int $rowsLeft): Collection
{
$params = $timestreamReader->toArray();
if ($nextToken) {
$params['NextToken'] = $nextToken;
if ($rowsLeft <= 0) {
return collect();
}

try {
Expand All @@ -87,8 +98,13 @@ private function runQuery(TimestreamReaderDto $timestreamReader, string $nextTok
}

$result = $this->reader->query($params);
if ($token = $result->get('NextToken')) {
return $this->runQuery($timestreamReader, $token);
$this->nextToken = $result->get('NextToken');
if ($this->nextToken !== null) {
$parsedRows = $this->parseQueryResult($result);
$rowsLeft -= $parsedRows->count();
$params['NextToken'] = $this->nextToken;
// we fetch everything recursively until the limit has been reached or there is no more data
return $this->runQuery($params, $rowsLeft)->merge($parsedRows);
}
} catch (TimestreamQueryException $e) {
throw new FailTimestreamQueryException($e, $params);
Expand Down
Loading
Loading