Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: retry failed query jobs in result() #837

Merged
merged 32 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cc62448
initial stab
jimfulton Jul 28, 2021
61e1c38
test start
jimfulton Jul 28, 2021
093f9b4
Test high-level retry behavior.
jimfulton Jul 28, 2021
b2ce74b
Don't retry here
jimfulton Jul 28, 2021
d930c83
reworked the retry logic.
jimfulton Jul 28, 2021
bf7b4c6
if, when retrying, the new query job is complete and errorer, stop ri…
jimfulton Jul 29, 2021
0598197
Added test (and to existing test) to make sure we can call result mul…
jimfulton Jul 29, 2021
c9644e9
Keep carrying retry_do_query, even though it shouldn't be necessary.
jimfulton Jul 29, 2021
2d0e067
blacken
jimfulton Jul 29, 2021
0dcac01
removed unecessary condition
jimfulton Jul 29, 2021
026edba
System test that demonstrates the retry behavior as applied to the or…
jimfulton Jul 29, 2021
0e764d2
Added missing copyright
jimfulton Jul 29, 2021
c96d8b3
Added missing copyright
jimfulton Jul 29, 2021
5058cb4
Merge branch 'master' into fix-retry
tswast Aug 4, 2021
3c53172
Added a leading _ to the retry_do_query query-jov attribute to make i…
jimfulton Aug 4, 2021
eb51432
Merge branch 'master' into fix-retry
tswast Aug 5, 2021
d6d958f
fixed copyright
jimfulton Aug 5, 2021
a05f75f
Merge branch 'fix-retry' of github.com:jimfulton/python-bigquery into…
jimfulton Aug 5, 2021
32e7050
why sleep?
jimfulton Aug 5, 2021
3361a82
use DEFAULT_RETRY for low-level requests, to retry API errors
jimfulton Aug 5, 2021
4c6ef5b
Separate job retry into separate option for client query and query-jo…
jimfulton Aug 9, 2021
25b44a0
Merge branch 'master' into fix-retry
jimfulton Aug 9, 2021
9ab84e4
Use None job_retry to disable retry
jimfulton Aug 10, 2021
d2bf840
Use a 10-minute deadline for job retry by default
jimfulton Aug 10, 2021
4c004a5
Merge branch 'fix-retry' of github.com:jimfulton/python-bigquery into…
jimfulton Aug 10, 2021
be49c4b
Merge branch 'master' into fix-retry
jimfulton Aug 10, 2021
9e4a011
Merge branch 'master' into fix-retry
jimfulton Aug 11, 2021
204641b
Merge branch 'master' into fix-retry
jimfulton Aug 11, 2021
bf051b0
Added another default reason to retry jobs
jimfulton Aug 11, 2021
b47244f
Merge branch 'fix-retry' of github.com:jimfulton/python-bigquery into…
jimfulton Aug 11, 2021
2377a1b
Merge branch 'master' into fix-retry
jimfulton Aug 11, 2021
6b790e8
Update tests/unit/test_job_retry.py
tswast Aug 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 81 additions & 29 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
from google.cloud.bigquery.model import ModelReference
from google.cloud.bigquery.model import _model_arg_to_model_ref
from google.cloud.bigquery.query import _QueryResults
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
Expand Down Expand Up @@ -3163,6 +3163,7 @@ def query(
project: str = None,
retry: retries.Retry = DEFAULT_RETRY,
timeout: float = None,
job_retry: retries.Retry = DEFAULT_JOB_RETRY,
) -> job.QueryJob:
"""Run a SQL query.

Expand Down Expand Up @@ -3192,30 +3193,59 @@ def query(
Project ID of the project of where to run the job. Defaults
to the client's project.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
How to retry the RPC. This only applies to making RPC
calls. It isn't used to retry failed jobs. This has
a reasonable default that should only be overridden
with care.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
job_retry (Optional[google.api_core.retry.Retry]):
How to retry failed jobs. The default retries
rate-limit-exceeded errors. Passing ``None`` disables
job retry.

Not all jobs can be retried. If ``job_id`` is
provided, then the job returned by the query will not
be retryable, and an exception will be raised if a
non-``None`` (and non-default) value for ``job_retry``
is also provided.

Note that errors aren't detected until ``result()`` is
called on the job returned. The ``job_retry``
specified here becomes the default ``job_retry`` for
``result()``, where it can also be specified.

Returns:
google.cloud.bigquery.job.QueryJob: A new query job instance.

Raises:
TypeError:
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig`
class.
If ``job_config`` is not an instance of
:class:`~google.cloud.bigquery.job.QueryJobConfig`
class, or if both ``job_id`` and non-``None`` non-default
``job_retry`` are provided.
"""
job_id_given = job_id is not None
job_id = _make_job_id(job_id, job_id_prefix)
if (
job_id_given
and job_retry is not None
and job_retry is not DEFAULT_JOB_RETRY
):
raise TypeError(
"`job_retry` was provided, but the returned job is"
" not retryable, because a custom `job_id` was"
" provided."
)

