Skip to content

Commit

Permalink
feat: add bulk writer (#396)
Browse files Browse the repository at this point in the history
* feat: bulk writer 555 rate_limiter (#368)

* added 555 throttle utility

* Update google/cloud/firestore_v1/throttle.py

Co-authored-by: Tres Seaver <tseaver@palladion.com>

* added ability to request a number of tokens

* replaced Callable now parameter with module function

* updated tests

* renamed throttle -> ramp up

* improved docstrings

* linting

* fixed test coverage

* rename to RateLimiter and defer clock to first op

* linting

Co-authored-by: Tres Seaver <tseaver@palladion.com>

* feat: added new batch class for BulkWriter (#397)

* feat: added new batch class for BulkWriter

* updated docstring to use less colloquial language

* feat: BulkWriter implementation (#384)

* feat: added `write` method to batch classes

* added docstrings to all 3 batch classes

instead of just the base

* updated batch classes to remove control flag

now branches logic via subclasses

* fixed broken tests off abstract class

* fixed docstring

* refactored BulkWriteBatch

this commit increases the distance between WriteBatch and BulkWriteBatch

* began adding [Async]BulkWriter

* continued implementation

* working impl or BW

* tidied up BW impl

* beginning of unit tests for BW

* fixed merge problem

* initial set of BW unit tests

* refactored bulkwriter sending mechanism

now consumes off the queue and schedules on the main thread, only going async to actually send

* final CI touch ups

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/master/packages/owl-bot/README.md

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/master/packages/owl-bot/README.md

* moved BulkWriter parameters to options format

* rebased off master

* test fixes

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

* feat: add retry support for BulkWriter errors (#413)

* parent 0176cc7
author Craig Labenz <craig.labenz@gmail.com> 1623693904 -0700
committer Craig Labenz <craig.labenz@gmail.com> 1628617523 -0400

feat: add retries to bulk-writer

* fixed rebase error

Co-authored-by: Tres Seaver <tseaver@palladion.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 11, 2021
1 parent eb45a36 commit 98a7753
Show file tree
Hide file tree
Showing 18 changed files with 2,325 additions and 16 deletions.
13 changes: 13 additions & 0 deletions google/cloud/firestore_v1/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ def __init__(
client_options=client_options,
)

def _to_sync_copy(self):
from google.cloud.firestore_v1.client import Client

if not getattr(self, "_sync_copy", None):
self._sync_copy = Client(
project=self.project,
credentials=self._credentials,
database=self._database,
client_info=self._client_info,
client_options=self._client_options,
)
return self._sync_copy

@property
def _firestore_api(self):
"""Lazy-loading getter GAPIC Firestore API.
Expand Down
44 changes: 33 additions & 11 deletions google/cloud/firestore_v1/base_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

"""Helpers for batch requests to the Google Cloud Firestore API."""


from google.cloud.firestore_v1 import _helpers
import abc
from typing import Dict, Union

# Types needed only for Type Hints
from google.cloud.firestore_v1.document import DocumentReference

from typing import Union
from google.api_core import retry as retries # type: ignore
from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.base_document import BaseDocumentReference


class BaseWriteBatch(object):
class BaseBatch(metaclass=abc.ABCMeta):
"""Accumulate write operations to be sent in a batch.
This has the same set of methods for write operations that
Expand All @@ -38,9 +38,16 @@ class BaseWriteBatch(object):
def __init__(self, client) -> None:
self._client = client
self._write_pbs = []
self._document_references: Dict[str, BaseDocumentReference] = {}
self.write_results = None
self.commit_time = None

def __len__(self):
return len(self._document_references)

def __contains__(self, reference: BaseDocumentReference):
return reference._document_path in self._document_references

def _add_write_pbs(self, write_pbs: list) -> None:
"""Add `Write`` protobufs to this transaction.
Expand All @@ -52,7 +59,13 @@ def _add_write_pbs(self, write_pbs: list) -> None:
"""
self._write_pbs.extend(write_pbs)

def create(self, reference: DocumentReference, document_data: dict) -> None:
@abc.abstractmethod
def commit(self):
"""Sends all accumulated write operations to the server. The details of this
write depend on the implementing class."""
raise NotImplementedError()

def create(self, reference: BaseDocumentReference, document_data: dict) -> None:
"""Add a "change" to this batch to create a document.
If the document given by ``reference`` already exists, then this
Expand All @@ -65,11 +78,12 @@ def create(self, reference: DocumentReference, document_data: dict) -> None:
creating a document.
"""
write_pbs = _helpers.pbs_for_create(reference._document_path, document_data)
self._document_references[reference._document_path] = reference
self._add_write_pbs(write_pbs)

def set(
self,
reference: DocumentReference,
reference: BaseDocumentReference,
document_data: dict,
merge: Union[bool, list] = False,
) -> None:
Expand Down Expand Up @@ -98,11 +112,12 @@ def set(
reference._document_path, document_data
)

self._document_references[reference._document_path] = reference
self._add_write_pbs(write_pbs)

def update(
self,
reference: DocumentReference,
reference: BaseDocumentReference,
field_updates: dict,
option: _helpers.WriteOption = None,
) -> None:
Expand All @@ -126,10 +141,11 @@ def update(
write_pbs = _helpers.pbs_for_update(
reference._document_path, field_updates, option
)
self._document_references[reference._document_path] = reference
self._add_write_pbs(write_pbs)

def delete(
self, reference: DocumentReference, option: _helpers.WriteOption = None
self, reference: BaseDocumentReference, option: _helpers.WriteOption = None
) -> None:
"""Add a "change" to delete a document.
Expand All @@ -146,9 +162,15 @@ def delete(
state of the document before applying changes.
"""
write_pb = _helpers.pb_for_delete(reference._document_path, option)
self._document_references[reference._document_path] = reference
self._add_write_pbs([write_pb])

def _prep_commit(self, retry, timeout):

class BaseWriteBatch(BaseBatch):
"""Base class for a/sync implementations of the `commit` RPC. `commit` is useful
for lower volumes or when the order of write operations is important."""

def _prep_commit(self, retry: retries.Retry, timeout: float):
"""Shared setup for async/sync :meth:`commit`."""
request = {
"database": self._client._database_string,
Expand Down
20 changes: 19 additions & 1 deletion google/cloud/firestore_v1/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
from google.cloud.firestore_v1 import __version__
from google.cloud.firestore_v1 import types
from google.cloud.firestore_v1.base_document import DocumentSnapshot

from google.cloud.firestore_v1.bulk_writer import (
BulkWriter,
BulkWriterOptions,
)
from google.cloud.firestore_v1.field_path import render_field_path
from typing import (
Any,
Expand Down Expand Up @@ -278,6 +281,21 @@ def _get_collection_reference(self, collection_id: str) -> BaseCollectionReferen
def document(self, *document_path) -> BaseDocumentReference:
raise NotImplementedError

def bulk_writer(self, options: Optional[BulkWriterOptions] = None) -> BulkWriter:
"""Get a BulkWriter instance from this client.
Args:
:class:`@google.cloud.firestore_v1.bulk_writer.BulkWriterOptions`:
Optional control parameters for the
:class:`@google.cloud.firestore_v1.bulk_writer.BulkWriter` returned.
Returns:
:class:`@google.cloud.firestore_v1.bulk_writer.BulkWriter`:
A utility to efficiently create and save many `WriteBatch` instances
to the server.
"""
return BulkWriter(client=self, options=options)

def _document_path_helper(self, *document_path) -> List[str]:
"""Standardize the format of path to tuple of path segments and strip the database string from path if present.
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/firestore_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@


class WriteBatch(BaseWriteBatch):
"""Accumulate write operations to be sent in a batch.
"""Accumulate write operations to be sent in a batch. Use this over
`BulkWriteBatch` for lower volumes or when the order of operations
within a given batch is important.
This has the same set of methods for write operations that
:class:`~google.cloud.firestore_v1.document.DocumentReference` does,
Expand Down
89 changes: 89 additions & 0 deletions google/cloud/firestore_v1/bulk_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2021 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for batch requests to the Google Cloud Firestore API."""
from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.base_batch import BaseBatch
from google.cloud.firestore_v1.types.firestore import BatchWriteResponse


class BulkWriteBatch(BaseBatch):
"""Accumulate write operations to be sent in a batch. Use this over
`WriteBatch` for higher volumes (e.g., via `BulkWriter`) and when the order
of operations within a given batch is unimportant.
Because the order in which individual write operations are applied to the database
is not guaranteed, `batch_write` RPCs can never contain multiple operations
to the same document. If calling code detects a second write operation to a
known document reference, it should first cut off the previous batch and
send it, then create a new batch starting with the latest write operation.
In practice, the [Async]BulkWriter classes handle this.
This has the same set of methods for write operations that
:class:`~google.cloud.firestore_v1.document.DocumentReference` does,
e.g. :meth:`~google.cloud.firestore_v1.document.DocumentReference.create`.
Args:
client (:class:`~google.cloud.firestore_v1.client.Client`):
The client that created this batch.
"""

def __init__(self, client) -> None:
super(BulkWriteBatch, self).__init__(client=client)

def commit(
self, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: float = None
) -> BatchWriteResponse:
"""Writes the changes accumulated in this batch.
Write operations are not guaranteed to be applied in order and must not
contain multiple writes to any given document. Preferred over `commit`
for performance reasons if these conditions are acceptable.
Args:
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Returns:
:class:`google.cloud.proto.firestore.v1.write.BatchWriteResponse`:
Container holding the write results corresponding to the changes
committed, returned in the same order as the changes were applied to
this batch. An individual write result contains an ``update_time``
field.
"""
request, kwargs = self._prep_commit(retry, timeout)

_api = self._client._firestore_api
save_response: BatchWriteResponse = _api.batch_write(
request=request, metadata=self._client._rpc_metadata, **kwargs,
)

self._write_pbs = []
self.write_results = list(save_response.write_results)

return save_response

def _prep_commit(self, retry: retries.Retry, timeout: float):
request = {
"database": self._client._database_string,
"writes": self._write_pbs,
"labels": None,
}
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
return request, kwargs
Loading

0 comments on commit 98a7753

Please sign in to comment.