Add job for running queries #1072

Merged: 7 commits, Aug 24, 2015
15 changes: 15 additions & 0 deletions gcloud/bigquery/client.py
@@ -21,6 +21,7 @@
from gcloud.bigquery.job import CopyJob
from gcloud.bigquery.job import ExtractTableToStorageJob
from gcloud.bigquery.job import LoadTableFromStorageJob
from gcloud.bigquery.job import RunQueryJob


class Client(JSONClient):
@@ -153,3 +154,17 @@ def extract_table_to_storage(self, name, source, *destination_uris):
"""
return ExtractTableToStorageJob(name, source, destination_uris,
client=self)

def run_query(self, name, query):
"""Construct a job for running a SQL query.

:type name: string
:param name: Name of the job.

:type query: string
:param query: SQL query to be executed.

:rtype: :class:`gcloud.bigquery.job.RunQueryJob`
:returns: a new ``RunQueryJob`` instance
"""
return RunQueryJob(name, query, client=self)
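
For orientation, a minimal usage sketch of the new method (an illustration, not part of this diff: it assumes ``bigquery.Client`` can pick up credentials from the environment, and the project and job names are placeholders):

from gcloud import bigquery

# Build a client; the project name here is an illustrative placeholder.
client = bigquery.Client(project='my-project')

# Construct the job locally; nothing is sent to the API at this point.
job = client.run_query('count_persons', 'select count(*) from persons')

# Submit the job; the request payload comes from RunQueryJob._build_resource(),
# shown in the job.py diff below.
job.begin()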
152 changes: 149 additions & 3 deletions gcloud/bigquery/job.py
@@ -18,7 +18,9 @@

from gcloud.exceptions import NotFound
from gcloud._helpers import _datetime_from_microseconds
from gcloud.bigquery.dataset import Dataset
from gcloud.bigquery.table import SchemaField
from gcloud.bigquery.table import Table
from gcloud.bigquery.table import _build_schema_resource
from gcloud.bigquery.table import _parse_schema_resource

@@ -122,6 +124,13 @@ class Encoding(_EnumProperty):
ALLOWED = (UTF_8, ISO_8559_1)


class QueryPriority(_EnumProperty):
"""Pseudo-enum for ``RunQueryJob.priority`` property."""
INTERACTIVE = 'INTERACTIVE'
BATCH = 'BATCH'
ALLOWED = (INTERACTIVE, BATCH)


class SourceFormat(_EnumProperty):
"""Pseudo-enum for ``source_format`` properties."""
CSV = 'CSV'
@@ -403,7 +412,7 @@ class _LoadConfiguration(object):


class LoadTableFromStorageJob(_BaseJob):
"""Asynchronous job for loading data into a BQ table from CloudStorage.
"""Asynchronous job for loading data into a table from CloudStorage.

:type name: string
:param name: the name of the job
@@ -616,7 +625,7 @@ class _CopyConfiguration(object):


class CopyJob(_BaseJob):
"""Asynchronous job: copy data into a BQ table from other tables.
"""Asynchronous job: copy data into a table from other tables.

:type name: string
:param name: the name of the job
@@ -695,7 +704,7 @@ class _ExtractConfiguration(object):


class ExtractTableToStorageJob(_BaseJob):
"""Asynchronous job: extract data from a BQ table into Cloud Storage.
"""Asynchronous job: extract data from a table into Cloud Storage.

:type name: string
:param name: the name of the job
@@ -773,3 +782,140 @@ def _build_resource(self):
self._populate_config_resource(configuration)

return resource


class _QueryConfiguration(object):
"""User-settable configuration options for query jobs."""
# None -> use server default.
_allow_large_results = None
_create_disposition = None
_default_dataset = None
_destination_table = None
_flatten_results = None
_priority = None
_use_query_cache = None
_write_disposition = None


class RunQueryJob(_BaseJob):
"""Asynchronous job: query tables.

:type name: string
:param name: the name of the job

:type query: string
:param query: SQL query string

:type client: :class:`gcloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the job (which requires a project).
"""
def __init__(self, name, query, client):
super(RunQueryJob, self).__init__(name, client)
self.query = query
self._configuration = _QueryConfiguration()

allow_large_results = _TypedProperty('allow_large_results', bool)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.allowLargeResults
"""

create_disposition = CreateDisposition('create_disposition')
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition
"""

default_dataset = _TypedProperty('default_dataset', Dataset)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.defaultDataset
"""

destination_table = _TypedProperty('destination_table', Table)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.destinationTable
"""

flatten_results = _TypedProperty('flatten_results', bool)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.flattenResults
"""

priority = QueryPriority('priority')
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.priority
"""

use_query_cache = _TypedProperty('use_query_cache', bool)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.useQueryCache
"""

write_disposition = WriteDisposition('write_disposition')
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition
"""

def _destination_table_resource(self):
if self.destination_table is not None:
return {
'projectId': self.destination_table.project,
'datasetId': self.destination_table.dataset_name,
'tableId': self.destination_table.name,
}

def _populate_config_resource(self, configuration):
"""Helper for _build_resource: copy config properties to resource"""
if self.allow_large_results is not None:
configuration['allowLargeResults'] = self.allow_large_results
if self.create_disposition is not None:
configuration['createDisposition'] = self.create_disposition
if self.default_dataset is not None:
configuration['defaultDataset'] = {
'projectId': self.default_dataset.project,
'datasetId': self.default_dataset.name,
}
if self.destination_table is not None:
table_res = self._destination_table_resource()
configuration['destinationTable'] = table_res
if self.flatten_results is not None:
configuration['flattenResults'] = self.flatten_results
if self.priority is not None:
configuration['priority'] = self.priority
if self.use_query_cache is not None:
configuration['useQueryCache'] = self.use_query_cache
if self.write_disposition is not None:
configuration['writeDisposition'] = self.write_disposition

def _build_resource(self):
"""Generate a resource for :meth:`begin`."""

resource = {
'jobReference': {
'projectId': self.project,
'jobId': self.name,
},
'configuration': {
'query': {
'query': self.query,
},
},
}
configuration = resource['configuration']['query']
self._populate_config_resource(configuration)

return resource

def _scrub_local_properties(self, cleaned):
"""Helper: handle subclass properties in cleaned."""
configuration = cleaned['configuration']['query']
dest_remote = configuration.get('destinationTable')

if dest_remote is None:
if self.destination_table is not None:
del self.destination_table
else:
dest_local = self._destination_table_resource()
if dest_remote != dest_local:
assert dest_remote['projectId'] == self.project
dataset = self._client.dataset(dest_remote['datasetId'])
self.destination_table = dataset.table(dest_remote['tableId'])
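
To show how the configuration properties above flow into the API payload, a hedged sketch (``client`` as in the earlier example; the dataset and table names are hypothetical, and the commented dict simply traces ``_build_resource`` and ``_populate_config_resource`` above):

from gcloud.bigquery.job import QueryPriority

dataset = client.dataset('my_dataset')
job = client.run_query('daily_rollup', 'select full_name from persons')
job.priority = QueryPriority.BATCH   # per ALLOWED: 'INTERACTIVE' or 'BATCH'
job.use_query_cache = True
job.destination_table = dataset.table('rollup')

# job._build_resource() would then yield (schematically):
# {
#   'jobReference': {'projectId': 'my-project', 'jobId': 'daily_rollup'},
#   'configuration': {
#     'query': {
#       'query': 'select full_name from persons',
#       'priority': 'BATCH',
#       'useQueryCache': True,
#       'destinationTable': {
#         'projectId': 'my-project',
#         'datasetId': 'my_dataset',
#         'tableId': 'rollup',
#       },
#     },
#   },
# }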
14 changes: 14 additions & 0 deletions gcloud/bigquery/test_client.py
@@ -186,6 +186,20 @@ def test_extract_table_to_storage(self):
self.assertEqual(job.source, source)
self.assertEqual(list(job.destination_uris), [DESTINATION])

def test_run_query(self):
from gcloud.bigquery.job import RunQueryJob
PROJECT = 'PROJECT'
JOB = 'job_name'
QUERY = 'select count(*) from persons'
creds = _Credentials()
http = object()
client = self._makeOne(project=PROJECT, credentials=creds, http=http)
job = client.run_query(JOB, QUERY)
self.assertTrue(isinstance(job, RunQueryJob))
self.assertTrue(job._client is client)
self.assertEqual(job.name, JOB)
self.assertEqual(job.query, QUERY)


class _Credentials(object):
