
feat: retryable resource exhausted handling (#366)
The BigQuery Storage Read API will start returning retryable
RESOURCE_EXHAUSTED errors in 2022 when certain concurrency limits are
hit, so this PR adds client-side handling for them.

Tested with unit tests and system tests. System tests ran successfully
on a test project that intentionally returns retryable
RESOURCE_EXHAUSTED errors.
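
For illustration, a minimal sketch of how a caller might use the new
retry_delay_callback parameter; the project, session, and stream names below
are hypothetical placeholders and are not part of this commit.

from google.cloud import bigquery_storage_v1

def on_retry_delay(seconds):
    # Invoked with the server-requested delay (in seconds) before the client
    # sleeps and reconnects after a retryable RESOURCE_EXHAUSTED error.
    print("Backing off for %.1f seconds" % seconds)

client = bigquery_storage_v1.BigQueryReadClient()
stream = client.read_rows(
    "projects/my-project/locations/us/sessions/SESSION/streams/STREAM",  # hypothetical
    retry_delay_callback=on_retry_delay,
)
for row in stream.rows():
    ...  # process each row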

Co-authored-by: Tim Swast <swast@google.com>
esert-g and tswast authored Jan 12, 2022
1 parent 675d7cf commit 33757d8
Showing 5 changed files with 234 additions and 89 deletions.
20 changes: 11 additions & 9 deletions google/cloud/bigquery_storage_v1/client.py
@@ -47,6 +47,7 @@ def read_rows(
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
metadata=(),
retry_delay_callback=None,
):
"""
Reads rows from the table in the format prescribed by the read
@@ -108,6 +109,12 @@ def read_rows(
specified, the timeout applies to each individual attempt.
metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
that is provided to the method.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
BigQueryReadClient will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
@@ -122,20 +129,15 @@
ValueError: If the parameters are invalid.
"""
gapic_client = super(BigQueryReadClient, self)
stream = gapic_client.read_rows(
read_stream=name,
offset=offset,
retry=retry,
timeout=timeout,
metadata=metadata,
)
return reader.ReadRowsStream(
stream,
stream = reader.ReadRowsStream(
gapic_client,
name,
offset,
{"retry": retry, "timeout": timeout, "metadata": metadata},
retry_delay_callback=retry_delay_callback,
)
stream._reconnect()
return stream


class BigQueryWriteClient(big_query_write.BigQueryWriteClient):
72 changes: 62 additions & 10 deletions google/cloud/bigquery_storage_v1/reader.py
@@ -17,12 +17,14 @@
import collections
import io
import json
import time

try:
import fastavro
except ImportError: # pragma: NO COVER
fastavro = None
import google.api_core.exceptions
import google.rpc.error_details_pb2

try:
import pandas
@@ -79,16 +81,17 @@ class ReadRowsStream(object):
If the pandas and fastavro libraries are installed, use the
:func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.to_dataframe()`
method to parse all messages into a :class:`pandas.DataFrame`.
This object should not be created directly, but is returned by
other methods in this library.
"""

def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
def __init__(
self, client, name, offset, read_rows_kwargs, retry_delay_callback=None
):
"""Construct a ReadRowsStream.
Args:
wrapped (Iterable[ \
~google.cloud.bigquery_storage.types.ReadRowsResponse \
]):
The ReadRows stream to read.
client ( \
~google.cloud.bigquery_storage_v1.services. \
big_query_read.BigQueryReadClient \
@@ -106,6 +109,12 @@ def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
read_rows_kwargs (dict):
Keyword arguments to use when reconnecting to a ReadRows
stream.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
ReadRowsStream will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
Iterable[ \
@@ -116,11 +125,12 @@ def __init__(self, wrapped, client, name, offset, read_rows_kwargs):

# Make a copy of the read position so that we can update it without
# mutating the original input.
self._wrapped = wrapped
self._client = client
self._name = name
self._offset = offset
self._read_rows_kwargs = read_rows_kwargs
self._retry_delay_callback = retry_delay_callback
self._wrapped = None

def __iter__(self):
"""An iterable of messages.
@@ -131,9 +141,12 @@ def __iter__(self):
]:
A sequence of row messages.
"""

# Infinite loop to reconnect on reconnectable errors while processing
# the row stream.

if self._wrapped is None:
self._reconnect()

while True:
try:
for message in self._wrapped:
@@ -152,14 +165,53 @@ def __iter__(self):
except _STREAM_RESUMPTION_EXCEPTIONS:
# Transient error, so reconnect to the stream.
pass
except Exception as exc:
if not self._resource_exhausted_exception_is_retryable(exc):
raise

self._reconnect()

def _reconnect(self):
"""Reconnect to the ReadRows stream using the most recent offset."""
self._wrapped = self._client.read_rows(
read_stream=self._name, offset=self._offset, **self._read_rows_kwargs
)
while True:
try:
self._wrapped = self._client.read_rows(
read_stream=self._name,
offset=self._offset,
**self._read_rows_kwargs
)
break
except Exception as exc:
if not self._resource_exhausted_exception_is_retryable(exc):
raise

def _resource_exhausted_exception_is_retryable(self, exc):
if isinstance(exc, google.api_core.exceptions.ResourceExhausted):
# ResourceExhausted errors are only retried if a valid
# RetryInfo is provided with the error.
#
# TODO: Remove hasattr logic when we require google-api-core >= 2.2.0.
# ResourceExhausted added details/_details in google-api-core 2.2.0.
details = None
if hasattr(exc, "details"):
details = exc.details
elif hasattr(exc, "_details"):
details = exc._details
if details is not None:
for detail in details:
if isinstance(detail, google.rpc.error_details_pb2.RetryInfo):
retry_delay = detail.retry_delay
if retry_delay is not None:
delay = max(
0,
float(retry_delay.seconds)
+ (float(retry_delay.nanos) / 1e9),
)
if self._retry_delay_callback:
self._retry_delay_callback(delay)
time.sleep(delay)
return True
return False

def rows(self, read_session=None):
"""Iterate over all rows in the stream.
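
As a reference, a small sketch (not part of the diff) of how a RetryInfo
detail attached to a ResourceExhausted error translates into the sleep
duration, mirroring the seconds + nanos arithmetic in
_resource_exhausted_exception_is_retryable above; the values are illustrative.

from google.protobuf import duration_pb2
from google.rpc import error_details_pb2

# A server asking the client to wait 1.5 seconds before retrying.
retry_info = error_details_pb2.RetryInfo(
    retry_delay=duration_pb2.Duration(seconds=1, nanos=500_000_000)
)

# Same computation as in reader.py: seconds plus nanos converted to seconds.
delay = max(
    0,
    float(retry_info.retry_delay.seconds) + float(retry_info.retry_delay.nanos) / 1e9,
)
assert delay == 1.5  # the stream sleeps this long before reconnecting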
20 changes: 11 additions & 9 deletions google/cloud/bigquery_storage_v1beta2/client.py
@@ -48,6 +48,7 @@ def read_rows(
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
metadata=(),
retry_delay_callback=None,
):
"""
Reads rows from the table in the format prescribed by the read
@@ -109,6 +110,12 @@ def read_rows(
specified, the timeout applies to each individual attempt.
metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
that is provided to the method.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
BigQueryReadClient will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
@@ -123,20 +130,15 @@
ValueError: If the parameters are invalid.
"""
gapic_client = super(BigQueryReadClient, self)
stream = gapic_client.read_rows(
read_stream=name,
offset=offset,
retry=retry,
timeout=timeout,
metadata=metadata,
)
return reader.ReadRowsStream(
stream,
stream = reader.ReadRowsStream(
gapic_client,
name,
offset,
{"retry": retry, "timeout": timeout, "metadata": metadata},
retry_delay_callback=retry_delay_callback,
)
stream._reconnect()
return stream


class BigQueryWriteClient(big_query_write.BigQueryWriteClient):
(Diffs for the remaining changed files were not loaded.)
