Rewriting bigquery samples to use proper argparse and main execution #89

Merged 6 commits on Sep 15, 2015
10 changes: 10 additions & 0 deletions README.md
@@ -8,6 +8,16 @@ This repository holds the samples used in the python documentation on [cloud.goo
For a more detailed introduction to a product, check the README in the
corresponding folder.

## Running the samples

Most samples must be run as modules instead of directly, for example:

```
$ python -m bigquery.samples.async_query [your-project-id] [your-query]
```

Refer to the README in the corresponding folder for any special instructions.

## Contributing changes

* See [CONTRIBUTING.md](CONTRIBUTING.md)
4 changes: 0 additions & 4 deletions appengine/ndb/transactions/main.py
@@ -191,7 +191,3 @@ def add_note():
return ('Already there<br><a href="%s">Return</a>'
% flask.url_for('main_page', page_name=page_name))
return flask.redirect(flask.url_for('main_page', page_name=page_name))


if __name__ == '__main__':
app.run()
8 changes: 8 additions & 0 deletions bigquery/README.md
@@ -2,6 +2,14 @@

This section contains samples for [Google BigQuery](https://cloud.google.com/bigquery).

## Running the samples

These samples must be run as modules, for example:

```
$ python -m bigquery.samples.async_query [your-project-id] [your-query]
```

## Other Samples

* [Using BigQuery from Google App Engine](../appengine/bigquery).
125 changes: 86 additions & 39 deletions bigquery/samples/async_query.py
@@ -11,17 +11,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import json
import time
import uuid

from bigquery.samples.utils import get_service
from bigquery.samples.utils import paging
from bigquery.samples.utils import poll_job
from six.moves import input
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


# [START async_query]
def async_query(service, project_id, query, batch=False, num_retries=5):
def async_query(bigquery, project_id, query, batch=False, num_retries=5):
# Generate a unique job_id so retries
# don't accidentally duplicate query
job_data = {
@@ -36,48 +36,95 @@ def async_query(service, project_id, query, batch=False, num_retries=5):
}
}
}
return service.jobs().insert(
return bigquery.jobs().insert(
projectId=project_id,
body=job_data).execute(num_retries=num_retries)
# [END async_query]


# [START poll_job]
def poll_job(bigquery, job):
"""Waits for a job to complete."""

print('Waiting for job to finish...')

request = bigquery.jobs().get(
projectId=job['jobReference']['projectId'],
jobId=job['jobReference']['jobId'])

while True:
result = request.execute(num_retries=2)

if result['status']['state'] == 'DONE':
if 'errorResult' in result['status']:
raise RuntimeError(result['status']['errorResult'])
print('Job complete.')
return

time.sleep(1)
# [END poll_job]


# [START run]
def run(project_id, query_string, batch, num_retries, interval):
service = get_service()

query_job = async_query(service,
project_id,
query_string,
batch,
num_retries)

poll_job(service,
query_job['jobReference']['projectId'],
query_job['jobReference']['jobId'],
interval,
num_retries)

for page in paging(service,
service.jobs().getQueryResults,
num_retries=num_retries,
**query_job['jobReference']):

yield json.dumps(page['rows'])
def main(project_id, query_string, batch, num_retries, interval):
# [START build_service]
# Grab the application's default credentials from the environment.
credentials = GoogleCredentials.get_application_default()

# Construct the service object for interacting with the BigQuery API.
bigquery = discovery.build('bigquery', 'v2', credentials=credentials)
# [END build_service]

# Submit the job and wait for it to complete.
query_job = async_query(
bigquery,
project_id,
query_string,
batch,
num_retries)

poll_job(bigquery, query_job)

# Page through the result set and print all results.
page_token = None
while True:
page = bigquery.jobs().getQueryResults(
pageToken=page_token,
**query_job['jobReference']).execute(num_retries=2)

print(json.dumps(page['rows']))

page_token = page.get('pageToken')
if not page_token:
break
# [END run]


# [START main]
def main():
project_id = input("Enter the project ID: ")
query_string = input("Enter the Bigquery SQL Query: ")
batch = input("Run query as batch (y/n)?: ") in (
'True', 'true', 'y', 'Y', 'yes', 'Yes')
num_retries = int(input(
"Enter number of times to retry in case of 500 error: "))
interval = input(
"Enter how often to poll the query for completion (seconds): ")

for result in run(project_id, query_string, batch, num_retries, interval):
print(result)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Runs an asynchronous query in BigQuery.')
parser.add_argument('project_id', help='Your Google Cloud project ID.')
parser.add_argument('query', help='BigQuery SQL Query.')
parser.add_argument(
'-b', '--batch', help='Run query in batch mode.', action='store_true')
parser.add_argument(
'-r', '--num_retries',
help='Number of times to retry in case of 500 error.',
type=int,
default=5)
parser.add_argument(
'-p', '--poll_interval',
help='How often to poll the query for completion (seconds).',
type=int,
default=1)

args = parser.parse_args()

main(
args.project_id,
args.query,
args.batch,
args.num_retries,
args.poll_interval)
# [END main]
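With the interactive prompts replaced by argparse, the rewritten entry point can also be driven from other Python code. A minimal sketch, assuming the repository root is on `sys.path`; the project ID and query string below are placeholders, not values from this PR:

```python
# Sketch only: call the rewritten sample's main() programmatically.
from bigquery.samples.async_query import main

main(
    project_id='my-project-id',   # placeholder project ID
    query_string='SELECT 17',     # any valid BigQuery SQL statement
    batch=False,
    num_retries=5,
    interval=1)
```

From a shell this is roughly equivalent to `python -m bigquery.samples.async_query my-project-id 'SELECT 17' -r 5 -p 1`, matching the invocation documented in the README.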
127 changes: 85 additions & 42 deletions bigquery/samples/export_data_to_cloud_storage.py
@@ -11,23 +11,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import time
import uuid

from bigquery.samples.utils import get_service
from bigquery.samples.utils import poll_job
from six.moves import input
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


# [START export_table]
def export_table(service, cloud_storage_path,
projectId, datasetId, tableId,
def export_table(bigquery, cloud_storage_path,
project_id, dataset_id, table_id,
Contributor:

Just to give my thinking behind this. camelCase matches the field names in the JSON objects. So you could make these kwargs and then do something like:

```python
tableRef = fetch_some_bigquery_object()['tableReference']
export_table(service, cloud_storage, **tableRef)
```

Probably not worth the inconsistent camelCase naming, but something to consider.

Contributor (author):

True, but I'd rather follow Python convention where possible:

> Names that are visible to the user as public parts of the API should follow conventions that reflect usage rather than implementation.
> ...
> mixedCase is allowed only in contexts where that's already the prevailing style (e.g. threading.py), to retain backwards compatibility.
export_format="CSV",
num_retries=5):
"""
Starts an export job

Args:
service: initialized and authorized bigquery
bigquery: initialized and authorized bigquery
google-api-client object.
cloud_storage_path: fully qualified
path to a Google Cloud Storage location.
@@ -42,60 +42,102 @@ def export_table(service, cloud_storage_path,
# don't accidentally duplicate export
job_data = {
'jobReference': {
'projectId': projectId,
'projectId': project_id,
'jobId': str(uuid.uuid4())
},
'configuration': {
'extract': {
'sourceTable': {
'projectId': projectId,
'datasetId': datasetId,
'tableId': tableId,
'projectId': project_id,
'datasetId': dataset_id,
'tableId': table_id,
},
'destinationUris': [cloud_storage_path],
'destinationFormat': export_format
}
}
}
return service.jobs().insert(
projectId=projectId,
return bigquery.jobs().insert(
projectId=project_id,
body=job_data).execute(num_retries=num_retries)
# [END export_table]


# [START poll_job]
def poll_job(bigquery, job):
"""Waits for a job to complete."""

print('Waiting for job to finish...')

request = bigquery.jobs().get(
projectId=job['jobReference']['projectId'],
jobId=job['jobReference']['jobId'])

while True:
result = request.execute(num_retries=2)

if result['status']['state'] == 'DONE':
if 'errorResult' in result['status']:
raise RuntimeError(result['status']['errorResult'])
print('Job complete.')
return

time.sleep(1)
# [END poll_job]


# [START run]
def run(cloud_storage_path,
projectId, datasetId, tableId,
num_retries, interval, export_format="CSV"):

bigquery = get_service()
resource = export_table(bigquery, cloud_storage_path,
projectId, datasetId, tableId,
num_retries=num_retries,
export_format=export_format)
poll_job(bigquery,
resource['jobReference']['projectId'],
resource['jobReference']['jobId'],
interval,
num_retries)
def main(cloud_storage_path, project_id, dataset_id, table_id,
num_retries, interval, export_format="CSV"):
# [START build_service]
# Grab the application's default credentials from the environment.
credentials = GoogleCredentials.get_application_default()

# Construct the service object for interacting with the BigQuery API.
bigquery = discovery.build('bigquery', 'v2', credentials=credentials)
# [END build_service]

job = export_table(
bigquery,
cloud_storage_path,
project_id,
dataset_id,
table_id,
num_retries=num_retries,
export_format=export_format)
poll_job(bigquery, job)
# [END run]


# [START main]
def main():
projectId = input("Enter the project ID: ")
datasetId = input("Enter a dataset ID: ")
tableId = input("Enter a table name to copy: ")
cloud_storage_path = input(
"Enter a Google Cloud Storage URI: ")
interval = input(
"Enter how often to poll the job (in seconds): ")
num_retries = input(
"Enter the number of retries in case of 500 error: ")

run(cloud_storage_path,
projectId, datasetId, tableId,
num_retries, interval)

print('Done exporting!')
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Exports data from BigQuery to Google Cloud Storage.')
parser.add_argument('project_id', help='Your Google Cloud project ID.')
parser.add_argument('dataset_id', help='BigQuery dataset to export.')
parser.add_argument('table_id', help='BigQuery table to export.')
parser.add_argument(
'gcs_path',
help=('Google Cloud Storage path to store the exported data. For '
'example, gs://mybucket/mydata.csv'))
parser.add_argument(
'-p', '--poll_interval',
help='How often to poll the query for completion (seconds).',
type=int,
default=1)
parser.add_argument(
'-r', '--num_retries',
help='Number of times to retry in case of 500 error.',
type=int,
default=5)

args = parser.parse_args()

main(
args.gcs_path,
args.project_id,
args.dataset_id,
args.table_id,
args.num_retries,
args.poll_interval)
# [END main]