Skip to content

Commit

Permalink
Removing utils, putting depending code directly in samples
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Wayne Parrott committed Sep 15, 2015
1 parent 7b5f96c commit e773167
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 126 deletions.
71 changes: 53 additions & 18 deletions bigquery/samples/async_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
#
import argparse
import json
import time
import uuid

from .utils import get_service, paging, poll_job
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


# [START async_query]
def async_query(service, project_id, query, batch=False, num_retries=5):
def async_query(bigquery, project_id, query, batch=False, num_retries=5):
# Generate a unique job_id so retries
# don't accidentally duplicate query
job_data = {
Expand All @@ -34,34 +36,67 @@ def async_query(service, project_id, query, batch=False, num_retries=5):
}
}
}
return service.jobs().insert(
return bigquery.jobs().insert(
projectId=project_id,
body=job_data).execute(num_retries=num_retries)
# [END async_query]


# [START poll_job]
def poll_job(bigquery, job):
"""Waits for a job to complete."""

print('Waiting for job to finish...')

request = bigquery.jobs().get(
projectId=job['jobReference']['projectId'],
jobId=job['jobReference']['jobId'])

while True:
result = request.execute(num_retries=2)

if result['status']['state'] == 'DONE':
if 'errorResult' in result['status']:
raise RuntimeError(result['status']['errorResult'])
print('Job complete.')
return

time.sleep(1)
# [END poll_job]


# [START run]
def main(project_id, query_string, batch, num_retries, interval):
service = get_service()
# [START build_service]
# Grab the application's default credentials from the environment.
credentials = GoogleCredentials.get_application_default()

query_job = async_query(service,
project_id,
query_string,
batch,
num_retries)
# Construct the service object for interacting with the BigQuery API.
bigquery = discovery.build('bigquery', 'v2', credentials=credentials)
# [END build_service]

poll_job(service,
query_job['jobReference']['projectId'],
query_job['jobReference']['jobId'],
interval,
num_retries)
# Submit the job and wait for it to complete.
query_job = async_query(
bigquery,
project_id,
query_string,
batch,
num_retries)

for page in paging(service,
service.jobs().getQueryResults,
num_retries=num_retries,
**query_job['jobReference']):
poll_job(bigquery, query_job)

# Page through the result set and print all results.
page_token = None
while True:
page = bigquery.jobs().getQueryResults(
pageToken=page_token,
**query_job['jobReference']).execute(num_retries=2)

print(json.dumps(page['rows']))

page_token = page.get('pageToken')
if not page_token:
break
# [END run]


Expand Down
59 changes: 45 additions & 14 deletions bigquery/samples/export_data_to_cloud_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,23 @@
# limitations under the License.
#
import argparse
import time
import uuid

from .utils import get_service, poll_job
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


# [START export_table]
def export_table(service, cloud_storage_path,
def export_table(bigquery, cloud_storage_path,
project_id, dataset_id, table_id,
export_format="CSV",
num_retries=5):
"""
Starts an export job
Args:
service: initialized and authorized bigquery
bigquery: initialized and authorized bigquery
google-api-client object.
cloud_storage_path: fully qualified
path to a Google Cloud Storage location.
Expand Down Expand Up @@ -56,26 +58,55 @@ def export_table(service, cloud_storage_path,
}
}
}
return service.jobs().insert(
return bigquery.jobs().insert(
projectId=project_id,
body=job_data).execute(num_retries=num_retries)
# [END export_table]


# [START poll_job]
def poll_job(bigquery, job):
"""Waits for a job to complete."""

print('Waiting for job to finish...')

request = bigquery.jobs().get(
projectId=job['jobReference']['projectId'],
jobId=job['jobReference']['jobId'])

while True:
result = request.execute(num_retries=2)

if result['status']['state'] == 'DONE':
if 'errorResult' in result['status']:
raise RuntimeError(result['status']['errorResult'])
print('Job complete.')
return

time.sleep(1)
# [END poll_job]


# [START run]
def main(cloud_storage_path, project_id, dataset_id, table_id,
num_retries, interval, export_format="CSV"):
# [START build_service]
# Grab the application's default credentials from the environment.
credentials = GoogleCredentials.get_application_default()

# Construct the service object for interacting with the BigQuery API.
bigquery = discovery.build('bigquery', 'v2', credentials=credentials)
# [END build_service]

bigquery = get_service()
resource = export_table(bigquery, cloud_storage_path,
project_id, dataset_id, table_id,
num_retries=num_retries,
export_format=export_format)
poll_job(bigquery,
resource['jobReference']['projectId'],
resource['jobReference']['jobId'],
interval,
num_retries)
job = export_table(
bigquery,
cloud_storage_path,
project_id,
dataset_id,
table_id,
num_retries=num_retries,
export_format=export_format)
poll_job(bigquery, job)
# [END run]


Expand Down
2 changes: 2 additions & 0 deletions bigquery/samples/list_datasets_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def list_datasets(service, project):

except HTTPError as err:
print('Error in list_datasets: %s' % err.content)
raise err
# [END list_datasets]


Expand All @@ -84,6 +85,7 @@ def list_projects(service):

except HTTPError as err:
print('Error in list_projects: %s' % err.content)
raise err
# [END list_projects]


Expand Down
42 changes: 33 additions & 9 deletions bigquery/samples/load_data_by_post.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
#
import argparse
import json
import time

from googleapiclient import discovery
import httplib2
from oauth2client.client import GoogleCredentials
from .utils import get_service, poll_job


# [START make_post]
def make_post(http, schema, data, projectId, datasetId, tableId):
def make_post(http, schema, data, project_id, dataset_id, table_id):
"""
Creates an http POST request for loading data into
a bigquery table
Expand All @@ -34,7 +35,7 @@ def make_post(http, schema, data, projectId, datasetId, tableId):
Returns: an http.request object
"""
url = ('https://www.googleapis.com/upload/bigquery/v2/projects/' +
projectId + '/jobs')
project_id + '/jobs')
# Create the body of the request, separated by a boundary of xxx
resource = ('--xxx\n' +
'Content-Type: application/json; charset=UTF-8\n' + '\n' +
Expand All @@ -45,9 +46,9 @@ def make_post(http, schema, data, projectId, datasetId, tableId):
' "fields": ' + str(schema) + '\n' +
' },\n' +
' "destinationTable": {\n' +
' "projectId": "' + projectId + '",\n' +
' "datasetId": "' + datasetId + '",\n' +
' "tableId": "' + tableId + '"\n' +
' "projectId": "' + project_id + '",\n' +
' "datasetId": "' + dataset_id + '",\n' +
' "tableId": "' + table_id + '"\n' +
' }\n' +
' }\n' +
' }\n' +
Expand All @@ -70,10 +71,34 @@ def make_post(http, schema, data, projectId, datasetId, tableId):
# [END make_post]


# [START poll_job]
def poll_job(bigquery, job):
"""Waits for a job to complete."""

print('Waiting for job to finish...')

request = bigquery.jobs().get(
projectId=job['jobReference']['projectId'],
jobId=job['jobReference']['jobId'])

while True:
result = request.execute(num_retries=2)

if result['status']['state'] == 'DONE':
if 'errorResult' in result['status']:
raise RuntimeError(result['status']['errorResult'])
print('Job complete.')
return

time.sleep(1)
# [END poll_job]


# [START main]
def main(project_id, dataset_id, table_name, schema_path, data_path):
credentials = GoogleCredentials.get_application_default()
http = credentials.authorize(httplib2.Http())
bigquery = discovery.build('bigquery', 'v2', credentials=credentials)

with open(schema_path, 'r') as schema_file:
schema = schema_file.read()
Expand All @@ -90,9 +115,8 @@ def main(project_id, dataset_id, table_name, schema_path, data_path):
table_name)

if resp.status == 200:
job_resource = json.loads(content)
service = get_service()
poll_job(service, **job_resource['jobReference'])
job = json.loads(content)
poll_job(bigquery, job)
print("Success!")
else:
print("Http error code: {}".format(resp.status))
Expand Down
49 changes: 38 additions & 11 deletions bigquery/samples/load_data_from_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@
#
import argparse
import json
import time
import uuid

from .utils import get_service, poll_job
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


# [START load_table]
def load_table(service, project_id, dataset_id, table_name, source_schema,
def load_table(bigquery, project_id, dataset_id, table_name, source_schema,
source_path, num_retries=5):
"""
Starts a job to load a bigquery table from CSV
Args:
service: an initialized and authorized bigquery
bigquery: an initialized and authorized bigquery client
google-api-client object
source_schema: a valid bigquery schema,
see https://cloud.google.com/bigquery/docs/reference/v2/tables
Expand Down Expand Up @@ -58,34 +60,59 @@ def load_table(service, project_id, dataset_id, table_name, source_schema,
}
}

return service.jobs().insert(
return bigquery.jobs().insert(
projectId=project_id,
body=job_data).execute(num_retries=num_retries)
# [END load_table]


# [START poll_job]
def poll_job(bigquery, job):
"""Waits for a job to complete."""

print('Waiting for job to finish...')

request = bigquery.jobs().get(
projectId=job['jobReference']['projectId'],
jobId=job['jobReference']['jobId'])

while True:
result = request.execute(num_retries=2)

if result['status']['state'] == 'DONE':
if 'errorResult' in result['status']:
raise RuntimeError(result['status']['errorResult'])
print('Job complete.')
return

time.sleep(1)
# [END poll_job]


# [START run]
def main(project_id, dataset_id, table_name, schema_file, data_path,
poll_interval, num_retries):
service = get_service()
# [START build_service]
# Grab the application's default credentials from the environment.
credentials = GoogleCredentials.get_application_default()

# Construct the service object for interacting with the BigQuery API.
bigquery = discovery.build('bigquery', 'v2', credentials=credentials)
# [END build_service]

with open(schema_file, 'r') as f:
schema = json.load(f)

job = load_table(
service,
bigquery,
project_id,
dataset_id,
table_name,
schema,
data_path,
num_retries)

poll_job(service,
job['jobReference']['projectId'],
job['jobReference']['jobId'],
poll_interval,
num_retries)
poll_job(bigquery, job)
# [END run]


Expand Down
Loading

0 comments on commit e773167

Please sign in to comment.