Skip to content

Commit

Permalink
feat: Support the Count aggregation query (#673)
Browse files Browse the repository at this point in the history
* feat: Support the Count aggregation query

* Fix docs build

* Add test coverage for calling count from Query.

* Fix the test.

* Add aggregation doc and update docstrings.

* Add the aggregation.rst file

* Test that the aggregation alias is unique
Test in transaction

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Type annotation fix

* Remove unneeded variable
Refactor system test suite to fallback to default creds

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
Mariatta and gcf-owl-bot[bot] authored Jan 12, 2023
1 parent 24c3848 commit dfd4c5d
Show file tree
Hide file tree
Showing 27 changed files with 2,055 additions and 182 deletions.
14 changes: 14 additions & 0 deletions docs/aggregation.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Aggregation
~~~~~~~~~~~

.. automodule:: google.cloud.firestore_v1.aggregation
:members:
:show-inheritance:

.. automodule:: google.cloud.firestore_v1.base_aggregation
:members:
:show-inheritance:

.. automodule:: google.cloud.firestore_v1.async_aggregation
:members:
:show-inheritance:
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ API Reference

client
collection
aggregation
document
field_path
query
Expand Down
156 changes: 156 additions & 0 deletions google/cloud/firestore_v1/aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright 2023 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classes for representing aggregation queries for the Google Cloud Firestore API.
A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from
a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
a more common way to create an aggregation query than direct usage of the constructor.
"""
from __future__ import annotations

from google.api_core import exceptions
from google.api_core import gapic_v1
from google.api_core import retry as retries


from google.cloud.firestore_v1.base_aggregation import (
AggregationResult,
BaseAggregationQuery,
_query_response_to_result,
)

from typing import Generator, Union, List, Any


class AggregationQuery(BaseAggregationQuery):
"""Represents an aggregation query to the Firestore API."""

def __init__(
self,
nested_query,
) -> None:
super(AggregationQuery, self).__init__(nested_query)

def get(
self,
transaction=None,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: float | None = None,
) -> List[AggregationResult]:
"""Runs the aggregation query.
This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
allowed).
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Returns:
list: The aggregation query results
"""
result = self.stream(transaction=transaction, retry=retry, timeout=timeout)
return list(result) # type: ignore

def _get_stream_iterator(self, transaction, retry, timeout):
"""Helper method for :meth:`stream`."""
request, kwargs = self._prep_stream(
transaction,
retry,
timeout,
)

return self._client._firestore_api.run_aggregation_query(
request=request,
metadata=self._client._rpc_metadata,
**kwargs,
)

def _retry_query_after_exception(self, exc, retry, transaction):
"""Helper method for :meth:`stream`."""
if transaction is None: # no snapshot-based retry inside transaction
if retry is gapic_v1.method.DEFAULT:
transport = self._client._firestore_api._transport
gapic_callable = transport.run_aggregation_query
retry = gapic_callable._retry
return retry._predicate(exc)

return False

def stream(
self,
transaction=None,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: float | None = None,
) -> Union[Generator[List[AggregationResult], Any, None]]:
"""Runs the aggregation query.
This sends a ``RunAggregationQuery`` RPC and then returns an iterator which
consumes each document returned in the stream of ``RunAggregationQueryResponse``
messages.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
allowed).
Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Yields:
:class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
The result of aggregations of this query
"""

response_iterator = self._get_stream_iterator(
transaction,
retry,
timeout,
)
while True:
try:
response = next(response_iterator, None)
except exceptions.GoogleAPICallError as exc:
if self._retry_query_after_exception(exc, retry, transaction):
response_iterator = self._get_stream_iterator(
transaction,
retry,
timeout,
)
continue
else:
raise

if response is None: # EOI
break
result = _query_response_to_result(response)
yield result
124 changes: 124 additions & 0 deletions google/cloud/firestore_v1/async_aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Copyright 2023 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classes for representing Async aggregation queries for the Google Cloud Firestore API.
A :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery` can be created directly from
a :class:`~google.cloud.firestore_v1.async_collection.AsyncCollection` and that can be
a more common way to create an aggregation query than direct usage of the constructor.
"""
from __future__ import annotations

from google.api_core import gapic_v1
from google.api_core import retry as retries

from typing import List, Union, AsyncGenerator


from google.cloud.firestore_v1.base_aggregation import (
AggregationResult,
_query_response_to_result,
BaseAggregationQuery,
)


class AsyncAggregationQuery(BaseAggregationQuery):
"""Represents an aggregation query to the Firestore API."""

def __init__(
self,
nested_query,
) -> None:
super(AsyncAggregationQuery, self).__init__(nested_query)

async def get(
self,
transaction=None,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: float | None = None,
) -> List[AggregationResult]:
"""Runs the aggregation query.
This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
allowed).
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Returns:
list: The aggregation query results
"""
stream_result = self.stream(
transaction=transaction, retry=retry, timeout=timeout
)
result = [aggregation async for aggregation in stream_result]
return result # type: ignore

async def stream(
self,
transaction=None,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: float | None = None,
) -> Union[AsyncGenerator[List[AggregationResult], None]]:
"""Runs the aggregation query.
This sends a ``RunAggregationQuery`` RPC and then returns an iterator which
consumes each document returned in the stream of ``RunAggregationQueryResponse``
messages.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
allowed).
Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Yields:
:class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
The result of aggregations of this query
"""
request, kwargs = self._prep_stream(
transaction,
retry,
timeout,
)

response_iterator = await self._client._firestore_api.run_aggregation_query(
request=request,
metadata=self._client._rpc_metadata,
**kwargs,
)

async for response in response_iterator:
result = _query_response_to_result(response)
yield result
13 changes: 9 additions & 4 deletions google/cloud/firestore_v1/async_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
BaseCollectionReference,
_item_to_document_ref,
)
from google.cloud.firestore_v1 import (
async_query,
async_document,
)
from google.cloud.firestore_v1 import async_query, async_document, async_aggregation

from google.cloud.firestore_v1.document import DocumentReference

Expand Down Expand Up @@ -72,6 +69,14 @@ def _query(self) -> async_query.AsyncQuery:
"""
return async_query.AsyncQuery(self)

def _aggregation_query(self) -> async_aggregation.AsyncAggregationQuery:
"""AsyncAggregationQuery factory.
Returns:
:class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery
"""
return async_aggregation.AsyncAggregationQuery(self._query())

async def _chunkify(self, chunk_size: int):
async for page in self._query()._chunkify(chunk_size):
yield page
Expand Down
18 changes: 18 additions & 0 deletions google/cloud/firestore_v1/async_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
a more common way to create a query than direct usage of the constructor.
"""
from __future__ import annotations

from google.api_core import gapic_v1
from google.api_core import retry as retries
Expand All @@ -39,6 +40,8 @@
# Types needed only for Type Hints
from google.cloud.firestore_v1.transaction import Transaction

from google.cloud.firestore_v1.async_aggregation import AsyncAggregationQuery


class AsyncQuery(BaseQuery):
"""Represents a query to the Firestore API.
Expand Down Expand Up @@ -213,6 +216,21 @@ async def get(

return result

def count(
self, alias: str | None = None
) -> Type["firestore_v1.async_aggregation.AsyncAggregationQuery"]:
"""Adds a count over the nested query.
Args:
alias
(Optional[str]): The alias for the count
Returns:
:class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
An instance of an AsyncAggregationQuery object
"""
return AsyncAggregationQuery(self).count(alias=alias)

async def stream(
self,
transaction=None,
Expand Down
Loading

0 comments on commit dfd4c5d

Please sign in to comment.