feat(bigquery): add timeout parameter to QueryJob.done() method #9875

Merged 6 commits on Dec 19, 2019
7 changes: 5 additions & 2 deletions bigquery/google/cloud/bigquery/client.py
@@ -1081,7 +1081,7 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False):
raise

def _get_query_results(
self, job_id, retry, project=None, timeout_ms=None, location=None
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None,
):
"""Get the query results object for a query job.

@@ -1096,6 +1096,9 @@ def _get_query_results(
(Optional) number of milliseconds the API call should
wait for the query to complete before the request times out.
location (str): Location of the query job.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before retrying the HTTP request.

Returns:
google.cloud.bigquery.query._QueryResults:
@@ -1122,7 +1125,7 @@ def _get_query_results(
# job is complete (from QueryJob.done(), called ultimately from
# QueryJob.result()). So we don't need to poll here.
resource = self._call_api(
retry, method="GET", path=path, query_params=extra_params
retry, method="GET", path=path, query_params=extra_params, timeout=timeout
)
return _QueryResults.from_api_repr(resource)

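With this change, the caller-supplied `timeout` is forwarded from `_get_query_results` to `Client._call_api` and applied to the underlying HTTP request, independently of the server-side `timeout_ms` wait. A minimal sketch of how the two values differ when calling this (private) helper, assuming an existing query job; the project and job ID below are placeholders:

# Sketch only: "my-project" and "my-job-id" are illustrative placeholders.
from google.cloud import bigquery
from google.cloud.bigquery.retry import DEFAULT_RETRY

client = bigquery.Client(project="my-project")

results = client._get_query_results(
    "my-job-id",
    DEFAULT_RETRY,
    timeout_ms=10000,  # server side: how long the API may hold the request open
    timeout=5.0,       # client side: HTTP transport deadline added by this PR
)
print(results.complete)
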
52 changes: 44 additions & 8 deletions bigquery/google/cloud/bigquery/job.py
@@ -14,10 +14,14 @@

"""Define API Jobs."""

from __future__ import division

import concurrent.futures
import copy
import re
import threading

import requests
import six
from six.moves import http_client

@@ -50,6 +54,7 @@
_DONE_STATE = "DONE"
_STOPPED_REASON = "stopped"
_TIMEOUT_BUFFER_SECS = 0.1
_SERVER_TIMEOUT_MARGIN_SECS = 1.0
_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)

_ERROR_REASON_TO_EXCEPTION = {
@@ -663,7 +668,7 @@ def exists(self, client=None, retry=DEFAULT_RETRY):
else:
return True

def reload(self, client=None, retry=DEFAULT_RETRY):
def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None):
"""API call: refresh job properties via a GET request.

See
@@ -675,6 +680,9 @@ def reload(self, client=None, retry=DEFAULT_RETRY):
``client`` stored on the current dataset.

retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before retrying the HTTP request.
"""
client = self._require_client(client)

@@ -683,7 +691,11 @@ def reload(self, client=None, retry=DEFAULT_RETRY):
extra_params["location"] = self.location

api_response = client._call_api(
retry, method="GET", path=self.path, query_params=extra_params
retry,
method="GET",
path=self.path,
query_params=extra_params,
timeout=timeout,
)
self._set_properties(api_response)

@@ -2994,9 +3006,16 @@ def estimated_bytes_processed(self):
result = int(result)
return result

def done(self, retry=DEFAULT_RETRY):
def done(self, retry=DEFAULT_RETRY, timeout=None):
"""Refresh the job and checks if it is complete.

Args:
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves query results.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before retrying the HTTP request.

Returns:
bool: True if the job is complete, False otherwise.
"""
@@ -3007,11 +3026,25 @@
timeout_ms = None
if self._done_timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
timeout = max(min(timeout, 10), 0)
self._done_timeout -= timeout
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
api_timeout = max(min(api_timeout, 10), 0)
self._done_timeout -= api_timeout
self._done_timeout = max(0, self._done_timeout)
timeout_ms = int(timeout * 1000)
timeout_ms = int(api_timeout * 1000)

# If the server-side processing timeout (timeout_ms) is specified and
# would be picked as the total request timeout, we want to add a small
# margin to it - we don't want to timeout the connection just as the
# server-side processing might have completed, but instead slightly
# after the server-side deadline.
# However, if `timeout` is specified, and is shorter than the adjusted
# server timeout, the former prevails.
if timeout_ms is not None and timeout_ms > 0:
server_timeout_with_margin = timeout_ms / 1000 + _SERVER_TIMEOUT_MARGIN_SECS
if timeout is not None:
timeout = min(server_timeout_with_margin, timeout)
else:
timeout = server_timeout_with_margin

# Do not refresh if the state is already done, as the job will not
# change once complete.
@@ -3022,13 +3055,14 @@
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=timeout,
)

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
self.reload(retry=retry)
self.reload(retry=retry, timeout=timeout)

return self.state == _DONE_STATE

@@ -3132,6 +3166,8 @@ def result(
exc.message += self._format_for_exception(self.query, self.job_id)
exc.query_job = self
raise
except requests.exceptions.Timeout as exc:
six.raise_from(concurrent.futures.TimeoutError, exc)

# If the query job is complete but there are no query results, this was
# special job, such as a DDL query. Return an empty result set to
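The timeout handling added to `done()` above is the heart of this change. A hedged restatement of that logic as a standalone helper (the name `effective_http_timeout` is mine, not part of the PR), showing how the remaining server-side wait and the caller's `timeout` combine into the transport deadline:

_SERVER_TIMEOUT_MARGIN_SECS = 1.0  # mirrors the constant added in job.py

def effective_http_timeout(timeout_ms, timeout):
    """Return the HTTP timeout done() would use, or None for no limit."""
    if timeout_ms is not None and timeout_ms > 0:
        # Give the server a small margin to finish before the connection is cut.
        server_timeout_with_margin = timeout_ms / 1000 + _SERVER_TIMEOUT_MARGIN_SECS
        if timeout is not None:
            # An explicit, shorter caller timeout still wins.
            return min(server_timeout_with_margin, timeout)
        return server_timeout_with_margin
    return timeout

# With 4500 ms of server-side wait remaining and timeout=10, the request uses
# 5.5 seconds; with timeout=3 the shorter caller deadline of 3 seconds wins.
print(effective_http_timeout(4500, 10))  # 5.5
print(effective_http_timeout(4500, 3))   # 3
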
2 changes: 2 additions & 0 deletions bigquery/setup.py
@@ -30,6 +30,8 @@
release_status = "Development Status :: 5 - Production/Stable"
dependencies = [
'enum34; python_version < "3.4"',
"google-auth >= 1.9.0, < 2.0dev",
"google-api-core >= 1.15.0, < 2.0dev",
"google-cloud-core >= 1.0.3, < 2.0dev",
"google-resumable-media >= 0.3.1, != 0.4.0, < 0.6.0dev",
"protobuf >= 3.6.0",
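The setup change pins explicit minimum versions for google-auth and google-api-core alongside the existing dependencies. A small sketch for checking which versions are present in the active environment (pkg_resources ships with setuptools; the package names are the ones pinned above):

# Sketch: print installed versions of the pinned dependencies.
import pkg_resources

for name in ("google-auth", "google-api-core", "google-cloud-core"):
    print(name, pkg_resources.get_distribution(name).version)
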
24 changes: 24 additions & 0 deletions bigquery/tests/system.py
@@ -26,6 +26,7 @@
import uuid
import re

import requests
import six
import psutil
import pytest
@@ -1893,6 +1894,29 @@ def test_query_iter(self):
row_tuples = [r.values() for r in query_job]
self.assertEqual(row_tuples, [(1,)])

def test_querying_data_w_timeout(self):
job_config = bigquery.QueryJobConfig()
job_config.use_query_cache = False

query_job = Config.CLIENT.query(
"""
SELECT name, SUM(number) AS total_people
FROM `bigquery-public-data.usa_names.usa_1910_current`
GROUP BY name
""",
location="US",
job_config=job_config,
)

# Specify a very tight deadline to demonstrate that the timeout
# actually takes effect.
with self.assertRaises(requests.exceptions.Timeout):
query_job.done(timeout=0.1)

# Now wait for the result using a more realistic deadline.
query_job.result(timeout=30)
self.assertTrue(query_job.done(timeout=30))

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_query_results_to_dataframe(self):
QUERY = """
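The system test above exercises both the HTTP-level timeout on `done()` and, via `result()`, the new conversion of transport timeouts into `concurrent.futures.TimeoutError`. A rough usage sketch of the same pattern from application code, assuming default credentials; the query text is a placeholder:

# Sketch only: assumes default credentials; the query is a placeholder.
import concurrent.futures

import requests
from google.cloud import bigquery

client = bigquery.Client()
job = client.query(
    "SELECT COUNT(*) FROM `bigquery-public-data.usa_names.usa_1910_current`"
)

try:
    # HTTP-level deadline for this single status check (new in this PR).
    finished = job.done(timeout=0.5)
except requests.exceptions.Timeout:
    finished = False  # the status request itself hit the transport deadline

if not finished:
    try:
        # Overall wait; transport timeouts now surface as TimeoutError here.
        rows = job.result(timeout=30)
        print(list(rows))
    except concurrent.futures.TimeoutError:
        print("query did not finish within 30 seconds")
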
3 changes: 3 additions & 0 deletions bigquery/tests/unit/test_client.py
@@ -226,12 +226,14 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self):
project="other-project",
location=self.LOCATION,
timeout_ms=500,
timeout=42,
)

conn.api_request.assert_called_once_with(
method="GET",
path="/projects/other-project/queries/nothere",
query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION},
timeout=42,
)

def test__get_query_results_miss_w_client_location(self):
@@ -248,6 +250,7 @@ def test__get_query_results_miss_w_client_location(self):
method="GET",
path="/projects/PROJECT/queries/nothere",
query_params={"maxResults": 0, "location": self.LOCATION},
timeout=None,
)

def test__get_query_results_hit(self):