Skip to content

Commit

Permalink
Revert "RPC retries (second PR) (googleapis#3324)" (googleapis#3642)
Browse files Browse the repository at this point in the history
This reverts commit 67f4ba4.
  • Loading branch information
Jon Wayne Parrott authored and landrito committed Aug 22, 2017
1 parent 7c2989d commit dca982c
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 520 deletions.
169 changes: 0 additions & 169 deletions bigtable/google/cloud/bigtable/retry.py

This file was deleted.

3 changes: 0 additions & 3 deletions bigtable/google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,6 @@ def consume_next(self):

self._validate_chunk(chunk)

if hasattr(self._response_iterator, 'set_start_key'):
self._response_iterator.set_start_key(chunk.row_key)

if chunk.reset_row:
row = self._row = None
cell = self._cell = self._previous_cell = None
Expand Down
101 changes: 69 additions & 32 deletions bigtable/google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import six

from google.cloud._helpers import _to_bytes
from google.cloud.bigtable._generated import (
bigtable_pb2 as data_messages_v2_pb2)
from google.cloud.bigtable._generated import (
Expand All @@ -29,26 +30,6 @@
from google.cloud.bigtable.row import ConditionalRow
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.row_data import PartialRowsData
from google.gax import RetryOptions, BackoffSettings
from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request
from grpc import StatusCode

BACKOFF_SETTINGS = BackoffSettings(
initial_retry_delay_millis=10,
retry_delay_multiplier=1.3,
max_retry_delay_millis=30000,
initial_rpc_timeout_millis=25 * 60 * 1000,
rpc_timeout_multiplier=1.0,
max_rpc_timeout_millis=25 * 60 * 1000,
total_timeout_millis=30 * 60 * 1000
)

RETRY_CODES = [
StatusCode.DEADLINE_EXCEEDED,
StatusCode.ABORTED,
StatusCode.INTERNAL,
StatusCode.UNAVAILABLE
]


# Maximum number of mutations in bulk (MutateRowsRequest message):
Expand Down Expand Up @@ -276,7 +257,7 @@ def read_row(self, row_key, filter_=None):
return rows_data.rows[row_key]

def read_rows(self, start_key=None, end_key=None, limit=None,
filter_=None, backoff_settings=None):
filter_=None):
"""Read rows from this table.
:type start_key: bytes
Expand All @@ -303,18 +284,13 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
:returns: A :class:`.PartialRowsData` convenience wrapper for consuming
the streamed results.
"""
request_pb = _create_row_request(
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
limit=limit)
client = self._instance._client
if backoff_settings is None:
backoff_settings = BACKOFF_SETTINGS
RETRY_OPTIONS = RetryOptions(
retry_codes=RETRY_CODES,
backoff_settings=backoff_settings
)

retrying_iterator = ReadRowsIterator(client, self.name, start_key,
end_key, filter_, limit,
RETRY_OPTIONS)
return PartialRowsData(retrying_iterator)
response_iterator = client._data_stub.ReadRows(request_pb)
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
return PartialRowsData(response_iterator)

def mutate_rows(self, rows):
"""Mutates multiple rows in bulk.
Expand Down Expand Up @@ -383,6 +359,67 @@ def sample_row_keys(self):
return response_iterator


def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
filter_=None, limit=None):
"""Creates a request to read rows in a table.
:type table_name: str
:param table_name: The name of the table to read from.
:type row_key: bytes
:param row_key: (Optional) The key of a specific row to read from.
:type start_key: bytes
:param start_key: (Optional) The beginning of a range of row keys to
read from. The range will include ``start_key``. If
left empty, will be interpreted as the empty string.
:type end_key: bytes
:param end_key: (Optional) The end of a range of row keys to read from.
The range will not include ``end_key``. If left empty,
will be interpreted as an infinite string.
:type filter_: :class:`.RowFilter`
:param filter_: (Optional) The filter to apply to the contents of the
specified row(s). If unset, reads the entire table.
:type limit: int
:param limit: (Optional) The read will terminate after committing to N
rows' worth of results. The default (zero) is to return
all results.
:rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
:raises: :class:`ValueError <exceptions.ValueError>` if both
``row_key`` and one of ``start_key`` and ``end_key`` are set
"""
request_kwargs = {'table_name': table_name}
if (row_key is not None and
(start_key is not None or end_key is not None)):
raise ValueError('Row key and row range cannot be '
'set simultaneously')
range_kwargs = {}
if start_key is not None or end_key is not None:
if start_key is not None:
range_kwargs['start_key_closed'] = _to_bytes(start_key)
if end_key is not None:
range_kwargs['end_key_open'] = _to_bytes(end_key)
if filter_ is not None:
request_kwargs['filter'] = filter_.to_pb()
if limit is not None:
request_kwargs['rows_limit'] = limit

message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)

if row_key is not None:
message.rows.row_keys.append(_to_bytes(row_key))

if range_kwargs:
message.rows.row_ranges.add(**range_kwargs)

return message


def _mutate_rows_request(table_name, rows):
"""Creates a request to mutate rows in a table.
Expand Down
38 changes: 0 additions & 38 deletions bigtable/tests/retry_test_script.txt

This file was deleted.

Loading

0 comments on commit dca982c

Please sign in to comment.