Skip to content

Commit

Permalink
bigquery: modify CopyJob
Browse files Browse the repository at this point in the history
Update CopyJob and CopyJobConfig to conform to the new design
for jobs.
  • Loading branch information
jba committed Sep 25, 2017
1 parent 08141bd commit c322a5f
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 104 deletions.
2 changes: 2 additions & 0 deletions bigquery/google/cloud/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.dataset import AccessEntry
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.job import CopyJobConfig
from google.cloud.bigquery.job import ExtractJobConfig
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import Table
Expand All @@ -42,6 +43,7 @@
'ArrayQueryParameter',
'Client',
'Dataset',
'CopyJobConfig',
'ExtractJobConfig',
'ScalarQueryParameter',
'SchemaField',
Expand Down
55 changes: 44 additions & 11 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import absolute_import

import collections
import uuid

from google.api.core import page_iterator
Expand Down Expand Up @@ -492,25 +493,45 @@ def load_table_from_storage(self, job_id, destination, *source_uris):
"""
return LoadJob(job_id, destination, source_uris, client=self)

def copy_table(self, job_id, destination, *sources):
"""Construct a job for copying one or more tables into another table.
def copy_table(self, sources, destination, **kwargs):
"""Start a job for copying one or more tables into another table.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy
:type job_id: str
:param job_id: Name of the job.
:type sources: One of:
:class:`~google.cloud.bigquery.table.TableReference`
sequence of
:class:`~google.cloud.bigquery.table.TableReference`
:param sources: Table or tables to be copied.
:type destination: :class:`google.cloud.bigquery.table.Table`
:type destination: :class:`google.cloud.bigquery.table.TableReference`
:param destination: Table into which data is to be copied.
:type sources: sequence of :class:`google.cloud.bigquery.table.Table`
:param sources: tables to be copied.
:type kwargs: dict
:param kwargs: Additional keyword arguments.
:Keyword Arguments:
* *job_config*
(:class:`google.cloud.bigquery.job.CopyJobConfig`) --
(Optional) Extra configuration options for the copy job.
* *job_id* (``str``) --
Additional content
(Optional) The ID of the job.
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
:returns: a new ``CopyJob`` instance
"""
return CopyJob(job_id, destination, sources, client=self)
job_config = kwargs.get('job_config')
job_id = _make_job_id(kwargs.get('job_id'))

if not isinstance(sources, collections.Sequence):
sources = [sources]
job = CopyJob(job_id, sources, destination, client=self,
job_config=job_config)
job.begin()
return job

def extract_table(self, source, *destination_uris, **kwargs):
"""Start a job to extract a table into Cloud Storage files.
Expand Down Expand Up @@ -541,9 +562,7 @@ def extract_table(self, source, *destination_uris, **kwargs):
:returns: a new ``ExtractJob`` instance
"""
job_config = kwargs.get('job_config')
job_id = kwargs.get('job_id')
if job_id is None:
job_id = str(uuid.uuid4())
job_id = _make_job_id(kwargs.get('job_id'))