job_id_save = job_id

if project is None:
project = self.project

if location is None:
location = self.location

job_config = copy.deepcopy(job_config)

if self._default_query_job_config:
if job_config:
_verify_job_config_type(
Expand All @@ -3225,6 +3255,8 @@ def query(
# that is in the default,
# should be filled in with the default
# the incoming therefore has precedence
#
# Note that _fill_from_default doesn't mutate the receiver
job_config = job_config._fill_from_default(
self._default_query_job_config
)
Expand All @@ -3233,34 +3265,54 @@ def query(
self._default_query_job_config,
google.cloud.bigquery.job.QueryJobConfig,
)
job_config = copy.deepcopy(self._default_query_job_config)
job_config = self._default_query_job_config

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
# Note that we haven't modified the original job_config (or
# _default_query_job_config) up to this point.
job_config_save = job_config

try:
query_job._begin(retry=retry, timeout=timeout)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
# the wrong query. We thus only try to recover if job ID was not given.
if job_id_given:
raise create_exc
def do_query():
# Make a copy now, so that original doesn't get changed by the process
# below and to facilitate retry
job_config = copy.deepcopy(job_config_save)

job_id = _make_job_id(job_id_save, job_id_prefix)
job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
jimfulton marked this conversation as resolved.
Show resolved Hide resolved

try:
query_job = self.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise create_exc
query_job._begin(retry=retry, timeout=timeout)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
# the wrong query. We thus only try to recover if job ID was not given.
if job_id_given:
raise create_exc

try:
query_job = self.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise create_exc
else:
return query_job
else:
return query_job
else:
return query_job

future = do_query()
# The future might be in a failed state now, but if it's
# unrecoverable, we'll find out when we ask for it's result, at which
# point, we may retry.
if not job_id_given:
future._retry_do_query = do_query # in case we have to retry later
future._job_retry = job_retry

return future

def insert_rows(
self,
Expand Down
84 changes: 74 additions & 10 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from google.cloud.bigquery.query import ScalarQueryParameter
from google.cloud.bigquery.query import StructQueryParameter
from google.cloud.bigquery.query import UDFResource
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
Expand Down Expand Up @@ -1260,6 +1260,7 @@ def result(
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: float = None,
start_index: int = None,
job_retry: "retries.Retry" = DEFAULT_JOB_RETRY,
) -> Union["RowIterator", _EmptyRowIterator]:
"""Start the job and wait for it to complete and get the result.

Expand All @@ -1270,16 +1271,30 @@ def result(
max_results (Optional[int]):
The maximum total number of rows from this request.
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves rows. If the job state is
``DONE``, retrying is aborted early even if the results are not
available, as this will not change anymore.
How to retry the call that retrieves rows. This only
applies to making RPC calls. It isn't used to retry
failed jobs. This has a reasonable default that
should only be overridden with care. If the job state
is ``DONE``, retrying is aborted early even if the
results are not available, as this will not change
anymore.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
start_index (Optional[int]):
The zero-based index of the starting row to read.
job_retry (Optional[google.api_core.retry.Retry]):
How to retry failed jobs. The default retries
rate-limit-exceeded errors. Passing ``None`` disables
job retry.

Not all jobs can be retried. If ``job_id`` was
provided to the query that created this job, then the
job returned by the query will not be retryable, and
an exception will be raised if non-``None``
non-default ``job_retry`` is also provided.

Returns:
google.cloud.bigquery.table.RowIterator:
Expand All @@ -1295,17 +1310,66 @@ def result(

Raises:
google.cloud.exceptions.GoogleAPICallError:
If the job failed.
If the job failed and retries aren't successful.
concurrent.futures.TimeoutError:
If the job did not complete in the given timeout.
TypeError:
If Non-``None`` and non-default ``job_retry`` is
provided and the job is not retryable.
"""
try:
super(QueryJob, self).result(retry=retry, timeout=timeout)
retry_do_query = getattr(self, "_retry_do_query", None)
if retry_do_query is not None:
if job_retry is DEFAULT_JOB_RETRY:
job_retry = self._job_retry
else:
if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY:
raise TypeError(
"`job_retry` was provided, but this job is"
" not retryable, because a custom `job_id` was"
" provided to the query that created this job."
)

first = True
tswast marked this conversation as resolved.
Show resolved Hide resolved

def do_get_result():
nonlocal first

if first:
first = False
else:
# Note that we won't get here if retry_do_query is
# None, because we won't use a retry.

# The orinal job is failed. Create a new one.
job = retry_do_query()

# If it's already failed, we might as well stop:
tswast marked this conversation as resolved.
Show resolved Hide resolved
if job.done() and job.exception() is not None:
raise job.exception()

# Become the new job:
self.__dict__.clear()
self.__dict__.update(job.__dict__)

# This shouldn't be necessary, because once we have a good
# job, it should stay good,and we shouldn't have to retry.
# But let's be paranoid. :)
self._retry_do_query = retry_do_query
self._job_retry = job_retry

super(QueryJob, self).result(retry=retry, timeout=timeout)

# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
self._reload_query_results(retry=retry, timeout=timeout)

if retry_do_query is not None and job_retry is not None:
do_get_result = job_retry(do_get_result)

do_get_result()

# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
self._reload_query_results(retry=retry, timeout=timeout)
except exceptions.GoogleAPICallError as exc:
exc.message += self._format_for_exception(self.query, self.job_id)
exc.query_job = self
Expand Down
20 changes: 20 additions & 0 deletions google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
auth_exceptions.TransportError,
)

_DEFAULT_JOB_DEADLINE = 60.0 * 10.0 # seconds


def _should_retry(exc):
"""Predicate for determining when to retry.
Expand All @@ -56,3 +58,21 @@ def _should_retry(exc):
on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds,
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""

job_retry_reasons = "rateLimitExceeded", "backendError"


def _job_should_retry(exc):
if not hasattr(exc, "errors") or len(exc.errors) == 0:
return False

reason = exc.errors[0]["reason"]
return reason in job_retry_reasons


DEFAULT_JOB_RETRY = retry.Retry(
predicate=_job_should_retry, deadline=_DEFAULT_JOB_DEADLINE
)
"""
The default job retry object.
"""
72 changes: 72 additions & 0 deletions tests/system/test_job_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import threading
import time

import google.api_core.exceptions
import google.cloud.bigquery
import pytest


def thread(func):
thread = threading.Thread(target=func, daemon=True)
thread.start()
return thread


@pytest.mark.parametrize("job_retry_on_query", [True, False])
def test_query_retry_539(bigquery_client, dataset_id, job_retry_on_query):
"""
Test job_retry

See: https://github.com/googleapis/python-bigquery/issues/539
"""
from google.api_core import exceptions
from google.api_core.retry import if_exception_type, Retry

table_name = f"{dataset_id}.t539"

# Without a custom retry, we fail:
with pytest.raises(google.api_core.exceptions.NotFound):
bigquery_client.query(f"select count(*) from {table_name}").result()

retry_notfound = Retry(predicate=if_exception_type(exceptions.NotFound))

job_retry = dict(job_retry=retry_notfound) if job_retry_on_query else {}
job = bigquery_client.query(f"select count(*) from {table_name}", **job_retry)
job_id = job.job_id

# We can already know that the job failed, but we're not supposed
# to find out until we call result, which is where retry happend
assert job.done()
assert job.exception() is not None

@thread
def create_table():
time.sleep(1) # Give the first retry attempt time to fail.
with contextlib.closing(google.cloud.bigquery.Client()) as client:
client.query(f"create table {table_name} (id int64)").result()

job_retry = {} if job_retry_on_query else dict(job_retry=retry_notfound)
[[count]] = list(job.result(**job_retry))
assert count == 0

# The job was retried, and thus got a new job id
assert job.job_id != job_id

# Make sure we don't leave a thread behind:
create_table.join()
bigquery_client.query(f"drop table {table_name}").result()
Loading