diff --git a/docs/bigquery-usage.rst b/docs/bigquery-usage.rst
index aaf34d924aed..d20144e3a38f 100644
--- a/docs/bigquery-usage.rst
+++ b/docs/bigquery-usage.rst
@@ -279,17 +279,17 @@ Run a query which can be expected to complete within bounded time:
    >>> query = """\
    SELECT count(*) AS age_count FROM dataset_name.person_ages
    """
-   >>> job = client.run_sync_query(query)
-   >>> job.timeout_ms = 1000
-   >>> job.run()                       # API request
+   >>> query = client.run_sync_query(query)
+   >>> query.timeout_ms = 1000
+   >>> query.run()                     # API request
    >>> retry_count = 100
-   >>> while retry_count > 0 and not job.complete:
+   >>> while retry_count > 0 and not query.complete:
    ...     retry_count -= 1
    ...     time.sleep(10)
-   ...     job.reload()                # API request
-   >>> job.schema
+   ...     query.fetch_data()          # API request
+   >>> query.schema
    [{'name': 'age_count', 'type': 'integer', 'mode': 'nullable'}]
-   >>> job.rows
+   >>> query.rows
    [(15,)]
 
 .. note::
diff --git a/gcloud/bigquery/_helpers.py b/gcloud/bigquery/_helpers.py
index 0150fb8b697a..701aedb4c055 100644
--- a/gcloud/bigquery/_helpers.py
+++ b/gcloud/bigquery/_helpers.py
@@ -58,6 +58,7 @@ def _record_from_json(value, field):
 def _string_from_json(value, _):
     return value
 
+
 _CELLDATA_FROM_JSON = {
     'INTEGER': _int_from_json,
     'FLOAT': _float_from_json,
@@ -81,3 +82,73 @@ def _rows_from_json(rows, schema):
             row_data.append(converter(cell['v'], field))
         rows_data.append(tuple(row_data))
     return rows_data
+
+
+class _ConfigurationProperty(object):
+    """Base property implementation.
+
+    Values will be stored on a `_configuration` helper attribute of the
+    property's job instance.
+
+    :type name: string
+    :param name: name of the property
+    """
+
+    def __init__(self, name):
+        self.name = name
+        self._backing_name = '_%s' % (self.name,)
+
+    def __get__(self, instance, owner):
+        """Descriptor protocol: accessor"""
+        if instance is None:
+            return self
+        return getattr(instance._configuration, self._backing_name)
+
+    def _validate(self, value):
+        """Subclasses override to impose validation policy."""
+        pass
+
+    def __set__(self, instance, value):
+        """Descriptor protocol: mutator"""
+        self._validate(value)
+        setattr(instance._configuration, self._backing_name, value)
+
+    def __delete__(self, instance):
+        """Descriptor protocol: deleter"""
+        delattr(instance._configuration, self._backing_name)
+
+
+class _TypedProperty(_ConfigurationProperty):
+    """Property implementation: validates based on value type.
+
+    :type name: string
+    :param name: name of the property
+
+    :type property_type: type or sequence of types
+    :param property_type: type to be validated
+    """
+    def __init__(self, name, property_type):
+        super(_TypedProperty, self).__init__(name)
+        self.property_type = property_type
+
+    def _validate(self, value):
+        if not isinstance(value, self.property_type):
+            raise ValueError('Required type: %s' % (self.property_type,))
+
+
+class _EnumProperty(_ConfigurationProperty):
+    """Pseudo-enumeration class.
+
+    Subclasses must define ``ALLOWED`` as a class-level constant: it must
+    be a sequence of strings.
+
+    :type name: string
+    :param name: name of the property
+    """
+    def _validate(self, value):
+        """Check that ``value`` is one of the allowed values.
+
+        :raises: ValueError if value is not allowed.
+ """ + if value not in self.ALLOWED: + raise ValueError('Pass one of: %s' ', '.join(self.ALLOWED)) diff --git a/gcloud/bigquery/client.py b/gcloud/bigquery/client.py index 1c3421fa065f..5987796a676e 100644 --- a/gcloud/bigquery/client.py +++ b/gcloud/bigquery/client.py @@ -21,8 +21,8 @@ from gcloud.bigquery.job import CopyJob from gcloud.bigquery.job import ExtractTableToStorageJob from gcloud.bigquery.job import LoadTableFromStorageJob -from gcloud.bigquery.job import RunAsyncQueryJob -from gcloud.bigquery.job import RunSyncQueryJob +from gcloud.bigquery.job import QueryJob +from gcloud.bigquery.query import QueryResults class Client(JSONClient): @@ -179,18 +179,18 @@ def run_async_query(self, job_name, query): :type query: string :param query: SQL query to be executed - :rtype: :class:`gcloud.bigquery.job.RunAsyncQueryJob` - :returns: a new ``RunAsyncQueryJob`` instance + :rtype: :class:`gcloud.bigquery.job.QueryJob` + :returns: a new ``QueryJob`` instance """ - return RunAsyncQueryJob(job_name, query, client=self) + return QueryJob(job_name, query, client=self) def run_sync_query(self, query): - """Construct a job for running a SQL query synchronously. + """Run a SQL query synchronously. :type query: string :param query: SQL query to be executed - :rtype: :class:`gcloud.bigquery.job.RunSyncQueryJob` - :returns: a new ``RunSyncQueryJob`` instance + :rtype: :class:`gcloud.bigquery.query.QueryResults` + :returns: a new ``QueryResults`` instance """ - return RunSyncQueryJob(query, client=self) + return QueryResults(query, client=self) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index eb1712025c4f..aff85f9e7f4b 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -23,77 +23,8 @@ from gcloud.bigquery.table import Table from gcloud.bigquery.table import _build_schema_resource from gcloud.bigquery.table import _parse_schema_resource -from gcloud.bigquery._helpers import _rows_from_json - - -class _ConfigurationProperty(object): - """Base property implementation. - - Values will be stored on a `_configuration` helper attribute of the - property's job instance. - - :type name: string - :param name: name of the property - """ - - def __init__(self, name): - self.name = name - self._backing_name = '_%s' % (self.name,) - - def __get__(self, instance, owner): - """Descriptor protocal: accesstor""" - if instance is None: - return self - return getattr(instance._configuration, self._backing_name) - - def _validate(self, value): - """Subclasses override to impose validation policy.""" - pass - - def __set__(self, instance, value): - """Descriptor protocal: mutator""" - self._validate(value) - setattr(instance._configuration, self._backing_name, value) - - def __delete__(self, instance): - """Descriptor protocal: deleter""" - delattr(instance._configuration, self._backing_name) - - -class _TypedProperty(_ConfigurationProperty): - """Property implementation: validates based on value type. - - :type name: string - :param name: name of the property - - :type property_type: type or sequence of types - :param property_type: type to be validated - """ - def __init__(self, name, property_type): - super(_TypedProperty, self).__init__(name) - self.property_type = property_type - - def _validate(self, value): - if not isinstance(value, self.property_type): - raise ValueError('Required type: %s' % (self.property_type,)) - - -class _EnumProperty(_ConfigurationProperty): - """Psedo-enumeration class. 
- - Subclasses must define ``ALLOWED`` as a class-level constant: it must - be a sequence of strings. - - :type name: string - :param name: name of the property - """ - def _validate(self, value): - """Check that ``value`` is one of the allowed values. - - :raises: ValueError if value is not allowed. - """ - if value not in self.ALLOWED: - raise ValueError('Pass one of: %s' ', '.join(self.ALLOWED)) +from gcloud.bigquery._helpers import _EnumProperty +from gcloud.bigquery._helpers import _TypedProperty class Compression(_EnumProperty): @@ -126,7 +57,7 @@ class Encoding(_EnumProperty): class QueryPriority(_EnumProperty): - """Pseudo-enum for ``RunAsyncQueryJob.priority`` property.""" + """Pseudo-enum for ``QueryJob.priority`` property.""" INTERACTIVE = 'INTERACTIVE' BATCH = 'BATCH' ALLOWED = (INTERACTIVE, BATCH) @@ -823,7 +754,7 @@ class _AsyncQueryConfiguration(object): _write_disposition = None -class RunAsyncQueryJob(_AsyncJob): +class QueryJob(_AsyncJob): """Asynchronous job: query tables. :type name: string @@ -837,7 +768,7 @@ class RunAsyncQueryJob(_AsyncJob): for the dataset (which requires a project). """ def __init__(self, name, query, client): - super(RunAsyncQueryJob, self).__init__(name, client) + super(QueryJob, self).__init__(name, client) self.query = query self._configuration = _AsyncQueryConfiguration() @@ -945,296 +876,3 @@ def _scrub_local_properties(self, cleaned): assert dest_remote['projectId'] == self.project dataset = self._client.dataset(dest_remote['datasetId']) self.destination_table = dataset.table(dest_remote['tableId']) - - -class _SyncQueryConfiguration(object): - """User-settable configuration options for synchronous query jobs. - - Values which are ``None`` -> server defaults. - """ - _default_dataset = None - _max_results = None - _timeout_ms = None - _preserve_nulls = None - _use_query_cache = None - - -class RunSyncQueryJob(_BaseJob): - """Synchronous job: query tables. - - :type query: string - :param query: SQL query string - - :type client: :class:`gcloud.bigquery.client.Client` - :param client: A client which holds credentials and project configuration - for the dataset (which requires a project). - """ - def __init__(self, query, client): - super(RunSyncQueryJob, self).__init__(client) - self.query = query - self._configuration = _SyncQueryConfiguration() - - @property - def cache_hit(self): - """Query results served from cache. - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#cacheHit - - :rtype: boolean or ``NoneType`` - :returns: True if the query results were served from cache (None - until set by the server). - """ - return self._properties.get('cacheHit') - - @property - def complete(self): - """Server completed query. - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#jobComplete - - :rtype: boolean or ``NoneType`` - :returns: True if the query completed on the server (None - until set by the server). - """ - return self._properties.get('jobComplete') - - @property - def errors(self): - """Errors generated by the query. - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#errors - - :rtype: list of mapping, or ``NoneType`` - :returns: Mappings describing errors generated on the server (None - until set by the server). - """ - return self._properties.get('errors') - - @property - def name(self): - """Job name, generated by the back-end. 
- - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#jobReference - - :rtype: list of mapping, or ``NoneType`` - :returns: Mappings describing errors generated on the server (None - until set by the server). - """ - return self._properties.get('jobReference', {}).get('jobId') - - @name.setter - def name(self, value): - """Update name of the query. - - :type value: string, or ``NoneType`` - :param value: new name - - :raises: ValueError for invalid value types. - """ - if not isinstance(value, six.string_types) and value is not None: - raise ValueError("Pass a string, or None") - self._properties['jobReference'] = { - 'projectId': self.project, - 'jobId': value, - } - - @property - def page_token(self): - """Token for fetching next bach of results. - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#pageToken - - :rtype: string, or ``NoneType`` - :returns: Token generated on the server (None until set by the server). - """ - return self._properties.get('pageToken') - - @property - def total_rows(self): - """Total number of rows returned by the query - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#totalRows - - :rtype: integer, or ``NoneType`` - :returns: Count generated on the server (None until set by the server). - """ - return self._properties.get('totalRows') - - @property - def total_bytes_processed(self): - """Total number of bytes processed by the query - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#totalBytesProcessed - - :rtype: integer, or ``NoneType`` - :returns: Count generated on the server (None until set by the server). - """ - return self._properties.get('totalBytesProcessed') - - @property - def rows(self): - """Query results. - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#rows - - :rtype: list of tuples of row values, or ``NoneType`` - :returns: fields describing the schema (None until set by the server). - """ - return _rows_from_json(self._properties.get('rows', ()), self.schema) - - @property - def schema(self): - """Schema for query results. - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#schema - - :rtype: list of :class:`SchemaField`, or ``NoneType`` - :returns: fields describing the schema (None until set by the server). 
- """ - return _parse_schema_resource(self._properties.get('schema', {})) - - default_dataset = _TypedProperty('default_dataset', Dataset) - """See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#defaultDataset - """ - - max_results = _TypedProperty('max_results', six.integer_types) - """See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#maxResults - """ - - preserve_nulls = _TypedProperty('preserve_nulls', bool) - """See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#preserveNulls - """ - - timeout_ms = _TypedProperty('timeout_ms', six.integer_types) - """See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#timeoutMs - """ - - use_query_cache = _TypedProperty('use_query_cache', bool) - """See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#useQueryCache - """ - - def _set_properties(self, api_response): - """Update properties from resource in body of ``api_response`` - - :type api_response: httplib2.Response - :param api_response: response returned from an API call - """ - self._properties.clear() - self._properties.update(api_response) - - def _build_resource(self): - """Generate a resource for :meth:`begin`.""" - resource = {'query': self.query} - - if self.default_dataset is not None: - resource['defaultDataset'] = { - 'projectId': self.project, - 'datasetId': self.default_dataset.name, - } - - if self.max_results is not None: - resource['maxResults'] = self.max_results - - if self.preserve_nulls is not None: - resource['preserveNulls'] = self.preserve_nulls - - if self.timeout_ms is not None: - resource['timeoutMs'] = self.timeout_ms - - if self.use_query_cache is not None: - resource['useQueryCache'] = self.use_query_cache - - return resource - - def run(self, client=None): - """API call: run the query via a POST request - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/query - - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. - """ - client = self._require_client(client) - path = '/projects/%s/queries' % (self.project,) - api_response = client.connection.api_request( - method='POST', path=path, data=self._build_resource()) - self._set_properties(api_response) - - def fetch_data(self, max_results=None, page_token=None, start_index=None, - timeout_ms=None, client=None): - """API call: fetch a page of query result data via a GET request - - See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/getQueryResults - - :type max_results: integer or ``NoneType`` - :param max_results: maximum number of rows to return. - - :type page_token: string or ``NoneType`` - :param page_token: token representing a cursor into the table's rows. - - :type start_index: integer or ``NoneType`` - :param start_index: zero-based index of starting row - - :type timeout_ms: integer or ``NoneType`` - :param timeout_ms: timeout, in milliseconds, to wait for query to - complete - - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. 
-
-        :rtype: tuple
-        :returns: ``(row_data, total_rows, page_token)``, where ``row_data``
-                  is a list of tuples, one per result row, containing only
-                  the values; ``total_rows`` is a count of the total number
-                  of rows in the table; and ``page_token`` is an opaque
-                  string which can be used to fetch the next batch of rows
-                  (``None`` if no further batches can be fetched).
-        :raises: ValueError if the query has not yet been executed.
-        """
-        if self.name is None:
-            raise ValueError("Query not yet executed: call 'run()'")
-
-        client = self._require_client(client)
-        params = {}
-
-        if max_results is not None:
-            params['maxResults'] = max_results
-
-        if page_token is not None:
-            params['pageToken'] = page_token
-
-        if start_index is not None:
-            params['startIndex'] = start_index
-
-        if timeout_ms is not None:
-            params['timeoutMs'] = timeout_ms
-
-        path = '/projects/%s/queries/%s' % (self.project, self.name)
-        response = client.connection.api_request(method='GET',
-                                                 path=path,
-                                                 query_params=params)
-        self._set_properties(response)
-
-        total_rows = response.get('totalRows')
-        page_token = response.get('pageToken')
-        rows_data = _rows_from_json(response.get('rows', ()), self.schema)
-
-        return rows_data, total_rows, page_token
diff --git a/gcloud/bigquery/query.py b/gcloud/bigquery/query.py
new file mode 100644
index 000000000000..0e580cb4826f
--- /dev/null
+++ b/gcloud/bigquery/query.py
@@ -0,0 +1,336 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Define API Queries."""
+
+import six
+
+from gcloud.bigquery._helpers import _TypedProperty
+from gcloud.bigquery._helpers import _rows_from_json
+from gcloud.bigquery.dataset import Dataset
+from gcloud.bigquery.job import QueryJob
+from gcloud.bigquery.table import _parse_schema_resource
+
+
+class _SyncQueryConfiguration(object):
+    """User-settable configuration options for synchronous query jobs.
+
+    Values which are ``None`` -> server defaults.
+    """
+    _default_dataset = None
+    _max_results = None
+    _timeout_ms = None
+    _preserve_nulls = None
+    _use_query_cache = None
+
+
+class QueryResults(object):
+    """Run a SQL query synchronously, exposing its results.
+
+    :type query: string
+    :param query: SQL query string
+
+    :type client: :class:`gcloud.bigquery.client.Client`
+    :param client: A client which holds credentials and project configuration
+                   for the dataset (which requires a project).
+    """
+    def __init__(self, query, client):
+        self._client = client
+        self._properties = {}
+        self.query = query
+        self._configuration = _SyncQueryConfiguration()
+
+    @property
+    def project(self):
+        """Project bound to the query.
+
+        :rtype: string
+        :returns: the project (derived from the client).
+        """
+        return self._client.project
+
+    def _require_client(self, client):
+        """Check client or verify over-ride.
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use. If not passed, falls back to the
+                       ``client`` stored on the current query.
+
+        :rtype: :class:`gcloud.bigquery.client.Client`
+        :returns: The client passed in or the currently bound client.
+        """
+        if client is None:
+            client = self._client
+        return client
+
+    @property
+    def cache_hit(self):
+        """Query results served from cache.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#cacheHit
+
+        :rtype: boolean or ``NoneType``
+        :returns: True if the query results were served from cache (None
+                  until set by the server).
+        """
+        return self._properties.get('cacheHit')
+
+    @property
+    def complete(self):
+        """Server completed query.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#jobComplete
+
+        :rtype: boolean or ``NoneType``
+        :returns: True if the query completed on the server (None
+                  until set by the server).
+        """
+        return self._properties.get('jobComplete')
+
+    @property
+    def errors(self):
+        """Errors generated by the query.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#errors
+
+        :rtype: list of mapping, or ``NoneType``
+        :returns: Mappings describing errors generated on the server (None
+                  until set by the server).
+        """
+        return self._properties.get('errors')
+
+    @property
+    def name(self):
+        """Job name, generated by the back-end.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#jobReference
+
+        :rtype: string, or ``NoneType``
+        :returns: Job name generated on the server (None until set by the
+                  server).
+        """
+        return self._properties.get('jobReference', {}).get('jobId')
+
+    @property
+    def job(self):
+        """Job instance used to run the query.
+
+        :rtype: :class:`gcloud.bigquery.job.QueryJob`, or ``NoneType``
+        :returns: Job instance used to run the query (None until
+                  ``jobReference`` property is set by the server).
+        """
+        job_ref = self._properties.get('jobReference')
+        if job_ref is not None:
+            return QueryJob(job_ref['jobId'], self.query, self._client)
+
+    @property
+    def page_token(self):
+        """Token for fetching next batch of results.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#pageToken
+
+        :rtype: string, or ``NoneType``
+        :returns: Token generated on the server (None until set by the
+                  server).
+        """
+        return self._properties.get('pageToken')
+
+    @property
+    def total_rows(self):
+        """Total number of rows returned by the query.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#totalRows
+
+        :rtype: integer, or ``NoneType``
+        :returns: Count generated on the server (None until set by the
+                  server).
+        """
+        return self._properties.get('totalRows')
+
+    @property
+    def total_bytes_processed(self):
+        """Total number of bytes processed by the query.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#totalBytesProcessed
+
+        :rtype: integer, or ``NoneType``
+        :returns: Count generated on the server (None until set by the
+                  server).
+        """
+        return self._properties.get('totalBytesProcessed')
+
+    @property
+    def rows(self):
+        """Query results.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#rows
+
+        :rtype: list of tuples of row values
+        :returns: result rows (an empty list until set by the server).
+        """
+        return _rows_from_json(self._properties.get('rows', ()), self.schema)
+
+    @property
+    def schema(self):
+        """Schema for query results.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#schema
+
+        :rtype: list of :class:`SchemaField`, or ``NoneType``
+        :returns: fields describing the schema (None until set by the
+                  server).
+        """
+        return _parse_schema_resource(self._properties.get('schema', {}))
+
+    default_dataset = _TypedProperty('default_dataset', Dataset)
+    """See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#defaultDataset
+    """
+
+    max_results = _TypedProperty('max_results', six.integer_types)
+    """See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#maxResults
+    """
+
+    preserve_nulls = _TypedProperty('preserve_nulls', bool)
+    """See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#preserveNulls
+    """
+
+    timeout_ms = _TypedProperty('timeout_ms', six.integer_types)
+    """See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#timeoutMs
+    """
+
+    use_query_cache = _TypedProperty('use_query_cache', bool)
+    """See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#useQueryCache
+    """
+
+    def _set_properties(self, api_response):
+        """Update properties from resource in body of ``api_response``
+
+        :type api_response: httplib2.Response
+        :param api_response: response returned from an API call
+        """
+        self._properties.clear()
+        self._properties.update(api_response)
+
+    def _build_resource(self):
+        """Generate a resource for :meth:`run`."""
+        resource = {'query': self.query}
+
+        if self.default_dataset is not None:
+            resource['defaultDataset'] = {
+                'projectId': self.project,
+                'datasetId': self.default_dataset.name,
+            }
+
+        if self.max_results is not None:
+            resource['maxResults'] = self.max_results
+
+        if self.preserve_nulls is not None:
+            resource['preserveNulls'] = self.preserve_nulls
+
+        if self.timeout_ms is not None:
+            resource['timeoutMs'] = self.timeout_ms
+
+        if self.use_query_cache is not None:
+            resource['useQueryCache'] = self.use_query_cache
+
+        return resource
+
+    def run(self, client=None):
+        """API call: run the query via a POST request
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/query
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use. If not passed, falls back to the
+                       ``client`` stored on the current query.
+        """
+        client = self._require_client(client)
+        path = '/projects/%s/queries' % (self.project,)
+        api_response = client.connection.api_request(
+            method='POST', path=path, data=self._build_resource())
+        self._set_properties(api_response)
+
+    def fetch_data(self, max_results=None, page_token=None, start_index=None,
+                   timeout_ms=None, client=None):
+        """API call: fetch a page of query result data via a GET request
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/getQueryResults
+
+        :type max_results: integer or ``NoneType``
+        :param max_results: maximum number of rows to return.
+
+        :type page_token: string or ``NoneType``
+        :param page_token: token representing a cursor into the table's rows.
+
+        :type start_index: integer or ``NoneType``
+        :param start_index: zero-based index of starting row
+
+        :type timeout_ms: integer or ``NoneType``
+        :param timeout_ms: timeout, in milliseconds, to wait for query to
+                           complete
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use. If not passed, falls back to the
+                       ``client`` stored on the current query.
+ + :rtype: tuple + :returns: ``(row_data, total_rows, page_token)``, where ``row_data`` + is a list of tuples, one per result row, containing only + the values; ``total_rows`` is a count of the total number + of rows in the table; and ``page_token`` is an opaque + string which can be used to fetch the next batch of rows + (``None`` if no further batches can be fetched). + :raises: ValueError if the query has not yet been executed. + """ + if self.name is None: + raise ValueError("Query not yet executed: call 'run()'") + + client = self._require_client(client) + params = {} + + if max_results is not None: + params['maxResults'] = max_results + + if page_token is not None: + params['pageToken'] = page_token + + if start_index is not None: + params['startIndex'] = start_index + + if timeout_ms is not None: + params['timeoutMs'] = timeout_ms + + path = '/projects/%s/queries/%s' % (self.project, self.name) + response = client.connection.api_request(method='GET', + path=path, + query_params=params) + self._set_properties(response) + + total_rows = response.get('totalRows') + page_token = response.get('pageToken') + rows_data = _rows_from_json(response.get('rows', ()), self.schema) + + return rows_data, total_rows, page_token diff --git a/gcloud/bigquery/test__helpers.py b/gcloud/bigquery/test__helpers.py new file mode 100644 index 000000000000..45f036e04263 --- /dev/null +++ b/gcloud/bigquery/test__helpers.py @@ -0,0 +1,116 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
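+
+# A minimal sketch (not part of the patch; names are illustrative) of how
+# the descriptor helpers under test are consumed: a host class declares the
+# descriptor at class level, and values live on a separate ``_configuration``
+# object rather than on the host itself:
+#
+#   from gcloud.bigquery._helpers import _TypedProperty
+#
+#   class _Config(object):
+#       _timeout_ms = None
+#
+#   class Host(object):
+#       timeout_ms = _TypedProperty('timeout_ms', int)
+#
+#       def __init__(self):
+#           self._configuration = _Config()
+#
+#   host = Host()
+#   host.timeout_ms = 1000    # validated and stored on host._configuration
+#   host.timeout_ms = 'oops'  # would raise ValueError via _validate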
+ +import unittest2 + + +class Test_ConfigurationProperty(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.bigquery._helpers import _ConfigurationProperty + return _ConfigurationProperty + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_it(self): + + class Configuration(object): + _attr = None + + class Wrapper(object): + attr = self._makeOne('attr') + + def __init__(self): + self._configuration = Configuration() + + self.assertEqual(Wrapper.attr.name, 'attr') + + wrapper = Wrapper() + self.assertEqual(wrapper.attr, None) + + value = object() + wrapper.attr = value + self.assertTrue(wrapper.attr is value) + self.assertTrue(wrapper._configuration._attr is value) + + del wrapper.attr + self.assertEqual(wrapper.attr, None) + self.assertEqual(wrapper._configuration._attr, None) + + +class Test_TypedProperty(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.bigquery._helpers import _TypedProperty + return _TypedProperty + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_it(self): + + class Configuration(object): + _attr = None + + class Wrapper(object): + attr = self._makeOne('attr', int) + + def __init__(self): + self._configuration = Configuration() + + wrapper = Wrapper() + with self.assertRaises(ValueError): + wrapper.attr = 'BOGUS' + + wrapper.attr = 42 + self.assertEqual(wrapper.attr, 42) + self.assertEqual(wrapper._configuration._attr, 42) + + del wrapper.attr + self.assertEqual(wrapper.attr, None) + self.assertEqual(wrapper._configuration._attr, None) + + +class Test_EnumProperty(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.bigquery._helpers import _EnumProperty + return _EnumProperty + + def test_it(self): + + class Sub(self._getTargetClass()): + ALLOWED = ('FOO', 'BAR', 'BAZ') + + class Configuration(object): + _attr = None + + class Wrapper(object): + attr = Sub('attr') + + def __init__(self): + self._configuration = Configuration() + + wrapper = Wrapper() + with self.assertRaises(ValueError): + wrapper.attr = 'BOGUS' + + wrapper.attr = 'FOO' + self.assertEqual(wrapper.attr, 'FOO') + self.assertEqual(wrapper._configuration._attr, 'FOO') + + del wrapper.attr + self.assertEqual(wrapper.attr, None) + self.assertEqual(wrapper._configuration._attr, None) diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py index 0e9fa7482ca6..d8d069c0f543 100644 --- a/gcloud/bigquery/test_client.py +++ b/gcloud/bigquery/test_client.py @@ -187,7 +187,7 @@ def test_extract_table_to_storage(self): self.assertEqual(list(job.destination_uris), [DESTINATION]) def test_run_async_query(self): - from gcloud.bigquery.job import RunAsyncQueryJob + from gcloud.bigquery.job import QueryJob PROJECT = 'PROJECT' JOB = 'job_name' QUERY = 'select count(*) from persons' @@ -195,20 +195,20 @@ def test_run_async_query(self): http = object() client = self._makeOne(project=PROJECT, credentials=creds, http=http) job = client.run_async_query(JOB, QUERY) - self.assertTrue(isinstance(job, RunAsyncQueryJob)) + self.assertTrue(isinstance(job, QueryJob)) self.assertTrue(job._client is client) self.assertEqual(job.name, JOB) self.assertEqual(job.query, QUERY) def test_run_sync_query(self): - from gcloud.bigquery.job import RunSyncQueryJob + from gcloud.bigquery.query import QueryResults PROJECT = 'PROJECT' QUERY = 'select count(*) from persons' creds = _Credentials() http = object() client = self._makeOne(project=PROJECT, credentials=creds, http=http) job = 
client.run_sync_query(QUERY) - self.assertTrue(isinstance(job, RunSyncQueryJob)) + self.assertTrue(isinstance(job, QueryResults)) self.assertTrue(job._client is client) self.assertEqual(job.name, None) self.assertEqual(job.query, QUERY) diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py index f8b70e0d44df..63bdea1a02fd 100644 --- a/gcloud/bigquery/test_job.py +++ b/gcloud/bigquery/test_job.py @@ -16,107 +16,6 @@ import unittest2 -class Test_ConfigurationProperty(unittest2.TestCase): - - def _getTargetClass(self): - from gcloud.bigquery.job import _ConfigurationProperty - return _ConfigurationProperty - - def _makeOne(self, *args, **kw): - return self._getTargetClass()(*args, **kw) - - def test_it(self): - - class Configuration(object): - _attr = None - - class Wrapper(object): - attr = self._makeOne('attr') - - def __init__(self): - self._configuration = Configuration() - - self.assertEqual(Wrapper.attr.name, 'attr') - - wrapper = Wrapper() - self.assertEqual(wrapper.attr, None) - - value = object() - wrapper.attr = value - self.assertTrue(wrapper.attr is value) - self.assertTrue(wrapper._configuration._attr is value) - - del wrapper.attr - self.assertEqual(wrapper.attr, None) - self.assertEqual(wrapper._configuration._attr, None) - - -class Test_TypedProperty(unittest2.TestCase): - - def _getTargetClass(self): - from gcloud.bigquery.job import _TypedProperty - return _TypedProperty - - def _makeOne(self, *args, **kw): - return self._getTargetClass()(*args, **kw) - - def test_it(self): - - class Configuration(object): - _attr = None - - class Wrapper(object): - attr = self._makeOne('attr', int) - - def __init__(self): - self._configuration = Configuration() - - wrapper = Wrapper() - with self.assertRaises(ValueError): - wrapper.attr = 'BOGUS' - - wrapper.attr = 42 - self.assertEqual(wrapper.attr, 42) - self.assertEqual(wrapper._configuration._attr, 42) - - del wrapper.attr - self.assertEqual(wrapper.attr, None) - self.assertEqual(wrapper._configuration._attr, None) - - -class Test_EnumProperty(unittest2.TestCase): - - def _getTargetClass(self): - from gcloud.bigquery.job import _EnumProperty - return _EnumProperty - - def test_it(self): - - class Sub(self._getTargetClass()): - ALLOWED = ('FOO', 'BAR', 'BAZ') - - class Configuration(object): - _attr = None - - class Wrapper(object): - attr = Sub('attr') - - def __init__(self): - self._configuration = Configuration() - - wrapper = Wrapper() - with self.assertRaises(ValueError): - wrapper.attr = 'BOGUS' - - wrapper.attr = 'FOO' - self.assertEqual(wrapper.attr, 'FOO') - self.assertEqual(wrapper._configuration._attr, 'FOO') - - del wrapper.attr - self.assertEqual(wrapper.attr, None) - self.assertEqual(wrapper._configuration._attr, None) - - class _Base(object): PROJECT = 'project' SOURCE1 = 'http://example.com/source1.csv' @@ -1087,13 +986,13 @@ def test_reload_w_alternate_client(self): self._verifyResourceProperties(job, RESOURCE) -class TestRunAsyncQueryJob(unittest2.TestCase, _Base): +class TestQueryJob(unittest2.TestCase, _Base): JOB_TYPE = 'query' QUERY = 'select count(*) from persons' def _getTargetClass(self): - from gcloud.bigquery.job import RunAsyncQueryJob - return RunAsyncQueryJob + from gcloud.bigquery.job import QueryJob + return QueryJob def _verifyBooleanResourceProperties(self, job, config): @@ -1353,281 +1252,6 @@ def test_reload_w_alternate_client(self): self._verifyResourceProperties(job, RESOURCE) -class TestRunSyncQueryJob(unittest2.TestCase, _Base): - JOB_NAME = 'test-synchronous-query' - JOB_TYPE = 
'query' - QUERY = 'select count(*) from persons' - TOKEN = 'TOKEN' - - def _getTargetClass(self): - from gcloud.bigquery.job import RunSyncQueryJob - return RunSyncQueryJob - - def _makeResource(self, complete=False): - resource = { - 'jobReference': { - 'projectId': self.PROJECT, - 'jobId': self.JOB_NAME, - }, - 'jobComplete': complete, - 'errors': [], - 'schema': { - 'fields': [ - {'name': 'full_name', 'type': 'STRING', 'mode': 'REQURED'}, - {'name': 'age', 'type': 'INTEGER', 'mode': 'REQURED'}, - ], - }, - } - - if complete: - resource['totalRows'] = 1000 - resource['rows'] = [ - {'f': [ - {'v': 'Phred Phlyntstone'}, - {'v': 32}, - ]}, - {'f': [ - {'v': 'Bharney Rhubble'}, - {'v': 33}, - ]}, - {'f': [ - {'v': 'Wylma Phlyntstone'}, - {'v': 29}, - ]}, - {'f': [ - {'v': 'Bhettye Rhubble'}, - {'v': 27}, - ]}, - ] - resource['pageToken'] = self.TOKEN - resource['totalBytesProcessed'] = 100000 - resource['cacheHit'] = False - - return resource - - def _verifySchema(self, job, resource): - from gcloud.bigquery.table import SchemaField - if 'schema' in resource: - fields = resource['schema']['fields'] - self.assertEqual(len(job.schema), len(fields)) - for found, expected in zip(job.schema, fields): - self.assertTrue(isinstance(found, SchemaField)) - self.assertEqual(found.name, expected['name']) - self.assertEqual(found.field_type, expected['type']) - self.assertEqual(found.mode, expected['mode']) - self.assertEqual(found.description, - expected.get('description')) - self.assertEqual(found.fields, expected.get('fields')) - else: - self.assertTrue(job.schema is None) - - def _verifyRows(self, job, resource): - expected = resource.get('rows') - if expected is None: - self.assertEqual(job.rows, []) - else: - found = job.rows - self.assertEqual(len(found), len(expected)) - for f_row, e_row in zip(found, expected): - self.assertEqual(f_row, - tuple([cell['v'] for cell in e_row['f']])) - - def _verifyResourceProperties(self, job, resource): - self.assertEqual(job.cache_hit, resource.get('cacheHit')) - self.assertEqual(job.complete, resource.get('jobComplete')) - self.assertEqual(job.errors, resource.get('errors')) - self.assertEqual(job.page_token, resource.get('pageToken')) - self.assertEqual(job.total_rows, resource.get('totalRows')) - self.assertEqual(job.total_bytes_processed, - resource.get('totalBytesProcessed')) - - if 'jobReference' in resource: - self.assertEqual(job.name, resource['jobReference']['jobId']) - else: - self.assertTrue(job.name is None) - - self._verifySchema(job, resource) - self._verifyRows(job, resource) - - def test_ctor(self): - client = _Client(self.PROJECT) - job = self._makeOne(self.QUERY, client) - self.assertEqual(job.query, self.QUERY) - self.assertTrue(job._client is client) - - self.assertTrue(job.cache_hit is None) - self.assertTrue(job.complete is None) - self.assertTrue(job.errors is None) - self.assertTrue(job.name is None) - self.assertTrue(job.page_token is None) - self.assertEqual(job.rows, []) - self.assertTrue(job.schema is None) - self.assertTrue(job.total_rows is None) - self.assertTrue(job.total_bytes_processed is None) - - self.assertTrue(job.default_dataset is None) - self.assertTrue(job.max_results is None) - self.assertTrue(job.preserve_nulls is None) - self.assertTrue(job.use_query_cache is None) - - def test_name_setter_bad_value(self): - client = _Client(self.PROJECT) - job = self._makeOne(self.QUERY, client) - with self.assertRaises(ValueError): - job.name = 12345 - - def test_name_setter(self): - client = _Client(self.PROJECT) - job = 
self._makeOne(self.QUERY, client) - job.name = 'NAME' - self.assertEqual(job.name, 'NAME') - - def test_schema(self): - client = _Client(self.PROJECT) - job = self._makeOne(self.QUERY, client) - self._verifyResourceProperties(job, {}) - resource = { - 'schema': { - 'fields': [ - {'name': 'full_name', 'type': 'STRING', 'mode': 'REQURED'}, - {'name': 'age', 'type': 'INTEGER', 'mode': 'REQURED'}, - ], - }, - } - job._set_properties(resource) - self._verifyResourceProperties(job, resource) - - def test_run_w_bound_client(self): - PATH = 'projects/%s/queries' % self.PROJECT - RESOURCE = self._makeResource(complete=False) - conn = _Connection(RESOURCE) - client = _Client(project=self.PROJECT, connection=conn) - job = self._makeOne(self.QUERY, client) - - job.run() - - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) - SENT = {'query': self.QUERY} - self.assertEqual(req['data'], SENT) - self._verifyResourceProperties(job, RESOURCE) - - def test_run_w_alternate_client(self): - PATH = 'projects/%s/queries' % self.PROJECT - RESOURCE = self._makeResource(complete=True) - DATASET = 'test_dataset' - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection(RESOURCE) - client2 = _Client(project=self.PROJECT, connection=conn2) - job = self._makeOne(self.QUERY, client1) - - job.default_dataset = client2.dataset(DATASET) - job.max_results = 100 - job.preserve_nulls = True - job.timeout_ms = 20000 - job.use_query_cache = False - - job.run(client=client2) - - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) - SENT = { - 'query': self.QUERY, - 'defaultDataset': { - 'projectId': self.PROJECT, - 'datasetId': DATASET, - }, - 'maxResults': 100, - 'preserveNulls': True, - 'timeoutMs': 20000, - 'useQueryCache': False, - } - self.assertEqual(req['data'], SENT) - self._verifyResourceProperties(job, RESOURCE) - - def test_fetch_data_query_not_yet_run(self): - conn = _Connection() - client = _Client(project=self.PROJECT, connection=conn) - job = self._makeOne(self.QUERY, client) - self.assertRaises(ValueError, job.fetch_data) - - def test_fetch_data_w_bound_client(self): - PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME) - BEFORE = self._makeResource(complete=False) - AFTER = self._makeResource(complete=True) - - conn = _Connection(AFTER) - client = _Client(project=self.PROJECT, connection=conn) - job = self._makeOne(self.QUERY, client) - job._set_properties(BEFORE) - self.assertFalse(job.complete) - - rows, total_rows, page_token = job.fetch_data() - - self.assertTrue(job.complete) - self.assertEqual(len(rows), 4) - self.assertEqual(rows[0], ('Phred Phlyntstone', 32)) - self.assertEqual(rows[1], ('Bharney Rhubble', 33)) - self.assertEqual(rows[2], ('Wylma Phlyntstone', 29)) - self.assertEqual(rows[3], ('Bhettye Rhubble', 27)) - self.assertEqual(total_rows, AFTER['totalRows']) - self.assertEqual(page_token, AFTER['pageToken']) - - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) - - def test_fetch_data_w_alternate_client(self): - PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME) - MAX = 10 - TOKEN = 'TOKEN' - START = 2257 - TIMEOUT = 20000 - BEFORE = 
self._makeResource(complete=False) - AFTER = self._makeResource(complete=True) - - conn1 = _Connection() - client1 = _Client(project=self.PROJECT, connection=conn1) - conn2 = _Connection(AFTER) - client2 = _Client(project=self.PROJECT, connection=conn2) - job = self._makeOne(self.QUERY, client1) - job._set_properties(BEFORE) - self.assertFalse(job.complete) - - rows, total_rows, page_token = job.fetch_data(client=client2, - max_results=MAX, - page_token=TOKEN, - start_index=START, - timeout_ms=TIMEOUT) - - self.assertTrue(job.complete) - self.assertEqual(len(rows), 4) - self.assertEqual(rows[0], ('Phred Phlyntstone', 32)) - self.assertEqual(rows[1], ('Bharney Rhubble', 33)) - self.assertEqual(rows[2], ('Wylma Phlyntstone', 29)) - self.assertEqual(rows[3], ('Bhettye Rhubble', 27)) - self.assertEqual(total_rows, AFTER['totalRows']) - self.assertEqual(page_token, AFTER['pageToken']) - - self.assertEqual(len(conn1._requested), 0) - self.assertEqual(len(conn2._requested), 1) - req = conn2._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) - self.assertEqual(req['query_params'], - {'maxResults': MAX, - 'pageToken': TOKEN, - 'startIndex': START, - 'timeoutMs': TIMEOUT}) - - class _Client(object): def __init__(self, project='project', connection=None): diff --git a/gcloud/bigquery/test_query.py b/gcloud/bigquery/test_query.py new file mode 100644 index 000000000000..b1d323e80bed --- /dev/null +++ b/gcloud/bigquery/test_query.py @@ -0,0 +1,330 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
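+
+# A minimal usage sketch (not part of the patch; assumes a configured
+# ``client``) of the QueryResults flow exercised by the tests below:
+#
+#   query = client.run_sync_query('select count(*) from persons')
+#   query.timeout_ms = 1000
+#   query.run()                    # POST /projects/<project>/queries
+#   if query.complete:
+#       rows = query.rows          # decoded via _rows_from_json
+#   else:
+#       rows, total, token = query.fetch_data()   # GET .../queries/<job id>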
+
+import unittest2
+
+
+class TestQueryResults(unittest2.TestCase):
+    PROJECT = 'project'
+    JOB_NAME = 'test-synchronous-query'
+    JOB_TYPE = 'query'
+    QUERY = 'select count(*) from persons'
+    TOKEN = 'TOKEN'
+
+    def _getTargetClass(self):
+        from gcloud.bigquery.query import QueryResults
+        return QueryResults
+
+    def _makeOne(self, *args, **kw):
+        return self._getTargetClass()(*args, **kw)
+
+    def _makeResource(self, complete=False):
+        resource = {
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'jobComplete': complete,
+            'errors': [],
+            'schema': {
+                'fields': [
+                    {'name': 'full_name', 'type': 'STRING',
+                     'mode': 'REQUIRED'},
+                    {'name': 'age', 'type': 'INTEGER', 'mode': 'REQUIRED'},
+                ],
+            },
+        }
+
+        if complete:
+            resource['totalRows'] = 1000
+            resource['rows'] = [
+                {'f': [
+                    {'v': 'Phred Phlyntstone'},
+                    {'v': 32},
+                ]},
+                {'f': [
+                    {'v': 'Bharney Rhubble'},
+                    {'v': 33},
+                ]},
+                {'f': [
+                    {'v': 'Wylma Phlyntstone'},
+                    {'v': 29},
+                ]},
+                {'f': [
+                    {'v': 'Bhettye Rhubble'},
+                    {'v': 27},
+                ]},
+            ]
+            resource['pageToken'] = self.TOKEN
+            resource['totalBytesProcessed'] = 100000
+            resource['cacheHit'] = False
+
+        return resource
+
+    def _verifySchema(self, query, resource):
+        from gcloud.bigquery.table import SchemaField
+        if 'schema' in resource:
+            fields = resource['schema']['fields']
+            self.assertEqual(len(query.schema), len(fields))
+            for found, expected in zip(query.schema, fields):
+                self.assertTrue(isinstance(found, SchemaField))
+                self.assertEqual(found.name, expected['name'])
+                self.assertEqual(found.field_type, expected['type'])
+                self.assertEqual(found.mode, expected['mode'])
+                self.assertEqual(found.description,
+                                 expected.get('description'))
+                self.assertEqual(found.fields, expected.get('fields'))
+        else:
+            self.assertTrue(query.schema is None)
+
+    def _verifyRows(self, query, resource):
+        expected = resource.get('rows')
+        if expected is None:
+            self.assertEqual(query.rows, [])
+        else:
+            found = query.rows
+            self.assertEqual(len(found), len(expected))
+            for f_row, e_row in zip(found, expected):
+                self.assertEqual(f_row,
+                                 tuple([cell['v'] for cell in e_row['f']]))
+
+    def _verifyResourceProperties(self, query, resource):
+        self.assertEqual(query.cache_hit, resource.get('cacheHit'))
+        self.assertEqual(query.complete, resource.get('jobComplete'))
+        self.assertEqual(query.errors, resource.get('errors'))
+        self.assertEqual(query.page_token, resource.get('pageToken'))
+        self.assertEqual(query.total_rows, resource.get('totalRows'))
+        self.assertEqual(query.total_bytes_processed,
+                         resource.get('totalBytesProcessed'))
+
+        if 'jobReference' in resource:
+            self.assertEqual(query.name, resource['jobReference']['jobId'])
+        else:
+            self.assertTrue(query.name is None)
+
+        self._verifySchema(query, resource)
+        self._verifyRows(query, resource)
+
+    def test_ctor(self):
+        client = _Client(self.PROJECT)
+        query = self._makeOne(self.QUERY, client)
+        self.assertEqual(query.query, self.QUERY)
+        self.assertTrue(query._client is client)
+
+        self.assertTrue(query.cache_hit is None)
+        self.assertTrue(query.complete is None)
+        self.assertTrue(query.errors is None)
+        self.assertTrue(query.name is None)
+        self.assertTrue(query.page_token is None)
+        self.assertEqual(query.rows, [])
+        self.assertTrue(query.schema is None)
+        self.assertTrue(query.total_rows is None)
+        self.assertTrue(query.total_bytes_processed is None)
+
+        self.assertTrue(query.default_dataset is None)
+        self.assertTrue(query.max_results is None)
+        self.assertTrue(query.preserve_nulls is None)
+        self.assertTrue(query.use_query_cache is None)
+
+    def test_job_wo_jobid(self):
+        client = _Client(self.PROJECT)
+        query = self._makeOne(self.QUERY, client)
+        self.assertTrue(query.job is None)
+
+    def test_job_w_jobid(self):
+        from gcloud.bigquery.job import QueryJob
+        SERVER_GENERATED = 'SERVER_GENERATED'
+        client = _Client(self.PROJECT)
+        query = self._makeOne(self.QUERY, client)
+        query._properties['jobReference'] = {
+            'projectId': self.PROJECT,
+            'jobId': SERVER_GENERATED,
+        }
+        job = query.job
+        self.assertTrue(isinstance(job, QueryJob))
+        self.assertEqual(job.query, self.QUERY)
+        self.assertTrue(job._client is client)
+        self.assertEqual(job.name, SERVER_GENERATED)
+
+    def test_schema(self):
+        client = _Client(self.PROJECT)
+        query = self._makeOne(self.QUERY, client)
+        self._verifyResourceProperties(query, {})
+        resource = {
+            'schema': {
+                'fields': [
+                    {'name': 'full_name', 'type': 'STRING',
+                     'mode': 'REQUIRED'},
+                    {'name': 'age', 'type': 'INTEGER', 'mode': 'REQUIRED'},
+                ],
+            },
+        }
+        query._set_properties(resource)
+        self._verifyResourceProperties(query, resource)
+
+    def test_run_w_bound_client(self):
+        PATH = 'projects/%s/queries' % self.PROJECT
+        RESOURCE = self._makeResource(complete=False)
+        conn = _Connection(RESOURCE)
+        client = _Client(project=self.PROJECT, connection=conn)
+        query = self._makeOne(self.QUERY, client)
+
+        query.run()
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        SENT = {'query': self.QUERY}
+        self.assertEqual(req['data'], SENT)
+        self._verifyResourceProperties(query, RESOURCE)
+
+    def test_run_w_alternate_client(self):
+        PATH = 'projects/%s/queries' % self.PROJECT
+        RESOURCE = self._makeResource(complete=True)
+        DATASET = 'test_dataset'
+        conn1 = _Connection()
+        client1 = _Client(project=self.PROJECT, connection=conn1)
+        conn2 = _Connection(RESOURCE)
+        client2 = _Client(project=self.PROJECT, connection=conn2)
+        query = self._makeOne(self.QUERY, client1)
+
+        query.default_dataset = client2.dataset(DATASET)
+        query.max_results = 100
+        query.preserve_nulls = True
+        query.timeout_ms = 20000
+        query.use_query_cache = False
+
+        query.run(client=client2)
+
+        self.assertEqual(len(conn1._requested), 0)
+        self.assertEqual(len(conn2._requested), 1)
+        req = conn2._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        SENT = {
+            'query': self.QUERY,
+            'defaultDataset': {
+                'projectId': self.PROJECT,
+                'datasetId': DATASET,
+            },
+            'maxResults': 100,
+            'preserveNulls': True,
+            'timeoutMs': 20000,
+            'useQueryCache': False,
+        }
+        self.assertEqual(req['data'], SENT)
+        self._verifyResourceProperties(query, RESOURCE)
+
+    def test_fetch_data_query_not_yet_run(self):
+        conn = _Connection()
+        client = _Client(project=self.PROJECT, connection=conn)
+        query = self._makeOne(self.QUERY, client)
+        self.assertRaises(ValueError, query.fetch_data)
+
+    def test_fetch_data_w_bound_client(self):
+        PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME)
+        BEFORE = self._makeResource(complete=False)
+        AFTER = self._makeResource(complete=True)
+
+        conn = _Connection(AFTER)
+        client = _Client(project=self.PROJECT, connection=conn)
+        query = self._makeOne(self.QUERY, client)
+        query._set_properties(BEFORE)
+        self.assertFalse(query.complete)
+
+        rows, total_rows, page_token = query.fetch_data()
+
+        self.assertTrue(query.complete)
+        self.assertEqual(len(rows), 4)
+        self.assertEqual(rows[0], ('Phred Phlyntstone', 32))
+        self.assertEqual(rows[1], ('Bharney Rhubble', 33))
+        self.assertEqual(rows[2], ('Wylma Phlyntstone', 29))
+        self.assertEqual(rows[3], ('Bhettye Rhubble', 27))
+        self.assertEqual(total_rows, AFTER['totalRows'])
+        self.assertEqual(page_token, AFTER['pageToken'])
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+
+    def test_fetch_data_w_alternate_client(self):
+        PATH = 'projects/%s/queries/%s' % (self.PROJECT, self.JOB_NAME)
+        MAX = 10
+        TOKEN = 'TOKEN'
+        START = 2257
+        TIMEOUT = 20000
+        BEFORE = self._makeResource(complete=False)
+        AFTER = self._makeResource(complete=True)
+
+        conn1 = _Connection()
+        client1 = _Client(project=self.PROJECT, connection=conn1)
+        conn2 = _Connection(AFTER)
+        client2 = _Client(project=self.PROJECT, connection=conn2)
+        query = self._makeOne(self.QUERY, client1)
+        query._set_properties(BEFORE)
+        self.assertFalse(query.complete)
+
+        rows, total_rows, page_token = query.fetch_data(
+            client=client2, max_results=MAX, page_token=TOKEN,
+            start_index=START, timeout_ms=TIMEOUT)
+
+        self.assertTrue(query.complete)
+        self.assertEqual(len(rows), 4)
+        self.assertEqual(rows[0], ('Phred Phlyntstone', 32))
+        self.assertEqual(rows[1], ('Bharney Rhubble', 33))
+        self.assertEqual(rows[2], ('Wylma Phlyntstone', 29))
+        self.assertEqual(rows[3], ('Bhettye Rhubble', 27))
+        self.assertEqual(total_rows, AFTER['totalRows'])
+        self.assertEqual(page_token, AFTER['pageToken'])
+
+        self.assertEqual(len(conn1._requested), 0)
+        self.assertEqual(len(conn2._requested), 1)
+        req = conn2._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self.assertEqual(req['query_params'],
+                         {'maxResults': MAX,
+                          'pageToken': TOKEN,
+                          'startIndex': START,
+                          'timeoutMs': TIMEOUT})
+
+
+class _Client(object):
+
+    def __init__(self, project='project', connection=None):
+        self.project = project
+        self.connection = connection
+
+    def dataset(self, name):
+        from gcloud.bigquery.dataset import Dataset
+        return Dataset(name, client=self)
+
+
+class _Connection(object):
+
+    def __init__(self, *responses):
+        self._responses = responses
+        self._requested = []
+
+    def api_request(self, **kw):
+        from gcloud.exceptions import NotFound
+        self._requested.append(kw)
+
+        try:
+            response, self._responses = self._responses[0], self._responses[1:]
+        except IndexError:  # pragma: NO COVER
+            raise NotFound('miss')
+        else:
+            return response
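+
+# A minimal sketch (illustrative, not from the patch) of how the _Connection
+# test double above is driven: responses are queued at construction and
+# replayed in order, while each request's keyword arguments are captured in
+# ``_requested`` for later assertions.
+#
+#   conn = _Connection({'jobComplete': True})
+#   response = conn.api_request(method='GET', path='/projects/p/queries/q')
+#   assert response['jobComplete']
+#   assert conn._requested[0]['method'] == 'GET'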