job = ExtractJob(
job_id, source, list(destination_uris), client=self,
Expand Down Expand Up @@ -667,3 +686,17 @@ def _item_to_table(iterator, resource):
:returns: The next table in the page.
"""
return Table.from_api_repr(resource, iterator.client)


def _make_job_id(job_id):
"""Construct an ID for a new job.
:type job_id: str or ``NoneType``
:param job_id: the user-provided job ID
:rtype: str
:returns: A job ID
"""
if job_id is None:
return str(uuid.uuid4())
return job_id
166 changes: 102 additions & 64 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class Compression(_EnumApiResourceProperty):
NONE = 'NONE'


class CreateDisposition(_EnumProperty):
class CreateDisposition(_EnumApiResourceProperty):
"""Pseudo-enum for ``create_disposition`` properties."""
CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
CREATE_NEVER = 'CREATE_NEVER'
Expand Down Expand Up @@ -159,7 +159,7 @@ class SourceFormat(_EnumProperty):
AVRO = 'AVRO'


class WriteDisposition(_EnumProperty):
class WriteDisposition(_EnumApiResourceProperty):
"""Pseudo-enum for ``write_disposition`` properties."""
WRITE_APPEND = 'WRITE_APPEND'
WRITE_TRUNCATE = 'WRITE_TRUNCATE'
Expand Down Expand Up @@ -688,7 +688,8 @@ def output_rows(self):
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect
"""

create_disposition = CreateDisposition('create_disposition')
create_disposition = CreateDisposition('create_disposition',
'createDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition
"""
Expand Down Expand Up @@ -733,7 +734,8 @@ def output_rows(self):
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceFormat
"""

write_disposition = WriteDisposition('write_disposition')
write_disposition = WriteDisposition('write_disposition',
'writeDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.writeDisposition
"""
Expand Down Expand Up @@ -853,13 +855,51 @@ def from_api_repr(cls, resource, client):
return job


class _CopyConfiguration(object):
"""User-settable configuration options for copy jobs.
class CopyJobConfig(object):
"""Configuration options for copy jobs.
Values which are ``None`` -> server defaults.
All properties in this class are optional. Values which are ``None`` ->
server defaults.
"""
_create_disposition = None
_write_disposition = None

def __init__(self):
self._properties = {}

create_disposition = CreateDisposition('create_disposition',
'createDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.createDisposition
"""

write_disposition = WriteDisposition('write_disposition',
'writeDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.writeDisposition
"""

def to_api_repr(self):
"""Build an API representation of the copy job config.
:rtype: dict
:returns: A dictionary in the format used by the BigQuery API.
"""
return copy.deepcopy(self._properties)

@classmethod
def from_api_repr(cls, resource):
"""Factory: construct a job configuration given its API representation
:type resource: dict
:param resource:
An extract job configuration in the same representation as is
returned from the API.
:rtype: :class:`google.cloud.bigquery.job.ExtractJobConfig`
:returns: Configuration parsed from ``resource``.
"""
config = cls()
config._properties = copy.deepcopy(resource)
return config


class CopyJob(_AsyncJob):
Expand All @@ -868,41 +908,45 @@ class CopyJob(_AsyncJob):
:type job_id: str
:param job_id: the job's ID, within the project belonging to ``client``.
:type destination: :class:`google.cloud.bigquery.table.Table`
:param destination: Table into which data is to be loaded.
:type sources: list of :class:`google.cloud.bigquery.table.Table`
:type sources: list of :class:`google.cloud.bigquery.table.TableReference`
:param sources: Table into which data is to be loaded.
:type destination: :class:`google.cloud.bigquery.table.TableReference`
:param destination: Table into which data is to be loaded.
:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
:type job_config: :class:`~google.cloud.bigquery.job.CopyJobConfig`
:param job_config:
(Optional) Extra configuration options for the copy job.
"""
_JOB_TYPE = 'copy'

def __init__(self, job_id, destination, sources, client):
def __init__(self, job_id, sources, destination, client, job_config=None):
super(CopyJob, self).__init__(job_id, client)

if job_config is None:
job_config = CopyJobConfig()

self.destination = destination
self.sources = sources
self._configuration = _CopyConfiguration()

create_disposition = CreateDisposition('create_disposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.createDisposition
"""
self._configuration = job_config

write_disposition = WriteDisposition('write_disposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.writeDisposition
"""
@property
def create_disposition(self):
"""See
:class:`~google.cloud.bigquery.job.CopyJobConfig.create_disposition`.
"""
return self._configuration.create_disposition

def _populate_config_resource(self, configuration):
"""Helper for _build_resource: copy config properties to resource"""
if self.create_disposition is not None:
configuration['createDisposition'] = self.create_disposition
if self.write_disposition is not None:
configuration['writeDisposition'] = self.write_disposition
@property
def write_disposition(self):
"""See
:class:`~google.cloud.bigquery.job.CopyJobConfig.write_disposition`.
"""
return self._configuration.write_disposition

def _build_resource(self):
"""Generate a resource for :meth:`begin`."""
Expand All @@ -913,31 +957,27 @@ def _build_resource(self):
'tableId': table.table_id,
} for table in self.sources]

resource = {
configuration = self._configuration.to_api_repr()
configuration['sourceTables'] = source_refs
configuration['destinationTable'] = {
'projectId': self.destination.project,
'datasetId': self.destination.dataset_id,
'tableId': self.destination.table_id,
}

return {
'jobReference': {
'projectId': self.project,
'jobId': self.job_id,
},
'configuration': {
self._JOB_TYPE: {
'sourceTables': source_refs,
'destinationTable': {
'projectId': self.destination.project,
'datasetId': self.destination.dataset_id,
'tableId': self.destination.table_id,
},
},
self._JOB_TYPE: configuration,
},
}
configuration = resource['configuration'][self._JOB_TYPE]
self._populate_config_resource(configuration)

return resource

def _copy_configuration_properties(self, configuration):
"""Helper: assign subclass configuration properties in cleaned."""
self.create_disposition = configuration.get('createDisposition')
self.write_disposition = configuration.get('writeDisposition')
self._configuration._properties = copy.deepcopy(configuration)

@classmethod
def from_api_repr(cls, resource, client):
Expand All @@ -958,27 +998,23 @@ def from_api_repr(cls, resource, client):
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
:returns: Job parsed from ``resource``.
"""
job_id, config = cls._get_resource_config(resource)
dest_config = config['destinationTable']
ds_ref = DatasetReference(dest_config['projectId'],
dest_config['datasetId'],)
dataset = Dataset(ds_ref)
table_ref = TableReference(dataset, dest_config['tableId'])
destination = Table(table_ref, client=client)
job_id, config_resource = cls._get_resource_config(resource)
config = CopyJobConfig.from_api_repr(config_resource)
destination = TableReference.from_api_repr(
config_resource['destinationTable'])
sources = []
source_configs = config.get('sourceTables')
source_configs = config_resource.get('sourceTables')
if source_configs is None:
single = config.get('sourceTable')
single = config_resource.get('sourceTable')
if single is None:
raise KeyError(
"Resource missing 'sourceTables' / 'sourceTable'")
source_configs = [single]
for source_config in source_configs:
ds_ref = DatasetReference(source_config['projectId'],
source_config['datasetId'])
table_ref = ds_ref.table(source_config['tableId'])
sources.append(Table(table_ref, client=client))
job = cls(job_id, destination, sources, client=client)
table_ref = TableReference.from_api_repr(source_config)
sources.append(table_ref)
job = cls(
job_id, sources, destination, client=client, job_config=config)
job._set_properties(resource)
return job

Expand Down Expand Up @@ -1017,7 +1053,7 @@ def __init__(self):
"""

def to_api_repr(self):
"""Build an API representation of the extact job config.
"""Build an API representation of the extract job config.
:rtype: dict
:returns: A dictionary in the format used by the BigQuery API.
Expand All @@ -1033,7 +1069,7 @@ def from_api_repr(cls, resource):
An extract job configuration in the same representation as is
returned from the API.
:rtype: :class:`google.cloud.bigquery.job.ExtractJobConfig`
:rtype: :class:`google.cloud.bigquery.job.CopyJobConfig`
:returns: Configuration parsed from ``resource``.
"""
config = cls()
Expand Down Expand Up @@ -1243,7 +1279,8 @@ def __init__(self, job_id, query, client,
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.allowLargeResults
"""

create_disposition = CreateDisposition('create_disposition')
create_disposition = CreateDisposition('create_disposition',
'createDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.createDisposition
"""
Expand Down Expand Up @@ -1289,7 +1326,8 @@ def __init__(self, job_id, query, client,
reference/rest/v2/jobs#configuration.dryRun
"""

write_disposition = WriteDisposition('write_disposition')
write_disposition = WriteDisposition('write_disposition',
'writeDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.writeDisposition
"""
Expand Down
Loading

0 comments on commit c322a5f

Please sign in to comment.