diff --git a/ci/requirements-2.7.pip b/ci/requirements-2.7.pip index d16b932c8be4f..08240184f2934 100644 --- a/ci/requirements-2.7.pip +++ b/ci/requirements-2.7.pip @@ -1,8 +1,5 @@ blosc -httplib2 -google-api-python-client==1.2 -python-gflags==2.0 -oauth2client==1.5.0 +pandas-gbq pathlib backports.lzma py diff --git a/ci/requirements-3.4.pip b/ci/requirements-3.4.pip index 55986a0220bf0..4e5fe52d56cf1 100644 --- a/ci/requirements-3.4.pip +++ b/ci/requirements-3.4.pip @@ -1,5 +1,2 @@ python-dateutil==2.2 blosc -httplib2 -google-api-python-client -oauth2client diff --git a/ci/requirements-3.4_SLOW.pip b/ci/requirements-3.4_SLOW.pip deleted file mode 100644 index 05c938abcbab6..0000000000000 --- a/ci/requirements-3.4_SLOW.pip +++ /dev/null @@ -1,3 +0,0 @@ -httplib2 -google-api-python-client -oauth2client diff --git a/ci/requirements-3.5.pip b/ci/requirements-3.5.pip index 0d9e44cf39fa4..6e4f7b65f9728 100644 --- a/ci/requirements-3.5.pip +++ b/ci/requirements-3.5.pip @@ -1 +1,2 @@ xarray==0.9.1 +pandas-gbq diff --git a/doc/source/io.rst b/doc/source/io.rst index 35e8b77782183..b36ae8c2ed450 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -4652,293 +4652,18 @@ And then issue the following queries: Google BigQuery --------------- -.. versionadded:: 0.13.0 - -The :mod:`pandas.io.gbq` module provides a wrapper for Google's BigQuery -analytics web service to simplify retrieving results from BigQuery tables -using SQL-like queries. Result sets are parsed into a pandas -DataFrame with a shape and data types derived from the source table. -Additionally, DataFrames can be inserted into new BigQuery tables or appended -to existing tables. - -.. warning:: - - To use this module, you will need a valid BigQuery account. Refer to the - `BigQuery Documentation `__ - for details on the service itself. - -The key functions are: - -.. currentmodule:: pandas.io.gbq - -.. autosummary:: - :toctree: generated/ - - read_gbq - to_gbq - -.. currentmodule:: pandas - - -Supported Data Types -'''''''''''''''''''' - -Pandas supports all these `BigQuery data types `__: -``STRING``, ``INTEGER`` (64bit), ``FLOAT`` (64 bit), ``BOOLEAN`` and -``TIMESTAMP`` (microsecond precision). Data types ``BYTES`` and ``RECORD`` -are not supported. - -Integer and boolean ``NA`` handling -''''''''''''''''''''''''''''''''''' - -.. versionadded:: 0.20 - -Since all columns in BigQuery queries are nullable, and NumPy lacks of ``NA`` -support for integer and boolean types, this module will store ``INTEGER`` or -``BOOLEAN`` columns with at least one ``NULL`` value as ``dtype=object``. -Otherwise those columns will be stored as ``dtype=int64`` or ``dtype=bool`` -respectively. - -This is opposite to default pandas behaviour which will promote integer -type to float in order to store NAs. See the :ref:`gotchas` -for detailed explaination. - -While this trade-off works well for most cases, it breaks down for storing -values greater than 2**53. Such values in BigQuery can represent identifiers -and unnoticed precision lost for identifier is what we want to avoid. - -.. _io.bigquery_deps: - -Dependencies -'''''''''''' - -This module requires following additional dependencies: - -- `httplib2 `__: HTTP client -- `google-api-python-client `__: Google's API client -- `oauth2client `__: authentication and authorization for Google's API - -.. _io.bigquery_authentication: - -Authentication -'''''''''''''' - -.. versionadded:: 0.18.0 - -Authentication to the Google ``BigQuery`` service is via ``OAuth 2.0``. -Is possible to authenticate with either user account credentials or service account credentials. - -Authenticating with user account credentials is as simple as following the prompts in a browser window -which will be automatically opened for you. You will be authenticated to the specified -``BigQuery`` account using the product name ``pandas GBQ``. It is only possible on local host. -The remote authentication using user account credentials is not currently supported in pandas. -Additional information on the authentication mechanism can be found -`here `__. - -Authentication with service account credentials is possible via the `'private_key'` parameter. This method -is particularly useful when working on remote servers (eg. jupyter iPython notebook on remote host). -Additional information on service accounts can be found -`here `__. - -Authentication via ``application default credentials`` is also possible. This is only valid -if the parameter ``private_key`` is not provided. This method also requires that -the credentials can be fetched from the environment the code is running in. -Otherwise, the OAuth2 client-side authentication is used. -Additional information on -`application default credentials `__. - -.. versionadded:: 0.19.0 - -.. note:: - - The `'private_key'` parameter can be set to either the file path of the service account key - in JSON format, or key contents of the service account key in JSON format. - -.. note:: - - A private key can be obtained from the Google developers console by clicking - `here `__. Use JSON key type. - -.. _io.bigquery_reader: - -Querying -'''''''' - -Suppose you want to load all data from an existing BigQuery table : `test_dataset.test_table` -into a DataFrame using the :func:`~pandas.io.gbq.read_gbq` function. - -.. code-block:: python - - # Insert your BigQuery Project ID Here - # Can be found in the Google web console - projectid = "xxxxxxxx" - - data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', projectid) - - -You can define which column from BigQuery to use as an index in the -destination DataFrame as well as a preferred column order as follows: - -.. code-block:: python - - data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', - index_col='index_column_name', - col_order=['col1', 'col2', 'col3'], projectid) - - -Starting with 0.20.0, you can specify the query config as parameter to use additional options of your job. -For more information about query configuration parameters see -`here `__. - -.. code-block:: python - - configuration = { - 'query': { - "useQueryCache": False - } - } - data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', - configuration=configuration, projectid) - - -.. note:: - - You can find your project id in the `Google developers console `__. - - -.. note:: - - You can toggle the verbose output via the ``verbose`` flag which defaults to ``True``. - -.. note:: - - The ``dialect`` argument can be used to indicate whether to use BigQuery's ``'legacy'`` SQL - or BigQuery's ``'standard'`` SQL (beta). The default value is ``'legacy'``. For more information - on BigQuery's standard SQL, see `BigQuery SQL Reference - `__ - -.. _io.bigquery_writer: - -Writing DataFrames -'''''''''''''''''' - -Assume we want to write a DataFrame ``df`` into a BigQuery table using :func:`~pandas.DataFrame.to_gbq`. - -.. ipython:: python - - df = pd.DataFrame({'my_string': list('abc'), - 'my_int64': list(range(1, 4)), - 'my_float64': np.arange(4.0, 7.0), - 'my_bool1': [True, False, True], - 'my_bool2': [False, True, False], - 'my_dates': pd.date_range('now', periods=3)}) - - df - df.dtypes - -.. code-block:: python - - df.to_gbq('my_dataset.my_table', projectid) - -.. note:: - - The destination table and destination dataset will automatically be created if they do not already exist. - -The ``if_exists`` argument can be used to dictate whether to ``'fail'``, ``'replace'`` -or ``'append'`` if the destination table already exists. The default value is ``'fail'``. - -For example, assume that ``if_exists`` is set to ``'fail'``. The following snippet will raise -a ``TableCreationError`` if the destination table already exists. - -.. code-block:: python - - df.to_gbq('my_dataset.my_table', projectid, if_exists='fail') - -.. note:: - - If the ``if_exists`` argument is set to ``'append'``, the destination dataframe will - be written to the table using the defined table schema and column types. The - dataframe must match the destination table in structure and data types. - If the ``if_exists`` argument is set to ``'replace'``, and the existing table has a - different schema, a delay of 2 minutes will be forced to ensure that the new schema - has propagated in the Google environment. See - `Google BigQuery issue 191 `__. - -Writing large DataFrames can result in errors due to size limitations being exceeded. -This can be avoided by setting the ``chunksize`` argument when calling :func:`~pandas.DataFrame.to_gbq`. -For example, the following writes ``df`` to a BigQuery table in batches of 10000 rows at a time: - -.. code-block:: python - - df.to_gbq('my_dataset.my_table', projectid, chunksize=10000) - -You can also see the progress of your post via the ``verbose`` flag which defaults to ``True``. -For example: - -.. code-block:: python - - In [8]: df.to_gbq('my_dataset.my_table', projectid, chunksize=10000, verbose=True) - - Streaming Insert is 10% Complete - Streaming Insert is 20% Complete - Streaming Insert is 30% Complete - Streaming Insert is 40% Complete - Streaming Insert is 50% Complete - Streaming Insert is 60% Complete - Streaming Insert is 70% Complete - Streaming Insert is 80% Complete - Streaming Insert is 90% Complete - Streaming Insert is 100% Complete - -.. note:: - - If an error occurs while streaming data to BigQuery, see - `Troubleshooting BigQuery Errors `__. - -.. note:: - - The BigQuery SQL query language has some oddities, see the - `BigQuery Query Reference Documentation `__. - -.. note:: - - While BigQuery uses SQL-like syntax, it has some important differences from traditional - databases both in functionality, API limitations (size and quantity of queries or uploads), - and how Google charges for use of the service. You should refer to `Google BigQuery documentation `__ - often as the service seems to be changing and evolving. BiqQuery is best for analyzing large - sets of data quickly, but it is not a direct replacement for a transactional database. - -.. _io.bigquery_create_tables: - -Creating BigQuery Tables -'''''''''''''''''''''''' - .. warning:: - As of 0.17, the function :func:`~pandas.io.gbq.generate_bq_schema` has been deprecated and will be - removed in a future version. - -As of 0.15.2, the gbq module has a function :func:`~pandas.io.gbq.generate_bq_schema` which will -produce the dictionary representation schema of the specified pandas DataFrame. - -.. code-block:: ipython - - In [10]: gbq.generate_bq_schema(df, default_type='STRING') + Starting in 0.20.0, pandas has split off Google BigQuery support into the + separate package ``pandas-gbq``. You can ``pip install pandas-gbq`` to get it. - Out[10]: {'fields': [{'name': 'my_bool1', 'type': 'BOOLEAN'}, - {'name': 'my_bool2', 'type': 'BOOLEAN'}, - {'name': 'my_dates', 'type': 'TIMESTAMP'}, - {'name': 'my_float64', 'type': 'FLOAT'}, - {'name': 'my_int64', 'type': 'INTEGER'}, - {'name': 'my_string', 'type': 'STRING'}]} - -.. note:: +The ``pandas-gbq`` package provides functionality to read/write from Google BigQuery. - If you delete and re-create a BigQuery table with the same name, but different table schema, - you must wait 2 minutes before streaming data into the table. As a workaround, consider creating - the new table with a different name. Refer to - `Google BigQuery issue 191 `__. +pandas integrates with this external package. if ``pandas-gbq`` is installed, you can +use the pandas methods ``pd.read_gbq`` and ``DataFrame.to_gbq``, which will call the +respective functions from ``pandas-gbq``. +Full cocumentation can be found `here `__ .. _io.stata: diff --git a/doc/source/whatsnew/v0.20.0.txt b/doc/source/whatsnew/v0.20.0.txt index f13b584a4ee13..f0e4176472861 100644 --- a/doc/source/whatsnew/v0.20.0.txt +++ b/doc/source/whatsnew/v0.20.0.txt @@ -360,6 +360,15 @@ New Behavior: In [5]: df['a']['2011-12-31 23:59:59'] Out[5]: 1 +.. _whatsnew_0200.api_breaking.gbq: + +Pandas Google BigQuery support has moved +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +pandas has split off Google BigQuery support into a separate package ``pandas-gbq``. You can ``pip install pandas-gbq`` to get it. +The functionality of ``pd.read_gbq()`` and ``.to_gbq()`` remains the same with the currently released version of ``pandas-gbq=0.1.2``. (:issue:`15347`) +Documentation is now hosted `here `__ + .. _whatsnew_0200.api_breaking.memory_usage: Memory Usage for Index is more Accurate diff --git a/pandas/core/frame.py b/pandas/core/frame.py index adf397e63984f..7b02926ea8837 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -77,7 +77,8 @@ OrderedDict, raise_with_traceback) from pandas import compat from pandas.compat.numpy import function as nv -from pandas.util.decorators import deprecate_kwarg, Appender, Substitution +from pandas.util.decorators import (deprecate_kwarg, Appender, + Substitution, docstring_wrapper) from pandas.util.validators import validate_bool_kwarg from pandas.tseries.period import PeriodIndex @@ -941,6 +942,11 @@ def to_gbq(self, destination_table, project_id, chunksize=10000, chunksize=chunksize, verbose=verbose, reauth=reauth, if_exists=if_exists, private_key=private_key) + def _f(): + from pandas.io.gbq import _try_import + return _try_import().to_gbq.__doc__ + to_gbq = docstring_wrapper(to_gbq, _f) + @classmethod def from_records(cls, data, index=None, exclude=None, columns=None, coerce_float=False, nrows=None): diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py index a5558866937cf..3407f51af5e83 100644 --- a/pandas/io/gbq.py +++ b/pandas/io/gbq.py @@ -1,1180 +1,52 @@ -import warnings -from datetime import datetime -import json -import logging -from time import sleep -import uuid -import time -import sys +""" Google BigQuery support """ -import numpy as np +from pandas.util.decorators import docstring_wrapper -from distutils.version import StrictVersion -from pandas import compat, DataFrame, concat -from pandas.core.common import PandasError -from pandas.compat import lzip, bytes_to_str - - -def _check_google_client_version(): +def _try_import(): + # since pandas is a dependency of pandas-gbq + # we need to import on first use try: - import pkg_resources - + import pandas_gbq except ImportError: - raise ImportError('Could not import pkg_resources (setuptools).') - - if compat.PY3: - google_api_minimum_version = '1.4.1' - else: - google_api_minimum_version = '1.2.0' - - _GOOGLE_API_CLIENT_VERSION = pkg_resources.get_distribution( - 'google-api-python-client').version - - if (StrictVersion(_GOOGLE_API_CLIENT_VERSION) < - StrictVersion(google_api_minimum_version)): - raise ImportError("pandas requires google-api-python-client >= {0} " - "for Google BigQuery support, " - "current version {1}" - .format(google_api_minimum_version, - _GOOGLE_API_CLIENT_VERSION)) - - -def _test_google_api_imports(): - - try: - import httplib2 # noqa - try: - from googleapiclient.discovery import build # noqa - from googleapiclient.errors import HttpError # noqa - except: - from apiclient.discovery import build # noqa - from apiclient.errors import HttpError # noqa - from oauth2client.client import AccessTokenRefreshError # noqa - from oauth2client.client import OAuth2WebServerFlow # noqa - from oauth2client.file import Storage # noqa - from oauth2client.tools import run_flow, argparser # noqa - except ImportError as e: - raise ImportError("Missing module required for Google BigQuery " - "support: {0}".format(str(e))) - - -logger = logging.getLogger('pandas.io.gbq') -logger.setLevel(logging.ERROR) - - -class InvalidPrivateKeyFormat(PandasError, ValueError): - """ - Raised when provided private key has invalid format. - """ - pass - - -class AccessDenied(PandasError, ValueError): - """ - Raised when invalid credentials are provided, or tokens have expired. - """ - pass - - -class DatasetCreationError(PandasError, ValueError): - """ - Raised when the create dataset method fails - """ - pass - - -class GenericGBQException(PandasError, ValueError): - """ - Raised when an unrecognized Google API Error occurs. - """ - pass - - -class InvalidColumnOrder(PandasError, ValueError): - """ - Raised when the provided column order for output - results DataFrame does not match the schema - returned by BigQuery. - """ - pass - - -class InvalidPageToken(PandasError, ValueError): - """ - Raised when Google BigQuery fails to return, - or returns a duplicate page token. - """ - pass - - -class InvalidSchema(PandasError, ValueError): - """ - Raised when the provided DataFrame does - not match the schema of the destination - table in BigQuery. - """ - pass - - -class NotFoundException(PandasError, ValueError): - """ - Raised when the project_id, table or dataset provided in the query could - not be found. - """ - pass - - -class StreamingInsertError(PandasError, ValueError): - """ - Raised when BigQuery reports a streaming insert error. - For more information see `Streaming Data Into BigQuery - `__ - """ - - -class TableCreationError(PandasError, ValueError): - """ - Raised when the create table method fails - """ - pass - - -class GbqConnector(object): - scope = 'https://www.googleapis.com/auth/bigquery' - - def __init__(self, project_id, reauth=False, verbose=False, - private_key=None, dialect='legacy'): - _check_google_client_version() - _test_google_api_imports() - self.project_id = project_id - self.reauth = reauth - self.verbose = verbose - self.private_key = private_key - self.dialect = dialect - self.credentials = self.get_credentials() - self.service = self.get_service() - - def get_credentials(self): - if self.private_key: - return self.get_service_account_credentials() - else: - # Try to retrieve Application Default Credentials - credentials = self.get_application_default_credentials() - if not credentials: - credentials = self.get_user_account_credentials() - return credentials - - def get_application_default_credentials(self): - """ - This method tries to retrieve the "default application credentials". - This could be useful for running code on Google Cloud Platform. - - .. versionadded:: 0.19.0 - - Parameters - ---------- - None - - Returns - ------- - - GoogleCredentials, - If the default application credentials can be retrieved - from the environment. The retrieved credentials should also - have access to the project (self.project_id) on BigQuery. - - OR None, - If default application credentials can not be retrieved - from the environment. Or, the retrieved credentials do not - have access to the project (self.project_id) on BigQuery. - """ - import httplib2 - try: - from googleapiclient.discovery import build - except ImportError: - from apiclient.discovery import build - try: - from oauth2client.client import GoogleCredentials - except ImportError: - return None - - try: - credentials = GoogleCredentials.get_application_default() - except: - return None - - http = httplib2.Http() - try: - http = credentials.authorize(http) - bigquery_service = build('bigquery', 'v2', http=http) - # Check if the application has rights to the BigQuery project - jobs = bigquery_service.jobs() - job_data = {'configuration': {'query': {'query': 'SELECT 1'}}} - jobs.insert(projectId=self.project_id, body=job_data).execute() - return credentials - except: - return None - - def get_user_account_credentials(self): - from oauth2client.client import OAuth2WebServerFlow - from oauth2client.file import Storage - from oauth2client.tools import run_flow, argparser - - flow = OAuth2WebServerFlow( - client_id=('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd' - '.apps.googleusercontent.com'), - client_secret='kOc9wMptUtxkcIFbtZCcrEAc', - scope=self.scope, - redirect_uri='urn:ietf:wg:oauth:2.0:oob') - - storage = Storage('bigquery_credentials.dat') - credentials = storage.get() - - if credentials is None or credentials.invalid or self.reauth: - credentials = run_flow(flow, storage, argparser.parse_args([])) - - return credentials - - def get_service_account_credentials(self): - # Bug fix for https://github.com/pandas-dev/pandas/issues/12572 - # We need to know that a supported version of oauth2client is installed - # Test that either of the following is installed: - # - SignedJwtAssertionCredentials from oauth2client.client - # - ServiceAccountCredentials from oauth2client.service_account - # SignedJwtAssertionCredentials is available in oauthclient < 2.0.0 - # ServiceAccountCredentials is available in oauthclient >= 2.0.0 - oauth2client_v1 = True - oauth2client_v2 = True - - try: - from oauth2client.client import SignedJwtAssertionCredentials - except ImportError: - oauth2client_v1 = False - - try: - from oauth2client.service_account import ServiceAccountCredentials - except ImportError: - oauth2client_v2 = False - - if not oauth2client_v1 and not oauth2client_v2: - raise ImportError("Missing oauth2client required for BigQuery " - "service account support") - - from os.path import isfile - - try: - if isfile(self.private_key): - with open(self.private_key) as f: - json_key = json.loads(f.read()) - else: - # ugly hack: 'private_key' field has new lines inside, - # they break json parser, but we need to preserve them - json_key = json.loads(self.private_key.replace('\n', ' ')) - json_key['private_key'] = json_key['private_key'].replace( - ' ', '\n') - - if compat.PY3: - json_key['private_key'] = bytes( - json_key['private_key'], 'UTF-8') - - if oauth2client_v1: - return SignedJwtAssertionCredentials( - json_key['client_email'], - json_key['private_key'], - self.scope, - ) - else: - return ServiceAccountCredentials.from_json_keyfile_dict( - json_key, - self.scope) - except (KeyError, ValueError, TypeError, AttributeError): - raise InvalidPrivateKeyFormat( - "Private key is missing or invalid. It should be service " - "account private key JSON (file path or string contents) " - "with at least two keys: 'client_email' and 'private_key'. " - "Can be obtained from: https://console.developers.google." - "com/permissions/serviceaccounts") - - def _print(self, msg, end='\n'): - if self.verbose: - sys.stdout.write(msg + end) - sys.stdout.flush() - - def _start_timer(self): - self.start = time.time() - - def get_elapsed_seconds(self): - return round(time.time() - self.start, 2) - - def print_elapsed_seconds(self, prefix='Elapsed', postfix='s.', - overlong=7): - sec = self.get_elapsed_seconds() - if sec > overlong: - self._print('{} {} {}'.format(prefix, sec, postfix)) - - # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size - @staticmethod - def sizeof_fmt(num, suffix='b'): - fmt = "%3.1f %s%s" - for unit in ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']: - if abs(num) < 1024.0: - return fmt % (num, unit, suffix) - num /= 1024.0 - return fmt % (num, 'Y', suffix) - - def get_service(self): - import httplib2 - try: - from googleapiclient.discovery import build - except: - from apiclient.discovery import build - - http = httplib2.Http() - http = self.credentials.authorize(http) - bigquery_service = build('bigquery', 'v2', http=http) - - return bigquery_service - - @staticmethod - def process_http_error(ex): - # See `BigQuery Troubleshooting Errors - # `__ - - status = json.loads(bytes_to_str(ex.content))['error'] - errors = status.get('errors', None) - - if errors: - for error in errors: - reason = error['reason'] - message = error['message'] - - raise GenericGBQException( - "Reason: {0}, Message: {1}".format(reason, message)) - - raise GenericGBQException(errors) - - def process_insert_errors(self, insert_errors): - for insert_error in insert_errors: - row = insert_error['index'] - errors = insert_error.get('errors', None) - for error in errors: - reason = error['reason'] - message = error['message'] - location = error['location'] - error_message = ('Error at Row: {0}, Reason: {1}, ' - 'Location: {2}, Message: {3}' - .format(row, reason, location, message)) - - # Report all error messages if verbose is set - if self.verbose: - self._print(error_message) - else: - raise StreamingInsertError(error_message + - '\nEnable verbose logging to ' - 'see all errors') - - raise StreamingInsertError - - def run_query(self, query, **kwargs): - try: - from googleapiclient.errors import HttpError - except: - from apiclient.errors import HttpError - from oauth2client.client import AccessTokenRefreshError - - _check_google_client_version() - - job_collection = self.service.jobs() - - job_config = { - 'query': { - 'query': query, - 'useLegacySql': self.dialect == 'legacy' - # 'allowLargeResults', 'createDisposition', - # 'preserveNulls', destinationTable, useQueryCache - } - } - config = kwargs.get('configuration') - if config is not None: - if len(config) != 1: - raise ValueError("Only one job type must be specified, but " - "given {}".format(','.join(config.keys()))) - if 'query' in config: - if 'query' in config['query'] and query is not None: - raise ValueError("Query statement can't be specified " - "inside config while it is specified " - "as parameter") - - job_config['query'].update(config['query']) - else: - raise ValueError("Only 'query' job type is supported") - - job_data = { - 'configuration': job_config - } - - self._start_timer() - try: - self._print('Requesting query... ', end="") - query_reply = job_collection.insert( - projectId=self.project_id, body=job_data).execute() - self._print('ok.\nQuery running...') - except (AccessTokenRefreshError, ValueError): - if self.private_key: - raise AccessDenied( - "The service account credentials are not valid") - else: - raise AccessDenied( - "The credentials have been revoked or expired, " - "please re-run the application to re-authorize") - except HttpError as ex: - self.process_http_error(ex) - - job_reference = query_reply['jobReference'] - - while not query_reply.get('jobComplete', False): - self.print_elapsed_seconds(' Elapsed', 's. Waiting...') - try: - query_reply = job_collection.getQueryResults( - projectId=job_reference['projectId'], - jobId=job_reference['jobId']).execute() - except HttpError as ex: - self.process_http_error(ex) - - if self.verbose: - if query_reply['cacheHit']: - self._print('Query done.\nCache hit.\n') - else: - bytes_processed = int(query_reply.get( - 'totalBytesProcessed', '0')) - self._print('Query done.\nProcessed: {}\n'.format( - self.sizeof_fmt(bytes_processed))) - - self._print('Retrieving results...') - - total_rows = int(query_reply['totalRows']) - result_pages = list() - seen_page_tokens = list() - current_row = 0 - # Only read schema on first page - schema = query_reply['schema'] - - # Loop through each page of data - while 'rows' in query_reply and current_row < total_rows: - page = query_reply['rows'] - result_pages.append(page) - current_row += len(page) - - self.print_elapsed_seconds( - ' Got page: {}; {}% done. Elapsed'.format( - len(result_pages), - round(100.0 * current_row / total_rows))) - - if current_row == total_rows: - break - - page_token = query_reply.get('pageToken', None) - - if not page_token and current_row < total_rows: - raise InvalidPageToken("Required pageToken was missing. " - "Received {0} of {1} rows" - .format(current_row, total_rows)) - - elif page_token in seen_page_tokens: - raise InvalidPageToken("A duplicate pageToken was returned") - - seen_page_tokens.append(page_token) - - try: - query_reply = job_collection.getQueryResults( - projectId=job_reference['projectId'], - jobId=job_reference['jobId'], - pageToken=page_token).execute() - except HttpError as ex: - self.process_http_error(ex) - - if current_row < total_rows: - raise InvalidPageToken() - - # print basic query stats - self._print('Got {} rows.\n'.format(total_rows)) - - return schema, result_pages - - def load_data(self, dataframe, dataset_id, table_id, chunksize): - try: - from googleapiclient.errors import HttpError - except: - from apiclient.errors import HttpError - - job_id = uuid.uuid4().hex - rows = [] - remaining_rows = len(dataframe) - - total_rows = remaining_rows - self._print("\n\n") - - for index, row in dataframe.reset_index(drop=True).iterrows(): - row_dict = dict() - row_dict['json'] = json.loads(row.to_json(force_ascii=False, - date_unit='s', - date_format='iso')) - row_dict['insertId'] = job_id + str(index) - rows.append(row_dict) - remaining_rows -= 1 - - if (len(rows) % chunksize == 0) or (remaining_rows == 0): - self._print("\rStreaming Insert is {0}% Complete".format( - ((total_rows - remaining_rows) * 100) / total_rows)) - - body = {'rows': rows} - - try: - response = self.service.tabledata().insertAll( - projectId=self.project_id, - datasetId=dataset_id, - tableId=table_id, - body=body).execute() - except HttpError as ex: - self.process_http_error(ex) - - # For streaming inserts, even if you receive a success HTTP - # response code, you'll need to check the insertErrors property - # of the response to determine if the row insertions were - # successful, because it's possible that BigQuery was only - # partially successful at inserting the rows. See the `Success - # HTTP Response Codes - # `__ - # section - - insert_errors = response.get('insertErrors', None) - if insert_errors: - self.process_insert_errors(insert_errors) - - sleep(1) # Maintains the inserts "per second" rate per API - rows = [] - - self._print("\n") - - def verify_schema(self, dataset_id, table_id, schema): - try: - from googleapiclient.errors import HttpError - except: - from apiclient.errors import HttpError - - try: - remote_schema = self.service.tables().get( - projectId=self.project_id, - datasetId=dataset_id, - tableId=table_id).execute()['schema'] - - fields_remote = set([json.dumps(field_remote) - for field_remote in remote_schema['fields']]) - fields_local = set(json.dumps(field_local) - for field_local in schema['fields']) - - return fields_remote == fields_local - except HttpError as ex: - self.process_http_error(ex) - - def delete_and_recreate_table(self, dataset_id, table_id, table_schema): - delay = 0 - - # Changes to table schema may take up to 2 minutes as of May 2015 See - # `Issue 191 - # `__ - # Compare previous schema with new schema to determine if there should - # be a 120 second delay - - if not self.verify_schema(dataset_id, table_id, table_schema): - self._print('The existing table has a different schema. Please ' - 'wait 2 minutes. See Google BigQuery issue #191') - delay = 120 - - table = _Table(self.project_id, dataset_id, - private_key=self.private_key) - table.delete(table_id) - table.create(table_id, table_schema) - sleep(delay) - - -def _parse_data(schema, rows): - # see: - # http://pandas.pydata.org/pandas-docs/dev/missing_data.html - # #missing-data-casting-rules-and-indexing - dtype_map = {'FLOAT': np.dtype(float), - 'TIMESTAMP': 'M8[ns]'} - - fields = schema['fields'] - col_types = [field['type'] for field in fields] - col_names = [str(field['name']) for field in fields] - col_dtypes = [dtype_map.get(field['type'], object) for field in fields] - page_array = np.zeros((len(rows),), dtype=lzip(col_names, col_dtypes)) - for row_num, raw_row in enumerate(rows): - entries = raw_row.get('f', []) - for col_num, field_type in enumerate(col_types): - field_value = _parse_entry(entries[col_num].get('v', ''), - field_type) - page_array[row_num][col_num] = field_value - - return DataFrame(page_array, columns=col_names) + # give a nice error message + raise ImportError("Load data from Google BigQuery\n" + "\n" + "the pandas-gbq package is not installed\n" + "see the docs: https://pandas-gbq.readthedocs.io\n" + "\n" + "you can install via:\n" + "pip install pandas-gbq\n") -def _parse_entry(field_value, field_type): - if field_value is None or field_value == 'null': - return None - if field_type == 'INTEGER': - return int(field_value) - elif field_type == 'FLOAT': - return float(field_value) - elif field_type == 'TIMESTAMP': - timestamp = datetime.utcfromtimestamp(float(field_value)) - return np.datetime64(timestamp) - elif field_type == 'BOOLEAN': - return field_value == 'true' - return field_value + return pandas_gbq def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True, private_key=None, dialect='legacy', **kwargs): - r"""Load data from Google BigQuery. + pandas_gbq = _try_import() + return pandas_gbq.read_gbq( + query, project_id=project_id, + index_col=index_col, col_order=col_order, + reauth=reauth, verbose=verbose, + private_key=private_key, + dialect=dialect, + **kwargs) - THIS IS AN EXPERIMENTAL LIBRARY - The main method a user calls to execute a Query in Google BigQuery - and read results into a pandas DataFrame. - - Google BigQuery API Client Library v2 for Python is used. - Documentation is available at - https://developers.google.com/api-client-library/python/apis/bigquery/v2 - - Authentication to the Google BigQuery service is via OAuth 2.0. - - - If "private_key" is not provided: - - By default "application default credentials" are used. - - .. versionadded:: 0.19.0 - - If default application credentials are not found or are restrictive, - user account credentials are used. In this case, you will be asked to - grant permissions for product name 'pandas GBQ'. - - - If "private_key" is provided: - - Service account credentials will be used to authenticate. - - Parameters - ---------- - query : str - SQL-Like Query to return data values - project_id : str - Google BigQuery Account project ID. - index_col : str (optional) - Name of result column to use for index in results DataFrame - col_order : list(str) (optional) - List of BigQuery column names in the desired order for results - DataFrame - reauth : boolean (default False) - Force Google BigQuery to reauthenticate the user. This is useful - if multiple accounts are used. - verbose : boolean (default True) - Verbose output - private_key : str (optional) - Service account private key in JSON format. Can be file path - or string contents. This is useful for remote server - authentication (eg. jupyter iPython notebook on remote host) - - .. versionadded:: 0.18.1 - - dialect : {'legacy', 'standard'}, default 'legacy' - 'legacy' : Use BigQuery's legacy SQL dialect. - 'standard' : Use BigQuery's standard SQL (beta), which is - compliant with the SQL 2011 standard. For more information - see `BigQuery SQL Reference - `__ - - .. versionadded:: 0.19.0 - - **kwargs : Arbitrary keyword arguments - configuration (dict): query config parameters for job processing. - For example: - - configuration = {'query': {'useQueryCache': False}} - - For more information see `BigQuery SQL Reference - ` - - .. versionadded:: 0.20.0 - - Returns - ------- - df: DataFrame - DataFrame representing results of query - - """ - - if not project_id: - raise TypeError("Missing required parameter: project_id") - - if dialect not in ('legacy', 'standard'): - raise ValueError("'{0}' is not valid for dialect".format(dialect)) - - connector = GbqConnector(project_id, reauth=reauth, verbose=verbose, - private_key=private_key, - dialect=dialect) - schema, pages = connector.run_query(query, **kwargs) - dataframe_list = [] - while len(pages) > 0: - page = pages.pop() - dataframe_list.append(_parse_data(schema, page)) - - if len(dataframe_list) > 0: - final_df = concat(dataframe_list, ignore_index=True) - else: - final_df = _parse_data(schema, []) - - # Reindex the DataFrame on the provided column - if index_col is not None: - if index_col in final_df.columns: - final_df.set_index(index_col, inplace=True) - else: - raise InvalidColumnOrder( - 'Index column "{0}" does not exist in DataFrame.' - .format(index_col) - ) - - # Change the order of columns in the DataFrame based on provided list - if col_order is not None: - if sorted(col_order) == sorted(final_df.columns): - final_df = final_df[col_order] - else: - raise InvalidColumnOrder( - 'Column order does not match this DataFrame.' - ) - - # cast BOOLEAN and INTEGER columns from object to bool/int - # if they dont have any nulls - type_map = {'BOOLEAN': bool, 'INTEGER': int} - for field in schema['fields']: - if field['type'] in type_map and \ - final_df[field['name']].notnull().all(): - final_df[field['name']] = \ - final_df[field['name']].astype(type_map[field['type']]) - - connector.print_elapsed_seconds( - 'Total time taken', - datetime.now().strftime('s.\nFinished at %Y-%m-%d %H:%M:%S.'), - 0 - ) - - return final_df +read_gbq = docstring_wrapper(read_gbq, + lambda: _try_import().read_gbq.__doc__) def to_gbq(dataframe, destination_table, project_id, chunksize=10000, verbose=True, reauth=False, if_exists='fail', private_key=None): - """Write a DataFrame to a Google BigQuery table. - - THIS IS AN EXPERIMENTAL LIBRARY - - The main method a user calls to export pandas DataFrame contents to - Google BigQuery table. - - Google BigQuery API Client Library v2 for Python is used. - Documentation is available at - https://developers.google.com/api-client-library/python/apis/bigquery/v2 - - Authentication to the Google BigQuery service is via OAuth 2.0. - - - If "private_key" is not provided: - - By default "application default credentials" are used. - - .. versionadded:: 0.19.0 - - If default application credentials are not found or are restrictive, - user account credentials are used. In this case, you will be asked to - grant permissions for product name 'pandas GBQ'. - - - If "private_key" is provided: - - Service account credentials will be used to authenticate. - - Parameters - ---------- - dataframe : DataFrame - DataFrame to be written - destination_table : string - Name of table to be written, in the form 'dataset.tablename' - project_id : str - Google BigQuery Account project ID. - chunksize : int (default 10000) - Number of rows to be inserted in each chunk from the dataframe. - verbose : boolean (default True) - Show percentage complete - reauth : boolean (default False) - Force Google BigQuery to reauthenticate the user. This is useful - if multiple accounts are used. - if_exists : {'fail', 'replace', 'append'}, default 'fail' - 'fail': If table exists, do nothing. - 'replace': If table exists, drop it, recreate it, and insert data. - 'append': If table exists, insert data. Create if does not exist. - private_key : str (optional) - Service account private key in JSON format. Can be file path - or string contents. This is useful for remote server - authentication (eg. jupyter iPython notebook on remote host) - """ - - if if_exists not in ('fail', 'replace', 'append'): - raise ValueError("'{0}' is not valid for if_exists".format(if_exists)) - - if '.' not in destination_table: - raise NotFoundException( - "Invalid Table Name. Should be of the form 'datasetId.tableId' ") - - connector = GbqConnector(project_id, reauth=reauth, verbose=verbose, - private_key=private_key) - dataset_id, table_id = destination_table.rsplit('.', 1) - - table = _Table(project_id, dataset_id, reauth=reauth, - private_key=private_key) - - table_schema = _generate_bq_schema(dataframe) - - # If table exists, check if_exists parameter - if table.exists(table_id): - if if_exists == 'fail': - raise TableCreationError("Could not create the table because it " - "already exists. " - "Change the if_exists parameter to " - "append or replace data.") - elif if_exists == 'replace': - connector.delete_and_recreate_table( - dataset_id, table_id, table_schema) - elif if_exists == 'append': - if not connector.verify_schema(dataset_id, table_id, table_schema): - raise InvalidSchema("Please verify that the structure and " - "data types in the DataFrame match the " - "schema of the destination table.") - else: - table.create(table_id, table_schema) - - connector.load_data(dataframe, dataset_id, table_id, chunksize) - - -def generate_bq_schema(df, default_type='STRING'): - # deprecation TimeSeries, #11121 - warnings.warn("generate_bq_schema is deprecated and will be removed in " - "a future version", FutureWarning, stacklevel=2) - - return _generate_bq_schema(df, default_type=default_type) - - -def _generate_bq_schema(df, default_type='STRING'): - """ Given a passed df, generate the associated Google BigQuery schema. - - Parameters - ---------- - df : DataFrame - default_type : string - The default big query type in case the type of the column - does not exist in the schema. - """ - - type_mapping = { - 'i': 'INTEGER', - 'b': 'BOOLEAN', - 'f': 'FLOAT', - 'O': 'STRING', - 'S': 'STRING', - 'U': 'STRING', - 'M': 'TIMESTAMP' - } - - fields = [] - for column_name, dtype in df.dtypes.iteritems(): - fields.append({'name': column_name, - 'type': type_mapping.get(dtype.kind, default_type)}) - - return {'fields': fields} - - -class _Table(GbqConnector): - - def __init__(self, project_id, dataset_id, reauth=False, verbose=False, - private_key=None): - try: - from googleapiclient.errors import HttpError - except: - from apiclient.errors import HttpError - self.http_error = HttpError - self.dataset_id = dataset_id - super(_Table, self).__init__(project_id, reauth, verbose, private_key) - - def exists(self, table_id): - """ Check if a table exists in Google BigQuery - - .. versionadded:: 0.17.0 - - Parameters - ---------- - table : str - Name of table to be verified - - Returns - ------- - boolean - true if table exists, otherwise false - """ - - try: - self.service.tables().get( - projectId=self.project_id, - datasetId=self.dataset_id, - tableId=table_id).execute() - return True - except self.http_error as ex: - if ex.resp.status == 404: - return False - else: - self.process_http_error(ex) - - def create(self, table_id, schema): - """ Create a table in Google BigQuery given a table and schema - - .. versionadded:: 0.17.0 - - Parameters - ---------- - table : str - Name of table to be written - schema : str - Use the generate_bq_schema to generate your table schema from a - dataframe. - """ - - if self.exists(table_id): - raise TableCreationError( - "The table could not be created because it already exists") - - if not _Dataset(self.project_id, - private_key=self.private_key).exists(self.dataset_id): - _Dataset(self.project_id, - private_key=self.private_key).create(self.dataset_id) - - body = { - 'schema': schema, - 'tableReference': { - 'tableId': table_id, - 'projectId': self.project_id, - 'datasetId': self.dataset_id - } - } - - try: - self.service.tables().insert( - projectId=self.project_id, - datasetId=self.dataset_id, - body=body).execute() - except self.http_error as ex: - self.process_http_error(ex) - - def delete(self, table_id): - """ Delete a table in Google BigQuery - - .. versionadded:: 0.17.0 - - Parameters - ---------- - table : str - Name of table to be deleted - """ - - if not self.exists(table_id): - raise NotFoundException("Table does not exist") - - try: - self.service.tables().delete( - datasetId=self.dataset_id, - projectId=self.project_id, - tableId=table_id).execute() - except self.http_error as ex: - self.process_http_error(ex) - - -class _Dataset(GbqConnector): - - def __init__(self, project_id, reauth=False, verbose=False, - private_key=None): - try: - from googleapiclient.errors import HttpError - except: - from apiclient.errors import HttpError - self.http_error = HttpError - super(_Dataset, self).__init__(project_id, reauth, verbose, - private_key) - - def exists(self, dataset_id): - """ Check if a dataset exists in Google BigQuery - - .. versionadded:: 0.17.0 - - Parameters - ---------- - dataset_id : str - Name of dataset to be verified - - Returns - ------- - boolean - true if dataset exists, otherwise false - """ - - try: - self.service.datasets().get( - projectId=self.project_id, - datasetId=dataset_id).execute() - return True - except self.http_error as ex: - if ex.resp.status == 404: - return False - else: - self.process_http_error(ex) - - def datasets(self): - """ Return a list of datasets in Google BigQuery - - .. versionadded:: 0.17.0 - - Parameters - ---------- - None - - Returns - ------- - list - List of datasets under the specific project - """ - - dataset_list = [] - next_page_token = None - first_query = True - - while first_query or next_page_token: - first_query = False - - try: - list_dataset_response = self.service.datasets().list( - projectId=self.project_id, - pageToken=next_page_token).execute() - - dataset_response = list_dataset_response.get('datasets') - next_page_token = list_dataset_response.get('nextPageToken') - - if not dataset_response: - return dataset_list - - for row_num, raw_row in enumerate(dataset_response): - dataset_list.append( - raw_row['datasetReference']['datasetId']) - - except self.http_error as ex: - self.process_http_error(ex) - - return dataset_list - - def create(self, dataset_id): - """ Create a dataset in Google BigQuery - - .. versionadded:: 0.17.0 - - Parameters - ---------- - dataset : str - Name of dataset to be written - """ - - if self.exists(dataset_id): - raise DatasetCreationError( - "The dataset could not be created because it already exists") - - body = { - 'datasetReference': { - 'projectId': self.project_id, - 'datasetId': dataset_id - } - } - - try: - self.service.datasets().insert( - projectId=self.project_id, - body=body).execute() - except self.http_error as ex: - self.process_http_error(ex) - - def delete(self, dataset_id): - """ Delete a dataset in Google BigQuery - - .. versionadded:: 0.17.0 - - Parameters - ---------- - dataset : str - Name of dataset to be deleted - """ - - if not self.exists(dataset_id): - raise NotFoundException( - "Dataset {0} does not exist".format(dataset_id)) - - try: - self.service.datasets().delete( - datasetId=dataset_id, - projectId=self.project_id).execute() - - except self.http_error as ex: - self.process_http_error(ex) - - def tables(self, dataset_id): - """ List tables in the specific dataset in Google BigQuery - - .. versionadded:: 0.17.0 - - Parameters - ---------- - dataset : str - Name of dataset to list tables for - - Returns - ------- - list - List of tables under the specific dataset - """ - - table_list = [] - next_page_token = None - first_query = True - - while first_query or next_page_token: - first_query = False - - try: - list_table_response = self.service.tables().list( - projectId=self.project_id, - datasetId=dataset_id, - pageToken=next_page_token).execute() - - table_response = list_table_response.get('tables') - next_page_token = list_table_response.get('nextPageToken') - - if not table_response: - return table_list - - for row_num, raw_row in enumerate(table_response): - table_list.append(raw_row['tableReference']['tableId']) + pandas_gbq = _try_import() + pandas_gbq.to_gbq(dataframe, destination_table, project_id, + chunksize=chunksize, + verbose=verbose, reauth=reauth, + if_exists=if_exists, private_key=private_key) - except self.http_error as ex: - self.process_http_error(ex) - return table_list +to_gbq = docstring_wrapper(to_gbq, + lambda: _try_import().to_gbq.__doc__) diff --git a/pandas/tests/io/test_gbq.py b/pandas/tests/io/test_gbq.py index 0a76267054ee6..13529e7b54714 100644 --- a/pandas/tests/io/test_gbq.py +++ b/pandas/tests/io/test_gbq.py @@ -1,23 +1,18 @@ -import re -from datetime import datetime import pytest +from datetime import datetime import pytz import platform from time import sleep import os -import logging import numpy as np +import pandas as pd +from pandas import compat, DataFrame -from distutils.version import StrictVersion -from pandas import compat - -from pandas import NaT -from pandas.compat import u, range -from pandas.core.frame import DataFrame -import pandas.io.gbq as gbq +from pandas.compat import range import pandas.util.testing as tm -from pandas.compat.numpy import np_datetime64_compat + +pandas_gbq = pytest.importorskip('pandas_gbq') PROJECT_ID = None PRIVATE_KEY_JSON_PATH = None @@ -33,12 +28,6 @@ VERSION = platform.python_version() -_IMPORTS = False -_GOOGLE_API_CLIENT_INSTALLED = False -_GOOGLE_API_CLIENT_VALID_VERSION = False -_HTTPLIB2_INSTALLED = False -_SETUPTOOLS_INSTALLED = False - def _skip_if_no_project_id(): if not _get_project_id(): @@ -46,23 +35,12 @@ def _skip_if_no_project_id(): "Cannot run integration tests without a project id") -def _skip_local_auth_if_in_travis_env(): - if _in_travis_environment(): - pytest.skip("Cannot run local auth in travis environment") - - def _skip_if_no_private_key_path(): if not _get_private_key_path(): pytest.skip("Cannot run integration tests without a " "private key json file path") -def _skip_if_no_private_key_contents(): - if not _get_private_key_contents(): - pytest.skip("Cannot run integration tests without a " - "private key json contents") - - def _in_travis_environment(): return 'TRAVIS_BUILD_DIR' in os.environ and \ 'GBQ_PROJECT_ID' in os.environ @@ -83,146 +61,15 @@ def _get_private_key_path(): return PRIVATE_KEY_JSON_PATH -def _get_private_key_contents(): - if _in_travis_environment(): - with open(os.path.join(*[os.environ.get('TRAVIS_BUILD_DIR'), 'ci', - 'travis_gbq.json'])) as f: - return f.read() - else: - return PRIVATE_KEY_JSON_CONTENTS - - -def _test_imports(): - global _GOOGLE_API_CLIENT_INSTALLED, _GOOGLE_API_CLIENT_VALID_VERSION, \ - _HTTPLIB2_INSTALLED, _SETUPTOOLS_INSTALLED - - try: - import pkg_resources - _SETUPTOOLS_INSTALLED = True - except ImportError: - _SETUPTOOLS_INSTALLED = False - - if compat.PY3: - google_api_minimum_version = '1.4.1' - else: - google_api_minimum_version = '1.2.0' - - if _SETUPTOOLS_INSTALLED: - try: - try: - from googleapiclient.discovery import build # noqa - from googleapiclient.errors import HttpError # noqa - except: - from apiclient.discovery import build # noqa - from apiclient.errors import HttpError # noqa - - from oauth2client.client import OAuth2WebServerFlow # noqa - from oauth2client.client import AccessTokenRefreshError # noqa - - from oauth2client.file import Storage # noqa - from oauth2client.tools import run_flow # noqa - _GOOGLE_API_CLIENT_INSTALLED = True - _GOOGLE_API_CLIENT_VERSION = pkg_resources.get_distribution( - 'google-api-python-client').version - - if (StrictVersion(_GOOGLE_API_CLIENT_VERSION) >= - StrictVersion(google_api_minimum_version)): - _GOOGLE_API_CLIENT_VALID_VERSION = True - - except ImportError: - _GOOGLE_API_CLIENT_INSTALLED = False - - try: - import httplib2 # noqa - _HTTPLIB2_INSTALLED = True - except ImportError: - _HTTPLIB2_INSTALLED = False - - if not _SETUPTOOLS_INSTALLED: - raise ImportError('Could not import pkg_resources (setuptools).') - - if not _GOOGLE_API_CLIENT_INSTALLED: - raise ImportError('Could not import Google API Client.') - - if not _GOOGLE_API_CLIENT_VALID_VERSION: - raise ImportError("pandas requires google-api-python-client >= {0} " - "for Google BigQuery support, " - "current version {1}" - .format(google_api_minimum_version, - _GOOGLE_API_CLIENT_VERSION)) - - if not _HTTPLIB2_INSTALLED: - raise ImportError( - "pandas requires httplib2 for Google BigQuery support") - - # Bug fix for https://github.com/pandas-dev/pandas/issues/12572 - # We need to know that a supported version of oauth2client is installed - # Test that either of the following is installed: - # - SignedJwtAssertionCredentials from oauth2client.client - # - ServiceAccountCredentials from oauth2client.service_account - # SignedJwtAssertionCredentials is available in oauthclient < 2.0.0 - # ServiceAccountCredentials is available in oauthclient >= 2.0.0 - oauth2client_v1 = True - oauth2client_v2 = True - - try: - from oauth2client.client import SignedJwtAssertionCredentials # noqa - except ImportError: - oauth2client_v1 = False - - try: - from oauth2client.service_account import ServiceAccountCredentials # noqa - except ImportError: - oauth2client_v2 = False - - if not oauth2client_v1 and not oauth2client_v2: - raise ImportError("Missing oauth2client required for BigQuery " - "service account support") - - -def _setup_common(): - try: - _test_imports() - except (ImportError, NotImplementedError) as import_exception: - pytest.skip(import_exception) - - if _in_travis_environment(): - logging.getLogger('oauth2client').setLevel(logging.ERROR) - logging.getLogger('apiclient').setLevel(logging.ERROR) - - -def _check_if_can_get_correct_default_credentials(): - # Checks if "Application Default Credentials" can be fetched - # from the environment the tests are running in. - # See Issue #13577 - - import httplib2 - try: - from googleapiclient.discovery import build - except ImportError: - from apiclient.discovery import build - try: - from oauth2client.client import GoogleCredentials - credentials = GoogleCredentials.get_application_default() - http = httplib2.Http() - http = credentials.authorize(http) - bigquery_service = build('bigquery', 'v2', http=http) - jobs = bigquery_service.jobs() - job_data = {'configuration': {'query': {'query': 'SELECT 1'}}} - jobs.insert(projectId=_get_project_id(), body=job_data).execute() - return True - except: - return False - - def clean_gbq_environment(private_key=None): - dataset = gbq._Dataset(_get_project_id(), private_key=private_key) + dataset = pandas_gbq.gbq._Dataset(_get_project_id(), + private_key=private_key) for i in range(1, 10): if DATASET_ID + str(i) in dataset.datasets(): dataset_id = DATASET_ID + str(i) - table = gbq._Table(_get_project_id(), dataset_id, - private_key=private_key) + table = pandas_gbq.gbq._Table(_get_project_id(), dataset_id, + private_key=private_key) for j in range(1, 20): if TABLE_ID + str(j) in dataset.tables(dataset_id): table.delete(TABLE_ID + str(j)) @@ -246,673 +93,8 @@ def make_mixed_dataframe_v2(test_size): index=range(test_size)) -def test_generate_bq_schema_deprecated(): - # 11121 Deprecation of generate_bq_schema - with tm.assert_produces_warning(FutureWarning): - df = make_mixed_dataframe_v2(10) - gbq.generate_bq_schema(df) - - -@pytest.mark.xfail(run=False, reason="intermittent failures") -class TestGBQConnectorIntegrationWithLocalUserAccountAuth(tm.TestCase): - - def setUp(self): - _setup_common() - _skip_if_no_project_id() - _skip_local_auth_if_in_travis_env() - - self.sut = gbq.GbqConnector(_get_project_id()) - - def test_should_be_able_to_make_a_connector(self): - self.assertTrue(self.sut is not None, - 'Could not create a GbqConnector') - - def test_should_be_able_to_get_valid_credentials(self): - credentials = self.sut.get_credentials() - self.assertFalse(credentials.invalid, 'Returned credentials invalid') - - def test_should_be_able_to_get_a_bigquery_service(self): - bigquery_service = self.sut.get_service() - self.assertTrue(bigquery_service is not None, 'No service returned') - - def test_should_be_able_to_get_schema_from_query(self): - schema, pages = self.sut.run_query('SELECT 1') - self.assertTrue(schema is not None) - - def test_should_be_able_to_get_results_from_query(self): - schema, pages = self.sut.run_query('SELECT 1') - self.assertTrue(pages is not None) - - def test_get_application_default_credentials_does_not_throw_error(self): - if _check_if_can_get_correct_default_credentials(): - pytest.skip("Can get default_credentials " - "from the environment!") - credentials = self.sut.get_application_default_credentials() - self.assertIsNone(credentials) - - def test_get_application_default_credentials_returns_credentials(self): - if not _check_if_can_get_correct_default_credentials(): - pytest.skip("Cannot get default_credentials " - "from the environment!") - from oauth2client.client import GoogleCredentials - credentials = self.sut.get_application_default_credentials() - self.assertTrue(isinstance(credentials, GoogleCredentials)) - - -@pytest.mark.xfail(run=False, reason="intermittent failures") -class TestGBQConnectorIntegrationWithServiceAccountKeyPath(tm.TestCase): - def setUp(self): - _setup_common() - - _skip_if_no_project_id() - _skip_if_no_private_key_path() - - self.sut = gbq.GbqConnector(_get_project_id(), - private_key=_get_private_key_path()) - - def test_should_be_able_to_make_a_connector(self): - self.assertTrue(self.sut is not None, - 'Could not create a GbqConnector') - - def test_should_be_able_to_get_valid_credentials(self): - credentials = self.sut.get_credentials() - self.assertFalse(credentials.invalid, 'Returned credentials invalid') - - def test_should_be_able_to_get_a_bigquery_service(self): - bigquery_service = self.sut.get_service() - self.assertTrue(bigquery_service is not None, 'No service returned') - - def test_should_be_able_to_get_schema_from_query(self): - schema, pages = self.sut.run_query('SELECT 1') - self.assertTrue(schema is not None) - - def test_should_be_able_to_get_results_from_query(self): - schema, pages = self.sut.run_query('SELECT 1') - self.assertTrue(pages is not None) - - -@pytest.mark.xfail(run=False, reason="intermittent failures") -class TestGBQConnectorIntegrationWithServiceAccountKeyContents(tm.TestCase): - def setUp(self): - _setup_common() - - _skip_if_no_project_id() - _skip_if_no_private_key_contents() - - self.sut = gbq.GbqConnector(_get_project_id(), - private_key=_get_private_key_contents()) - - def test_should_be_able_to_make_a_connector(self): - self.assertTrue(self.sut is not None, - 'Could not create a GbqConnector') - - def test_should_be_able_to_get_valid_credentials(self): - credentials = self.sut.get_credentials() - self.assertFalse(credentials.invalid, 'Returned credentials invalid') - - def test_should_be_able_to_get_a_bigquery_service(self): - bigquery_service = self.sut.get_service() - self.assertTrue(bigquery_service is not None, 'No service returned') - - def test_should_be_able_to_get_schema_from_query(self): - schema, pages = self.sut.run_query('SELECT 1') - self.assertTrue(schema is not None) - - def test_should_be_able_to_get_results_from_query(self): - schema, pages = self.sut.run_query('SELECT 1') - self.assertTrue(pages is not None) - - -class GBQUnitTests(tm.TestCase): - - def setUp(self): - _setup_common() - - def test_import_google_api_python_client(self): - if compat.PY2: - with tm.assertRaises(ImportError): - from googleapiclient.discovery import build # noqa - from googleapiclient.errors import HttpError # noqa - from apiclient.discovery import build # noqa - from apiclient.errors import HttpError # noqa - else: - from googleapiclient.discovery import build # noqa - from googleapiclient.errors import HttpError # noqa - - def test_should_return_bigquery_integers_as_python_ints(self): - result = gbq._parse_entry(1, 'INTEGER') - tm.assert_equal(result, int(1)) - - def test_should_return_bigquery_floats_as_python_floats(self): - result = gbq._parse_entry(1, 'FLOAT') - tm.assert_equal(result, float(1)) - - def test_should_return_bigquery_timestamps_as_numpy_datetime(self): - result = gbq._parse_entry('0e9', 'TIMESTAMP') - tm.assert_equal(result, np_datetime64_compat('1970-01-01T00:00:00Z')) - - def test_should_return_bigquery_booleans_as_python_booleans(self): - result = gbq._parse_entry('false', 'BOOLEAN') - tm.assert_equal(result, False) - - def test_should_return_bigquery_strings_as_python_strings(self): - result = gbq._parse_entry('STRING', 'STRING') - tm.assert_equal(result, 'STRING') - - def test_to_gbq_should_fail_if_invalid_table_name_passed(self): - with tm.assertRaises(gbq.NotFoundException): - gbq.to_gbq(DataFrame(), 'invalid_table_name', project_id="1234") - - def test_to_gbq_with_no_project_id_given_should_fail(self): - with tm.assertRaises(TypeError): - gbq.to_gbq(DataFrame(), 'dataset.tablename') - - def test_read_gbq_with_no_project_id_given_should_fail(self): - with tm.assertRaises(TypeError): - gbq.read_gbq('SELECT 1') - - def test_that_parse_data_works_properly(self): - test_schema = {'fields': [ - {'mode': 'NULLABLE', 'name': 'valid_string', 'type': 'STRING'}]} - test_page = [{'f': [{'v': 'PI'}]}] - - test_output = gbq._parse_data(test_schema, test_page) - correct_output = DataFrame({'valid_string': ['PI']}) - tm.assert_frame_equal(test_output, correct_output) - - def test_read_gbq_with_invalid_private_key_json_should_fail(self): - with tm.assertRaises(gbq.InvalidPrivateKeyFormat): - gbq.read_gbq('SELECT 1', project_id='x', private_key='y') - - def test_read_gbq_with_empty_private_key_json_should_fail(self): - with tm.assertRaises(gbq.InvalidPrivateKeyFormat): - gbq.read_gbq('SELECT 1', project_id='x', private_key='{}') - - def test_read_gbq_with_private_key_json_wrong_types_should_fail(self): - with tm.assertRaises(gbq.InvalidPrivateKeyFormat): - gbq.read_gbq( - 'SELECT 1', project_id='x', - private_key='{ "client_email" : 1, "private_key" : True }') - - def test_read_gbq_with_empty_private_key_file_should_fail(self): - with tm.ensure_clean() as empty_file_path: - with tm.assertRaises(gbq.InvalidPrivateKeyFormat): - gbq.read_gbq('SELECT 1', project_id='x', - private_key=empty_file_path) - - def test_read_gbq_with_corrupted_private_key_json_should_fail(self): - _skip_if_no_private_key_contents() - - with tm.assertRaises(gbq.InvalidPrivateKeyFormat): - gbq.read_gbq( - 'SELECT 1', project_id='x', - private_key=re.sub('[a-z]', '9', _get_private_key_contents())) - - -@pytest.mark.xfail(run=False, reason="intermittent failures") -class TestReadGBQIntegration(tm.TestCase): - - @classmethod - def setUpClass(cls): - # - GLOBAL CLASS FIXTURES - - # put here any instruction you want to execute only *ONCE* *BEFORE* - # executing *ALL* tests described below. - - _skip_if_no_project_id() - - _setup_common() - - def setUp(self): - # - PER-TEST FIXTURES - - # put here any instruction you want to be run *BEFORE* *EVERY* test is - # executed. - pass - - @classmethod - def tearDownClass(cls): - # - GLOBAL CLASS FIXTURES - - # put here any instruction you want to execute only *ONCE* *AFTER* - # executing all tests. - pass - - def tearDown(self): - # - PER-TEST FIXTURES - - # put here any instructions you want to be run *AFTER* *EVERY* test is - # executed. - pass - - def test_should_read_as_user_account(self): - _skip_local_auth_if_in_travis_env() - - query = 'SELECT "PI" AS valid_string' - df = gbq.read_gbq(query, project_id=_get_project_id()) - tm.assert_frame_equal(df, DataFrame({'valid_string': ['PI']})) - - def test_should_read_as_service_account_with_key_path(self): - _skip_if_no_private_key_path() - query = 'SELECT "PI" AS valid_string' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'valid_string': ['PI']})) - - def test_should_read_as_service_account_with_key_contents(self): - _skip_if_no_private_key_contents() - query = 'SELECT "PI" AS valid_string' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_contents()) - tm.assert_frame_equal(df, DataFrame({'valid_string': ['PI']})) - - -@pytest.mark.xfail(run=False, reason="intermittent failures") -class TestReadGBQIntegrationWithServiceAccountKeyPath(tm.TestCase): - - @classmethod - def setUpClass(cls): - # - GLOBAL CLASS FIXTURES - - # put here any instruction you want to execute only *ONCE* *BEFORE* - # executing *ALL* tests described below. - - _skip_if_no_project_id() - _skip_if_no_private_key_path() - - _setup_common() - - def setUp(self): - # - PER-TEST FIXTURES - - # put here any instruction you want to be run *BEFORE* *EVERY* test is - # executed. - pass - - @classmethod - def tearDownClass(cls): - # - GLOBAL CLASS FIXTURES - - # put here any instruction you want to execute only *ONCE* *AFTER* - # executing all tests. - pass - - def tearDown(self): - # - PER-TEST FIXTURES - - # put here any instructions you want to be run *AFTER* *EVERY* test is - # executed. - pass - - def test_should_properly_handle_valid_strings(self): - query = 'SELECT "PI" AS valid_string' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'valid_string': ['PI']})) - - def test_should_properly_handle_empty_strings(self): - query = 'SELECT "" AS empty_string' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'empty_string': [""]})) - - def test_should_properly_handle_null_strings(self): - query = 'SELECT STRING(NULL) AS null_string' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'null_string': [None]})) - - def test_should_properly_handle_valid_integers(self): - query = 'SELECT INTEGER(3) AS valid_integer' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'valid_integer': [3]})) - - def test_should_properly_handle_nullable_integers(self): - query = '''SELECT * FROM - (SELECT 1 AS nullable_integer), - (SELECT NULL AS nullable_integer)''' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal( - df, DataFrame({'nullable_integer': [1, None]}).astype(object)) - - def test_should_properly_handle_valid_longs(self): - query = 'SELECT 1 << 62 AS valid_long' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal( - df, DataFrame({'valid_long': [1 << 62]})) - - def test_should_properly_handle_nullable_longs(self): - query = '''SELECT * FROM - (SELECT 1 << 62 AS nullable_long), - (SELECT NULL AS nullable_long)''' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal( - df, DataFrame({'nullable_long': [1 << 62, None]}).astype(object)) - - def test_should_properly_handle_null_integers(self): - query = 'SELECT INTEGER(NULL) AS null_integer' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'null_integer': [None]})) - - def test_should_properly_handle_valid_floats(self): - from math import pi - query = 'SELECT PI() AS valid_float' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame( - {'valid_float': [pi]})) - - def test_should_properly_handle_nullable_floats(self): - from math import pi - query = '''SELECT * FROM - (SELECT PI() AS nullable_float), - (SELECT NULL AS nullable_float)''' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal( - df, DataFrame({'nullable_float': [pi, None]})) - - def test_should_properly_handle_valid_doubles(self): - from math import pi - query = 'SELECT PI() * POW(10, 307) AS valid_double' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame( - {'valid_double': [pi * 10 ** 307]})) - - def test_should_properly_handle_nullable_doubles(self): - from math import pi - query = '''SELECT * FROM - (SELECT PI() * POW(10, 307) AS nullable_double), - (SELECT NULL AS nullable_double)''' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal( - df, DataFrame({'nullable_double': [pi * 10 ** 307, None]})) - - def test_should_properly_handle_null_floats(self): - query = 'SELECT FLOAT(NULL) AS null_float' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'null_float': [np.nan]})) - - def test_should_properly_handle_timestamp_unix_epoch(self): - query = 'SELECT TIMESTAMP("1970-01-01 00:00:00") AS unix_epoch' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame( - {'unix_epoch': [np.datetime64('1970-01-01T00:00:00.000000Z')]})) - - def test_should_properly_handle_arbitrary_timestamp(self): - query = 'SELECT TIMESTAMP("2004-09-15 05:00:00") AS valid_timestamp' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({ - 'valid_timestamp': [np.datetime64('2004-09-15T05:00:00.000000Z')] - })) - - def test_should_properly_handle_null_timestamp(self): - query = 'SELECT TIMESTAMP(NULL) AS null_timestamp' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'null_timestamp': [NaT]})) - - def test_should_properly_handle_true_boolean(self): - query = 'SELECT BOOLEAN(TRUE) AS true_boolean' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'true_boolean': [True]})) - - def test_should_properly_handle_false_boolean(self): - query = 'SELECT BOOLEAN(FALSE) AS false_boolean' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'false_boolean': [False]})) - - def test_should_properly_handle_null_boolean(self): - query = 'SELECT BOOLEAN(NULL) AS null_boolean' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, DataFrame({'null_boolean': [None]})) - - def test_should_properly_handle_nullable_booleans(self): - query = '''SELECT * FROM - (SELECT BOOLEAN(TRUE) AS nullable_boolean), - (SELECT NULL AS nullable_boolean)''' - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal( - df, DataFrame({'nullable_boolean': [True, None]}).astype(object)) - - def test_unicode_string_conversion_and_normalization(self): - correct_test_datatype = DataFrame( - {'unicode_string': [u("\xe9\xfc")]} - ) - - unicode_string = "\xc3\xa9\xc3\xbc" - - if compat.PY3: - unicode_string = unicode_string.encode('latin-1').decode('utf8') - - query = 'SELECT "{0}" AS unicode_string'.format(unicode_string) - - df = gbq.read_gbq(query, project_id=_get_project_id(), - private_key=_get_private_key_path()) - tm.assert_frame_equal(df, correct_test_datatype) - - def test_index_column(self): - query = "SELECT 'a' AS string_1, 'b' AS string_2" - result_frame = gbq.read_gbq(query, project_id=_get_project_id(), - index_col="string_1", - private_key=_get_private_key_path()) - correct_frame = DataFrame( - {'string_1': ['a'], 'string_2': ['b']}).set_index("string_1") - tm.assert_equal(result_frame.index.name, correct_frame.index.name) - - def test_column_order(self): - query = "SELECT 'a' AS string_1, 'b' AS string_2, 'c' AS string_3" - col_order = ['string_3', 'string_1', 'string_2'] - result_frame = gbq.read_gbq(query, project_id=_get_project_id(), - col_order=col_order, - private_key=_get_private_key_path()) - correct_frame = DataFrame({'string_1': ['a'], 'string_2': [ - 'b'], 'string_3': ['c']})[col_order] - tm.assert_frame_equal(result_frame, correct_frame) - - def test_column_order_plus_index(self): - query = "SELECT 'a' AS string_1, 'b' AS string_2, 'c' AS string_3" - col_order = ['string_3', 'string_2'] - result_frame = gbq.read_gbq(query, project_id=_get_project_id(), - index_col='string_1', col_order=col_order, - private_key=_get_private_key_path()) - correct_frame = DataFrame( - {'string_1': ['a'], 'string_2': ['b'], 'string_3': ['c']}) - correct_frame.set_index('string_1', inplace=True) - correct_frame = correct_frame[col_order] - tm.assert_frame_equal(result_frame, correct_frame) - - def test_malformed_query(self): - with tm.assertRaises(gbq.GenericGBQException): - gbq.read_gbq("SELCET * FORM [publicdata:samples.shakespeare]", - project_id=_get_project_id(), - private_key=_get_private_key_path()) - - def test_bad_project_id(self): - with tm.assertRaises(gbq.GenericGBQException): - gbq.read_gbq("SELECT 1", project_id='001', - private_key=_get_private_key_path()) - - def test_bad_table_name(self): - with tm.assertRaises(gbq.GenericGBQException): - gbq.read_gbq("SELECT * FROM [publicdata:samples.nope]", - project_id=_get_project_id(), - private_key=_get_private_key_path()) - - def test_download_dataset_larger_than_200k_rows(self): - test_size = 200005 - # Test for known BigQuery bug in datasets larger than 100k rows - # http://stackoverflow.com/questions/19145587/bq-py-not-paging-results - df = gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] " - "GROUP EACH BY id ORDER BY id ASC LIMIT {0}" - .format(test_size), - project_id=_get_project_id(), - private_key=_get_private_key_path()) - self.assertEqual(len(df.drop_duplicates()), test_size) - - def test_zero_rows(self): - # Bug fix for https://github.com/pandas-dev/pandas/issues/10273 - df = gbq.read_gbq("SELECT title, id, is_bot, " - "SEC_TO_TIMESTAMP(timestamp) ts " - "FROM [publicdata:samples.wikipedia] " - "WHERE timestamp=-9999999", - project_id=_get_project_id(), - private_key=_get_private_key_path()) - page_array = np.zeros( - (0,), dtype=[('title', object), ('id', np.dtype(int)), - ('is_bot', np.dtype(bool)), ('ts', 'M8[ns]')]) - expected_result = DataFrame( - page_array, columns=['title', 'id', 'is_bot', 'ts']) - self.assert_frame_equal(df, expected_result) - - def test_legacy_sql(self): - legacy_sql = "SELECT id FROM [publicdata.samples.wikipedia] LIMIT 10" - - # Test that a legacy sql statement fails when - # setting dialect='standard' - with tm.assertRaises(gbq.GenericGBQException): - gbq.read_gbq(legacy_sql, project_id=_get_project_id(), - dialect='standard', - private_key=_get_private_key_path()) - - # Test that a legacy sql statement succeeds when - # setting dialect='legacy' - df = gbq.read_gbq(legacy_sql, project_id=_get_project_id(), - dialect='legacy', - private_key=_get_private_key_path()) - self.assertEqual(len(df.drop_duplicates()), 10) - - def test_standard_sql(self): - standard_sql = "SELECT DISTINCT id FROM " \ - "`publicdata.samples.wikipedia` LIMIT 10" - - # Test that a standard sql statement fails when using - # the legacy SQL dialect (default value) - with tm.assertRaises(gbq.GenericGBQException): - gbq.read_gbq(standard_sql, project_id=_get_project_id(), - private_key=_get_private_key_path()) - - # Test that a standard sql statement succeeds when - # setting dialect='standard' - df = gbq.read_gbq(standard_sql, project_id=_get_project_id(), - dialect='standard', - private_key=_get_private_key_path()) - self.assertEqual(len(df.drop_duplicates()), 10) - - def test_invalid_option_for_sql_dialect(self): - sql_statement = "SELECT DISTINCT id FROM " \ - "`publicdata.samples.wikipedia` LIMIT 10" - - # Test that an invalid option for `dialect` raises ValueError - with tm.assertRaises(ValueError): - gbq.read_gbq(sql_statement, project_id=_get_project_id(), - dialect='invalid', - private_key=_get_private_key_path()) - - # Test that a correct option for dialect succeeds - # to make sure ValueError was due to invalid dialect - gbq.read_gbq(sql_statement, project_id=_get_project_id(), - dialect='standard', private_key=_get_private_key_path()) - - def test_query_with_parameters(self): - sql_statement = "SELECT @param1 + @param2 AS valid_result" - config = { - 'query': { - "useLegacySql": False, - "parameterMode": "named", - "queryParameters": [ - { - "name": "param1", - "parameterType": { - "type": "INTEGER" - }, - "parameterValue": { - "value": 1 - } - }, - { - "name": "param2", - "parameterType": { - "type": "INTEGER" - }, - "parameterValue": { - "value": 2 - } - } - ] - } - } - # Test that a query that relies on parameters fails - # when parameters are not supplied via configuration - with tm.assertRaises(ValueError): - gbq.read_gbq(sql_statement, project_id=_get_project_id(), - private_key=_get_private_key_path()) - - # Test that the query is successful because we have supplied - # the correct query parameters via the 'config' option - df = gbq.read_gbq(sql_statement, project_id=_get_project_id(), - private_key=_get_private_key_path(), - configuration=config) - tm.assert_frame_equal(df, DataFrame({'valid_result': [3]})) - - def test_query_inside_configuration(self): - query_no_use = 'SELECT "PI_WRONG" AS valid_string' - query = 'SELECT "PI" AS valid_string' - config = { - 'query': { - "query": query, - "useQueryCache": False, - } - } - # Test that it can't pass query both - # inside config and as parameter - with tm.assertRaises(ValueError): - gbq.read_gbq(query_no_use, project_id=_get_project_id(), - private_key=_get_private_key_path(), - configuration=config) - - df = gbq.read_gbq(None, project_id=_get_project_id(), - private_key=_get_private_key_path(), - configuration=config) - tm.assert_frame_equal(df, DataFrame({'valid_string': ['PI']})) - - def test_configuration_without_query(self): - sql_statement = 'SELECT 1' - config = { - 'copy': { - "sourceTable": { - "projectId": _get_project_id(), - "datasetId": "publicdata:samples", - "tableId": "wikipedia" - }, - "destinationTable": { - "projectId": _get_project_id(), - "datasetId": "publicdata:samples", - "tableId": "wikipedia_copied" - }, - } - } - # Test that only 'query' configurations are supported - # nor 'copy','load','extract' - with tm.assertRaises(ValueError): - gbq.read_gbq(sql_statement, project_id=_get_project_id(), - private_key=_get_private_key_path(), - configuration=config) - - -@pytest.mark.xfail(run=False, reason="intermittent failures") +@pytest.mark.single class TestToGBQIntegrationWithServiceAccountKeyPath(tm.TestCase): - # Changes to BigQuery table schema may take up to 2 minutes as of May 2015 - # As a workaround to this issue, each test should use a unique table name. - # Make sure to modify the for loop range in the tearDownClass when a new - # test is added See `Issue 191 - # `__ @classmethod def setUpClass(cls): @@ -923,24 +105,10 @@ def setUpClass(cls): _skip_if_no_project_id() _skip_if_no_private_key_path() - _setup_common() clean_gbq_environment(_get_private_key_path()) - - gbq._Dataset(_get_project_id(), - private_key=_get_private_key_path() - ).create(DATASET_ID + "1") - - def setUp(self): - # - PER-TEST FIXTURES - - # put here any instruction you want to be run *BEFORE* *EVERY* test is - # executed. - - self.dataset = gbq._Dataset(_get_project_id(), - private_key=_get_private_key_path()) - self.table = gbq._Table(_get_project_id(), DATASET_ID + "1", - private_key=_get_private_key_path()) - self.sut = gbq.GbqConnector(_get_project_id(), - private_key=_get_private_key_path()) + pandas_gbq.gbq._Dataset(_get_project_id(), + private_key=_get_private_key_path() + ).create(DATASET_ID + "1") @classmethod def tearDownClass(cls): @@ -950,387 +118,19 @@ def tearDownClass(cls): clean_gbq_environment(_get_private_key_path()) - def tearDown(self): - # - PER-TEST FIXTURES - - # put here any instructions you want to be run *AFTER* *EVERY* test is - # executed. - pass - - def test_upload_data(self): + def test_roundtrip(self): destination_table = DESTINATION_TABLE + "1" test_size = 20001 df = make_mixed_dataframe_v2(test_size) - gbq.to_gbq(df, destination_table, _get_project_id(), chunksize=10000, - private_key=_get_private_key_path()) - - sleep(30) # <- Curses Google!!! - - result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" - .format(destination_table), - project_id=_get_project_id(), - private_key=_get_private_key_path()) - self.assertEqual(result['num_rows'][0], test_size) - - def test_upload_data_if_table_exists_fail(self): - destination_table = DESTINATION_TABLE + "2" - - test_size = 10 - df = make_mixed_dataframe_v2(test_size) - self.table.create(TABLE_ID + "2", gbq._generate_bq_schema(df)) - - # Test the default value of if_exists is 'fail' - with tm.assertRaises(gbq.TableCreationError): - gbq.to_gbq(df, destination_table, _get_project_id(), - private_key=_get_private_key_path()) - - # Test the if_exists parameter with value 'fail' - with tm.assertRaises(gbq.TableCreationError): - gbq.to_gbq(df, destination_table, _get_project_id(), - if_exists='fail', private_key=_get_private_key_path()) - - def test_upload_data_if_table_exists_append(self): - destination_table = DESTINATION_TABLE + "3" - - test_size = 10 - df = make_mixed_dataframe_v2(test_size) - df_different_schema = tm.makeMixedDataFrame() - - # Initialize table with sample data - gbq.to_gbq(df, destination_table, _get_project_id(), chunksize=10000, - private_key=_get_private_key_path()) - - # Test the if_exists parameter with value 'append' - gbq.to_gbq(df, destination_table, _get_project_id(), - if_exists='append', private_key=_get_private_key_path()) - - sleep(30) # <- Curses Google!!! - - result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" - .format(destination_table), - project_id=_get_project_id(), - private_key=_get_private_key_path()) - self.assertEqual(result['num_rows'][0], test_size * 2) - - # Try inserting with a different schema, confirm failure - with tm.assertRaises(gbq.InvalidSchema): - gbq.to_gbq(df_different_schema, destination_table, - _get_project_id(), if_exists='append', - private_key=_get_private_key_path()) - - def test_upload_data_if_table_exists_replace(self): - - destination_table = DESTINATION_TABLE + "4" - - test_size = 10 - df = make_mixed_dataframe_v2(test_size) - df_different_schema = tm.makeMixedDataFrame() - - # Initialize table with sample data - gbq.to_gbq(df, destination_table, _get_project_id(), chunksize=10000, - private_key=_get_private_key_path()) - - # Test the if_exists parameter with the value 'replace'. - gbq.to_gbq(df_different_schema, destination_table, - _get_project_id(), if_exists='replace', - private_key=_get_private_key_path()) - - sleep(30) # <- Curses Google!!! - - result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" - .format(destination_table), - project_id=_get_project_id(), - private_key=_get_private_key_path()) - self.assertEqual(result['num_rows'][0], 5) - - @tm.slow - def test_google_upload_errors_should_raise_exception(self): - destination_table = DESTINATION_TABLE + "5" - - test_timestamp = datetime.now(pytz.timezone('US/Arizona')) - bad_df = DataFrame({'bools': [False, False], 'flts': [0.0, 1.0], - 'ints': [0, '1'], 'strs': ['a', 1], - 'times': [test_timestamp, test_timestamp]}, - index=range(2)) - - with tm.assertRaises(gbq.StreamingInsertError): - gbq.to_gbq(bad_df, destination_table, _get_project_id(), - verbose=True, private_key=_get_private_key_path()) - - def test_generate_schema(self): - df = tm.makeMixedDataFrame() - schema = gbq._generate_bq_schema(df) - - test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - - self.assertEqual(schema, test_schema) - - def test_create_table(self): - destination_table = TABLE_ID + "6" - test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - self.table.create(destination_table, test_schema) - self.assertTrue(self.table.exists(destination_table), - 'Expected table to exist') - - def test_table_does_not_exist(self): - self.assertTrue(not self.table.exists(TABLE_ID + "7"), - 'Expected table not to exist') - - def test_delete_table(self): - destination_table = TABLE_ID + "8" - test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - self.table.create(destination_table, test_schema) - self.table.delete(destination_table) - self.assertTrue(not self.table.exists( - destination_table), 'Expected table not to exist') - - def test_list_table(self): - destination_table = TABLE_ID + "9" - test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - self.table.create(destination_table, test_schema) - self.assertTrue( - destination_table in self.dataset.tables(DATASET_ID + "1"), - 'Expected table list to contain table {0}' - .format(destination_table)) - - def test_verify_schema_allows_flexible_column_order(self): - destination_table = TABLE_ID + "10" - test_schema_1 = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - test_schema_2 = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'B', 'type': 'FLOAT'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - - self.table.create(destination_table, test_schema_1) - self.assertTrue(self.sut.verify_schema( - DATASET_ID + "1", destination_table, test_schema_2), - 'Expected schema to match') - - def test_verify_schema_fails_different_data_type(self): - destination_table = TABLE_ID + "11" - test_schema_1 = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - test_schema_2 = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B', 'type': 'STRING'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - - self.table.create(destination_table, test_schema_1) - self.assertFalse(self.sut.verify_schema( - DATASET_ID + "1", destination_table, test_schema_2), - 'Expected different schema') - - def test_verify_schema_fails_different_structure(self): - destination_table = TABLE_ID + "12" - test_schema_1 = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - test_schema_2 = {'fields': [{'name': 'A', 'type': 'FLOAT'}, - {'name': 'B2', 'type': 'FLOAT'}, - {'name': 'C', 'type': 'STRING'}, - {'name': 'D', 'type': 'TIMESTAMP'}]} - - self.table.create(destination_table, test_schema_1) - self.assertFalse(self.sut.verify_schema( - DATASET_ID + "1", destination_table, test_schema_2), - 'Expected different schema') - - def test_upload_data_flexible_column_order(self): - destination_table = DESTINATION_TABLE + "13" - - test_size = 10 - df = make_mixed_dataframe_v2(test_size) - - # Initialize table with sample data - gbq.to_gbq(df, destination_table, _get_project_id(), chunksize=10000, - private_key=_get_private_key_path()) - - df_columns_reversed = df[df.columns[::-1]] - - gbq.to_gbq(df_columns_reversed, destination_table, _get_project_id(), - if_exists='append', private_key=_get_private_key_path()) - - def test_list_dataset(self): - dataset_id = DATASET_ID + "1" - self.assertTrue(dataset_id in self.dataset.datasets(), - 'Expected dataset list to contain dataset {0}' - .format(dataset_id)) - - def test_list_table_zero_results(self): - dataset_id = DATASET_ID + "2" - self.dataset.create(dataset_id) - table_list = gbq._Dataset(_get_project_id(), - private_key=_get_private_key_path() - ).tables(dataset_id) - self.assertEqual(len(table_list), 0, - 'Expected gbq.list_table() to return 0') - - def test_create_dataset(self): - dataset_id = DATASET_ID + "3" - self.dataset.create(dataset_id) - self.assertTrue(dataset_id in self.dataset.datasets(), - 'Expected dataset to exist') - - def test_delete_dataset(self): - dataset_id = DATASET_ID + "4" - self.dataset.create(dataset_id) - self.dataset.delete(dataset_id) - self.assertTrue(dataset_id not in self.dataset.datasets(), - 'Expected dataset not to exist') - - def test_dataset_exists(self): - dataset_id = DATASET_ID + "5" - self.dataset.create(dataset_id) - self.assertTrue(self.dataset.exists(dataset_id), - 'Expected dataset to exist') - - def create_table_data_dataset_does_not_exist(self): - dataset_id = DATASET_ID + "6" - table_id = TABLE_ID + "1" - table_with_new_dataset = gbq._Table(_get_project_id(), dataset_id) - df = make_mixed_dataframe_v2(10) - table_with_new_dataset.create(table_id, gbq._generate_bq_schema(df)) - self.assertTrue(self.dataset.exists(dataset_id), - 'Expected dataset to exist') - self.assertTrue(table_with_new_dataset.exists( - table_id), 'Expected dataset to exist') - - def test_dataset_does_not_exist(self): - self.assertTrue(not self.dataset.exists( - DATASET_ID + "_not_found"), 'Expected dataset not to exist') - - -@pytest.mark.xfail(run=False, reason="intermittent failures") -class TestToGBQIntegrationWithLocalUserAccountAuth(tm.TestCase): - # Changes to BigQuery table schema may take up to 2 minutes as of May 2015 - # As a workaround to this issue, each test should use a unique table name. - # Make sure to modify the for loop range in the tearDownClass when a new - # test is added - # See `Issue 191 - # `__ - - @classmethod - def setUpClass(cls): - # - GLOBAL CLASS FIXTURES - - # put here any instruction you want to execute only *ONCE* *BEFORE* - # executing *ALL* tests described below. - - _skip_if_no_project_id() - _skip_local_auth_if_in_travis_env() - - _setup_common() - clean_gbq_environment() - - def setUp(self): - # - PER-TEST FIXTURES - - # put here any instruction you want to be run *BEFORE* *EVERY* test - # is executed. - pass - - @classmethod - def tearDownClass(cls): - # - GLOBAL CLASS FIXTURES - - # put here any instruction you want to execute only *ONCE* *AFTER* - # executing all tests. - - clean_gbq_environment() - - def tearDown(self): - # - PER-TEST FIXTURES - - # put here any instructions you want to be run *AFTER* *EVERY* test - # is executed. - pass - - def test_upload_data(self): - destination_table = "{0}.{1}".format(DATASET_ID + "2", TABLE_ID + "1") - - test_size = 10 - df = make_mixed_dataframe_v2(test_size) - - gbq.to_gbq(df, destination_table, _get_project_id(), chunksize=10000) - - sleep(30) # <- Curses Google!!! - - result = gbq.read_gbq( - "SELECT COUNT(*) AS num_rows FROM {0}".format(destination_table), - project_id=_get_project_id()) - - self.assertEqual(result['num_rows'][0], test_size) - - -@pytest.mark.xfail(run=False, reason="intermittent failures") -class TestToGBQIntegrationWithServiceAccountKeyContents(tm.TestCase): - # Changes to BigQuery table schema may take up to 2 minutes as of May 2015 - # As a workaround to this issue, each test should use a unique table name. - # Make sure to modify the for loop range in the tearDownClass when a new - # test is added - # See `Issue 191 - # `__ - - @classmethod - def setUpClass(cls): - # - GLOBAL CLASS FIXTURES - - # put here any instruction you want to execute only *ONCE* *BEFORE* - # executing *ALL* tests described below. - - _setup_common() - _skip_if_no_project_id() - _skip_if_no_private_key_contents() - - clean_gbq_environment(_get_private_key_contents()) - - def setUp(self): - # - PER-TEST FIXTURES - - # put here any instruction you want to be run *BEFORE* *EVERY* test - # is executed. - pass - - @classmethod - def tearDownClass(cls): - # - GLOBAL CLASS FIXTURES - - # put here any instruction you want to execute only *ONCE* *AFTER* - # executing all tests. - - clean_gbq_environment(_get_private_key_contents()) - - def tearDown(self): - # - PER-TEST FIXTURES - - # put here any instructions you want to be run *AFTER* *EVERY* test - # is executed. - pass - - def test_upload_data(self): - destination_table = "{0}.{1}".format(DATASET_ID + "3", TABLE_ID + "1") - - test_size = 10 - df = make_mixed_dataframe_v2(test_size) - - gbq.to_gbq(df, destination_table, _get_project_id(), chunksize=10000, - private_key=_get_private_key_contents()) + df.to_gbq(destination_table, _get_project_id(), chunksize=10000, + private_key=_get_private_key_path()) sleep(30) # <- Curses Google!!! - result = gbq.read_gbq( - "SELECT COUNT(*) AS num_rows FROM {0}".format(destination_table), - project_id=_get_project_id(), - private_key=_get_private_key_contents()) + result = pd.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" + .format(destination_table), + project_id=_get_project_id(), + private_key=_get_private_key_path()) self.assertEqual(result['num_rows'][0], test_size) diff --git a/pandas/util/decorators.py b/pandas/util/decorators.py index 1b501eb1d9bda..d966d6b7a1b32 100644 --- a/pandas/util/decorators.py +++ b/pandas/util/decorators.py @@ -3,7 +3,7 @@ import sys import warnings from textwrap import dedent -from functools import wraps +from functools import wraps, update_wrapper def deprecate(name, alternative, alt_name=None): @@ -233,3 +233,39 @@ def make_signature(func): if spec.keywords: args.append('**' + spec.keywords) return args, spec.args + + +class docstring_wrapper(object): + """ + decorator to wrap a function, + provide a dynamically evaluated doc-string + + Parameters + ---------- + func : callable + creator : callable + return the doc-string + default : str, optional + return this doc-string on error + """ + _attrs = ['__module__', '__name__', + '__qualname__', '__annotations__'] + + def __init__(self, func, creator, default=None): + self.func = func + self.creator = creator + self.default = default + update_wrapper( + self, func, [attr for attr in self._attrs + if hasattr(func, attr)]) + + def __call__(self, func, *args, **kwargs): + return self.func(*args, **kwargs) + + @property + def __doc__(self): + try: + return self.creator() + except Exception as exc: + msg = self.default or str(exc) + return msg diff --git a/pandas/util/print_versions.py b/pandas/util/print_versions.py index b0f5d3994ed64..ca75d4d02e927 100644 --- a/pandas/util/print_versions.py +++ b/pandas/util/print_versions.py @@ -88,13 +88,12 @@ def show_versions(as_json=False): ("lxml", lambda mod: mod.etree.__version__), ("bs4", lambda mod: mod.__version__), ("html5lib", lambda mod: mod.__version__), - ("httplib2", lambda mod: mod.__version__), - ("apiclient", lambda mod: mod.__version__), ("sqlalchemy", lambda mod: mod.__version__), ("pymysql", lambda mod: mod.__version__), ("psycopg2", lambda mod: mod.__version__), ("jinja2", lambda mod: mod.__version__), ("s3fs", lambda mod: mod.__version__), + ("pandas_gbq", lambda mod: mod.__version__), ("pandas_datareader", lambda mod: mod.__version__) ]