feat: detect obsolete BQ Storage extra at runtime #666

Merged
merged 8 commits into from
May 20, 2021
Merged
3 changes: 3 additions & 0 deletions google/cloud/bigquery/__init__.py
@@ -39,6 +39,7 @@
from google.cloud.bigquery import enums
from google.cloud.bigquery.enums import SqlTypeNames
from google.cloud.bigquery.enums import StandardSqlDataTypes
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.external_config import BigtableOptions
from google.cloud.bigquery.external_config import BigtableColumnFamily
@@ -152,6 +153,8 @@
"WriteDisposition",
# EncryptionConfiguration
"EncryptionConfiguration",
# Custom exceptions
"LegacyBigQueryStorageError",
]


30 changes: 30 additions & 0 deletions google/cloud/bigquery/_helpers.py
@@ -25,6 +25,10 @@
from google.cloud._helpers import _RFC3339_MICROS
from google.cloud._helpers import _RFC3339_NO_FRACTION
from google.cloud._helpers import _to_bytes
import pkg_resources

from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError


_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
@@ -36,6 +40,32 @@
re.VERBOSE,
)

_MIN_BQ_STORAGE_VERSION = pkg_resources.parse_version("2.0.0")


def _verify_bq_storage_version():
"""Verify that a recent enough version of BigQuery Storage extra is installed.

The function assumes that the google-cloud-bigquery-storage extra is installed, and
should thus be used in places where this assumption holds.

Because `pip` can install an outdated version of this extra despite the constraints
in setup.py, the calling code can use this helper to verify the version
compatibility at runtime.
"""
from google.cloud import bigquery_storage

installed_version = pkg_resources.parse_version(
getattr(bigquery_storage, "__version__", "legacy")
)

if installed_version < _MIN_BQ_STORAGE_VERSION:
msg = (
"Dependency google-cloud-bigquery-storage is outdated, please upgrade "
f"it to version >= 2.0.0 (version found: {installed_version})."
)
raise LegacyBigQueryStorageError(msg)


def _not_null(value, field):
"""Check whether 'value' should be coerced to 'field' type."""
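The new helper hinges on `pkg_resources.parse_version` comparisons, with the string `"legacy"` standing in when `bigquery_storage` exposes no `__version__`. A minimal sketch of that comparison (the candidate version strings are examples, and it assumes a setuptools release that still parses non-PEP 440 strings):

```python
# Sketch of the comparison _verify_bq_storage_version relies on; assumes a
# pkg_resources/setuptools version that still accepts legacy version strings.
import pkg_resources

_MIN_BQ_STORAGE_VERSION = pkg_resources.parse_version("2.0.0")

for candidate in ("2.4.0", "1.9.9", "legacy"):
    installed = pkg_resources.parse_version(candidate)
    # "legacy" parses to a LegacyVersion, which sorts before every real
    # release, so a missing __version__ attribute also fails the check.
    print(candidate, "ok" if installed >= _MIN_BQ_STORAGE_VERSION else "too old")
```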
57 changes: 51 additions & 6 deletions google/cloud/bigquery/client.py
@@ -50,16 +50,25 @@
from google.cloud import exceptions # pytype: disable=import-error
from google.cloud.client import ClientWithProject # pytype: disable=import-error

try:
from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
)
except ImportError:
DEFAULT_BQSTORAGE_CLIENT_INFO = None

from google.cloud.bigquery._helpers import _del_sub_prop
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_bq_storage_version
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
from google.cloud.bigquery.job import (
@@ -445,15 +454,38 @@ def dataset(self, dataset_id: str, project: str = None) -> DatasetReference:
)
return DatasetReference(project, dataset_id)

def _create_bqstorage_client(self):
def _ensure_bqstorage_client(
self,
bqstorage_client: Optional[
"google.cloud.bigquery_storage.BigQueryReadClient"
] = None,
client_options: Optional[google.api_core.client_options.ClientOptions] = None,
client_info: Optional[
"google.api_core.gapic_v1.client_info.ClientInfo"
] = DEFAULT_BQSTORAGE_CLIENT_INFO,
) -> Optional["google.cloud.bigquery_storage.BigQueryReadClient"]:
"""Create a BigQuery Storage API client using this client's credentials.

If a client cannot be created due to missing dependencies, raise a
warning and return ``None``.
If a client cannot be created due to a missing or outdated dependency
`google-cloud-bigquery-storage`, raise a warning and return ``None``.

If the `bqstorage_client` argument is not ``None``, still perform the version
check and return the argument back to the caller if the check passes. If it
fails, raise a warning and return ``None``.

Args:
bqstorage_client:
An existing BigQuery Storage client instance to check for version
compatibility. If ``None``, a new instance is created and returned.
client_options:
Custom options used with a new BigQuery Storage client instance if one
is created.
client_info:
The client info used with a new BigQuery Storage client instance if one
is created.

Returns:
Optional[google.cloud.bigquery_storage.BigQueryReadClient]:
A BigQuery Storage API client.
A BigQuery Storage API client.
"""
try:
from google.cloud import bigquery_storage
@@ -464,7 +496,20 @@ def _create_bqstorage_client(self):
)
return None

return bigquery_storage.BigQueryReadClient(credentials=self._credentials)
try:
_verify_bq_storage_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return None

if bqstorage_client is None:
bqstorage_client = bigquery_storage.BigQueryReadClient(
credentials=self._credentials,
client_options=client_options,
client_info=client_info,
)

return bqstorage_client

def _dataset_from_arg(self, dataset):
if isinstance(dataset, str):
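A hedged usage sketch of the renamed helper. It is a private `Client` method, so this only illustrates the intended control flow; the fallback message is illustrative:

```python
# Illustrative only: _ensure_bqstorage_client is a private Client method.
from google.cloud import bigquery

client = bigquery.Client()

# Returns a BigQueryReadClient, or None plus a warning when the
# google-cloud-bigquery-storage extra is missing or older than 2.0.0.
bqstorage_client = client._ensure_bqstorage_client()
if bqstorage_client is None:
    print("Falling back to the REST API for result downloads.")
```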
6 changes: 4 additions & 2 deletions google/cloud/bigquery/dbapi/connection.py
@@ -47,12 +47,14 @@ def __init__(self, client=None, bqstorage_client=None):
else:
self._owns_client = False

# A warning is already raised by the BQ Storage client factory if
# instantiation fails, or if the given BQ Storage client instance is outdated.
if bqstorage_client is None:
# A warning is already raised by the factory if instantiation fails.
bqstorage_client = client._create_bqstorage_client()
bqstorage_client = client._ensure_bqstorage_client()
self._owns_bqstorage_client = bqstorage_client is not None
else:
self._owns_bqstorage_client = False
bqstorage_client = client._ensure_bqstorage_client(bqstorage_client)

self._client = client
self._bqstorage_client = bqstorage_client
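With this change a caller-supplied BQ Storage client also goes through the version check. A hypothetical sketch, assuming default credentials (the `connect()` call mirrors the constructor arguments shown above):

```python
# Hypothetical DB-API sketch: an explicitly supplied BQ Storage client is now
# version-checked; an outdated install degrades to a warning plus REST fallback.
from google.cloud import bigquery, bigquery_storage
from google.cloud.bigquery import dbapi

client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient(credentials=client._credentials)

connection = dbapi.connect(client=client, bqstorage_client=bqstorage_client)
cursor = connection.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchall())
```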
21 changes: 21 additions & 0 deletions google/cloud/bigquery/exceptions.py
@@ -0,0 +1,21 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class BigQueryError(Exception):
"""Base class for all custom exceptions defined by the BigQuery client."""


class LegacyBigQueryStorageError(BigQueryError):
"""Raised when too old a version of BigQuery Storage extra is detected at runtime."""
11 changes: 5 additions & 6 deletions google/cloud/bigquery/magics/magics.py
@@ -644,7 +644,7 @@ def _cell_magic(line, query):
bqstorage_client_options.api_endpoint = args.bqstorage_api_endpoint

bqstorage_client = _make_bqstorage_client(
use_bqstorage_api, context.credentials, bqstorage_client_options,
client, use_bqstorage_api, bqstorage_client_options,
)

close_transports = functools.partial(_close_transports, client, bqstorage_client)
@@ -762,12 +762,12 @@ def _split_args_line(line):
return params_option_value, rest_of_args


def _make_bqstorage_client(use_bqstorage_api, credentials, client_options):
def _make_bqstorage_client(client, use_bqstorage_api, client_options):
if not use_bqstorage_api:
return None

try:
from google.cloud import bigquery_storage
from google.cloud import bigquery_storage # noqa: F401
except ImportError as err:
customized_error = ImportError(
"The default BigQuery Storage API client cannot be used, install "
@@ -785,10 +785,9 @@ def _make_bqstorage_client(use_bqstorage_api, credentials, client_options):
)
raise customized_error from err

return bigquery_storage.BigQueryReadClient(
credentials=credentials,
client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
return client._ensure_bqstorage_client(
client_options=client_options,
client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
)


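A hedged sketch of the reworked magics helper; `_make_bqstorage_client` is private, its signature is taken from this diff, and the endpoint value is only an example:

```python
# Illustrative call into the private magics helper: the BigQuery client is now
# passed in so the storage client is built and version-checked in one place.
from google.api_core.client_options import ClientOptions
from google.cloud import bigquery
from google.cloud.bigquery.magics.magics import _make_bqstorage_client

client = bigquery.Client()
options = ClientOptions(api_endpoint="bigquerystorage.googleapis.com")

# Returns None when use_bqstorage_api is falsy; otherwise delegates to
# client._ensure_bqstorage_client() with the magic's IPython user agent.
bqstorage_client = _make_bqstorage_client(client, True, options)
```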
14 changes: 13 additions & 1 deletion google/cloud/bigquery/table.py
@@ -41,6 +41,7 @@
import google.cloud._helpers
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
@@ -1519,6 +1520,17 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
)
return False

try:
from google.cloud import bigquery_storage # noqa: F401
except ImportError:
return False

try:
_helpers._verify_bq_storage_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return False

return True

def _get_next_page_response(self):
@@ -1655,7 +1667,7 @@ def to_arrow(

owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
bqstorage_client = self.client._create_bqstorage_client()
bqstorage_client = self.client._ensure_bqstorage_client()
owns_bqstorage_client = bqstorage_client is not None

try:
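Downstream effect, as a hedged sketch assuming default credentials and a small query: with `create_bqstorage_client=True` (the default), `RowIterator.to_arrow()` now also verifies the installed BQ Storage version and quietly falls back to the REST API when the check fails.

```python
# Sketch: either download path yields the same Arrow table for a tiny query.
from google.cloud import bigquery

client = bigquery.Client()
rows = client.query("SELECT 1 AS x").result()

# Emits a warning and downloads via tabledata.list if the BQ Storage extra is
# missing or pre-2.0.0; otherwise uses the Storage Read API.
arrow_table = rows.to_arrow(create_bqstorage_client=True)
print(arrow_table.num_rows)
```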
38 changes: 38 additions & 0 deletions tests/unit/test__helpers.py
@@ -19,6 +19,44 @@

import mock

try:
from google.cloud import bigquery_storage
except ImportError: # pragma: NO COVER
bigquery_storage = None


@unittest.skipIf(bigquery_storage is None, "Requires `google-cloud-bigquery-storage`")
class Test_verify_bq_storage_version(unittest.TestCase):
def _call_fut(self):
from google.cloud.bigquery._helpers import _verify_bq_storage_version

return _verify_bq_storage_version()

def test_raises_no_error_w_recent_bqstorage(self):
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

with mock.patch("google.cloud.bigquery_storage.__version__", new="2.0.0"):
try:
self._call_fut()
except LegacyBigQueryStorageError: # pragma: NO COVER
self.fail("Legacy error raised with a non-legacy dependency version.")

def test_raises_error_w_legacy_bqstorage(self):
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

with mock.patch("google.cloud.bigquery_storage.__version__", new="1.9.9"):
with self.assertRaises(LegacyBigQueryStorageError):
self._call_fut()

def test_raises_error_w_unknown_bqstorage_version(self):
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

with mock.patch("google.cloud.bigquery_storage", autospec=True) as fake_module:
del fake_module.__version__
error_pattern = r"version found: legacy"
with self.assertRaisesRegex(LegacyBigQueryStorageError, error_pattern):
self._call_fut()


class Test_not_null(unittest.TestCase):
def _call_fut(self, value, field):