bigquery: modify CopyJob
Update CopyJob and CopyJobConfig to conform to the new design
for jobs.
jba committed Sep 23, 2017
1 parent 08141bd commit 4077d04
Showing 6 changed files with 255 additions and 106 deletions.
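For context, here is a rough usage sketch of the copy API as it looks after this commit (project, dataset, and table names are made up; the exact client setup may differ):

    from google.cloud import bigquery
    from google.cloud.bigquery import CopyJobConfig
    from google.cloud.bigquery.dataset import DatasetReference

    client = bigquery.Client(project='my-project')        # hypothetical project
    dataset_ref = DatasetReference('my-project', 'my_dataset')
    source = dataset_ref.table('source_table')            # a TableReference
    destination = dataset_ref.table('dest_table')

    config = CopyJobConfig()
    config.write_disposition = 'WRITE_TRUNCATE'

    # copy_table() now takes (sources, destination, **kwargs) and starts the
    # job itself via job.begin(), so no separate begin() call is needed.
    job = client.copy_table(source, destination, job_config=config)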
2 changes: 2 additions & 0 deletions bigquery/google/cloud/bigquery/__init__.py
@@ -32,6 +32,7 @@
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.dataset import AccessEntry
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.job import CopyJobConfig
from google.cloud.bigquery.job import ExtractJobConfig
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import Table
@@ -42,6 +43,7 @@
'ArrayQueryParameter',
'Client',
'Dataset',
'CopyJobConfig',
'ExtractJobConfig',
'ScalarQueryParameter',
'SchemaField',
56 changes: 44 additions & 12 deletions bigquery/google/cloud/bigquery/client.py
@@ -23,7 +23,7 @@
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import Table, TableReference
from google.cloud.bigquery.job import CopyJob
from google.cloud.bigquery.job import ExtractJob
from google.cloud.bigquery.job import LoadJob
@@ -492,25 +492,45 @@ def load_table_from_storage(self, job_id, destination, *source_uris):
"""
return LoadJob(job_id, destination, source_uris, client=self)

def copy_table(self, job_id, destination, *sources):
"""Construct a job for copying one or more tables into another table.
def copy_table(self, sources, destination, **kwargs):
"""Start a job for copying one or more tables into another table.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy
:type job_id: str
:param job_id: Name of the job.
:type sources: One of:
:class:`~google.cloud.bigquery.table.TableReference`
sequence of
:class:`~google.cloud.bigquery.table.TableReference`
:param sources: Table or tables to be copied.
:type destination: :class:`google.cloud.bigquery.table.Table`
:type destination: :class:`google.cloud.bigquery.table.TableReference`
:param destination: Table into which data is to be copied.
:type sources: sequence of :class:`google.cloud.bigquery.table.Table`
:param sources: tables to be copied.
:type kwargs: dict
:param kwargs: Additional keyword arguments.
:Keyword Arguments:
* *job_config*
(:class:`google.cloud.bigquery.job.CopyJobConfig`) --
(Optional) Extra configuration options for the copy job.
            * *job_id* (``str``) --
              (Optional) The ID of the job.
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
:returns: a new ``CopyJob`` instance
"""
return CopyJob(job_id, destination, sources, client=self)
job_config = kwargs.get('job_config')
job_id = _make_job_id(kwargs.get('job_id'))

if isinstance(sources, TableReference):
sources = [sources]
job = CopyJob(job_id, sources, destination, client=self,
job_config=job_config)
job.begin()
return job
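Because a bare TableReference is wrapped in a list before the job is built, both of the call forms below work (sketch with hypothetical references):

    job = client.copy_table(source_ref, dest_ref)            # single source
    job = client.copy_table([ref_one, ref_two], dest_ref)    # several sources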

def extract_table(self, source, *destination_uris, **kwargs):
"""Start a job to extract a table into Cloud Storage files.
@@ -541,9 +561,7 @@ def extract_table(self, source, *destination_uris, **kwargs):
:returns: a new ``ExtractJob`` instance
"""
job_config = kwargs.get('job_config')
job_id = kwargs.get('job_id')
if job_id is None:
job_id = str(uuid.uuid4())
job_id = _make_job_id(kwargs.get('job_id'))

job = ExtractJob(
job_id, source, list(destination_uris), client=self,
@@ -667,3 +685,17 @@ def _item_to_table(iterator, resource):
:returns: The next table in the page.
"""
return Table.from_api_repr(resource, iterator.client)


def _make_job_id(job_id):
"""Construct an ID for a new job.
:type job_id: str or ``NoneType``
:param job_id: the user-provided job ID
:rtype: str
:returns: A job ID
"""
if job_id is None:
return str(uuid.uuid4())
return job_id
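A quick behaviour sketch of this helper:

    _make_job_id('my-copy-job')   # -> 'my-copy-job' (user-supplied ID kept)
    _make_job_id(None)            # -> e.g. '1f0e...' (a fresh uuid4 string)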
168 changes: 103 additions & 65 deletions bigquery/google/cloud/bigquery/job.py
@@ -126,7 +126,7 @@ class Compression(_EnumApiResourceProperty):
NONE = 'NONE'


class CreateDisposition(_EnumProperty):
class CreateDisposition(_EnumApiResourceProperty):
"""Pseudo-enum for ``create_disposition`` properties."""
CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
CREATE_NEVER = 'CREATE_NEVER'
@@ -159,7 +159,7 @@ class SourceFormat(_EnumProperty):
AVRO = 'AVRO'


class WriteDisposition(_EnumProperty):
class WriteDisposition(_EnumApiResourceProperty):
"""Pseudo-enum for ``write_disposition`` properties."""
WRITE_APPEND = 'WRITE_APPEND'
WRITE_TRUNCATE = 'WRITE_TRUNCATE'
@@ -688,7 +688,8 @@ def output_rows(self):
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect
"""

create_disposition = CreateDisposition('create_disposition')
create_disposition = CreateDisposition('create_disposition',
'createDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition
"""
@@ -733,7 +734,8 @@ def output_rows(self):
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceFormat
"""

write_disposition = WriteDisposition('write_disposition')
write_disposition = WriteDisposition('write_disposition',
'writeDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.writeDisposition
"""
@@ -853,56 +855,98 @@ def from_api_repr(cls, resource, client):
return job


class _CopyConfiguration(object):
"""User-settable configuration options for copy jobs.
class CopyJobConfig(object):
"""Configuration options for copy jobs.
Values which are ``None`` -> server defaults.
All properties in this class are optional. Values which are ``None`` ->
server defaults.
"""
_create_disposition = None
_write_disposition = None

def __init__(self):
self._properties = {}

create_disposition = CreateDisposition('create_disposition',
'createDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.createDisposition
"""

write_disposition = WriteDisposition('write_disposition',
'writeDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.writeDisposition
"""

def to_api_repr(self):
"""Build an API representation of the copy job config.
:rtype: dict
:returns: A dictionary in the format used by the BigQuery API.
"""
return copy.deepcopy(self._properties)

@classmethod
def from_api_repr(cls, resource):
"""Factory: construct a job configuration given its API representation
:type resource: dict
:param resource:
            A copy job configuration in the same representation as is
            returned from the API.
        :rtype: :class:`google.cloud.bigquery.job.CopyJobConfig`
:returns: Configuration parsed from ``resource``.
"""
config = cls()
config._properties = copy.deepcopy(resource)
return config
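A minimal sketch of the new config object, assuming the _EnumApiResourceProperty descriptors store values under their camelCase API names in _properties, which is what to_api_repr() then copies out:

    config = CopyJobConfig()
    config.create_disposition = 'CREATE_IF_NEEDED'
    config.write_disposition = 'WRITE_TRUNCATE'
    config.to_api_repr()
    # -> {'createDisposition': 'CREATE_IF_NEEDED',
    #     'writeDisposition': 'WRITE_TRUNCATE'}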


class CopyJob(_AsyncJob):
"""Asynchronous job: copy data into a table from other tables.
"""Asynchr_onous job: copy data into a table from other tables.
:type job_id: str
:param job_id: the job's ID, within the project belonging to ``client``.
:type destination: :class:`google.cloud.bigquery.table.Table`
:param destination: Table into which data is to be loaded.
:type sources: list of :class:`google.cloud.bigquery.table.Table`
:type sources: list of :class:`google.cloud.bigquery.table.TableReference`
    :param sources: Table or tables from which data is to be copied.
    :type destination: :class:`google.cloud.bigquery.table.TableReference`
    :param destination: Table into which data is to be copied.
:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
:type job_config: :class:`~google.cloud.bigquery.job.CopyJobConfig`
:param job_config:
(Optional) Extra configuration options for the copy job.
"""
_JOB_TYPE = 'copy'

def __init__(self, job_id, destination, sources, client):
def __init__(self, job_id, sources, destination, client, job_config=None):
super(CopyJob, self).__init__(job_id, client)

if job_config is None:
job_config = CopyJobConfig()

self.destination = destination
self.sources = sources
self._configuration = _CopyConfiguration()

create_disposition = CreateDisposition('create_disposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.createDisposition
"""
self._configuration = job_config

write_disposition = WriteDisposition('write_disposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.writeDisposition
"""
@property
def create_disposition(self):
"""See
:class:`~google.cloud.bigquery.job.CopyJobConfig.create_disposition`.
"""
return self._configuration.create_disposition

def _populate_config_resource(self, configuration):
"""Helper for _build_resource: copy config properties to resource"""
if self.create_disposition is not None:
configuration['createDisposition'] = self.create_disposition
if self.write_disposition is not None:
configuration['writeDisposition'] = self.write_disposition
@property
def write_disposition(self):
"""See
:class:`~google.cloud.bigquery.job.CopyJobConfig.write_disposition`.
"""
return self._configuration.write_disposition
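With this change the dispositions are set on the config and only read back through the job; a sketch with hypothetical references:

    config = CopyJobConfig()
    config.write_disposition = 'WRITE_APPEND'
    job = CopyJob('my-job-id', [source_ref], dest_ref, client,
                  job_config=config)
    job.write_disposition   # -> 'WRITE_APPEND', read from the attached config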

def _build_resource(self):
"""Generate a resource for :meth:`begin`."""
@@ -913,31 +957,27 @@ def _build_resource(self):
'tableId': table.table_id,
} for table in self.sources]

resource = {
configuration = self._configuration.to_api_repr()
configuration['sourceTables'] = source_refs
configuration['destinationTable'] = {
'projectId': self.destination.project,
'datasetId': self.destination.dataset_id,
'tableId': self.destination.table_id,
}

return {
'jobReference': {
'projectId': self.project,
'jobId': self.job_id,
},
'configuration': {
self._JOB_TYPE: {
'sourceTables': source_refs,
'destinationTable': {
'projectId': self.destination.project,
'datasetId': self.destination.dataset_id,
'tableId': self.destination.table_id,
},
},
self._JOB_TYPE: configuration,
},
}
configuration = resource['configuration'][self._JOB_TYPE]
self._populate_config_resource(configuration)

return resource
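For reference, the request body that _build_resource now assembles looks roughly like this (hypothetical IDs; the config's own keys are merged into the 'copy' section):

    {
        'jobReference': {'projectId': 'my-project', 'jobId': 'my-copy-job'},
        'configuration': {
            'copy': {
                'writeDisposition': 'WRITE_TRUNCATE',
                'sourceTables': [{'projectId': 'my-project',
                                  'datasetId': 'my_dataset',
                                  'tableId': 'source_table'}],
                'destinationTable': {'projectId': 'my-project',
                                     'datasetId': 'my_dataset',
                                     'tableId': 'dest_table'},
            },
        },
    }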

def _copy_configuration_properties(self, configuration):
"""Helper: assign subclass configuration properties in cleaned."""
self.create_disposition = configuration.get('createDisposition')
self.write_disposition = configuration.get('writeDisposition')
self._configuration._properties = copy.deepcopy(configuration)

@classmethod
def from_api_repr(cls, resource, client):
@@ -958,27 +998,23 @@ def from_api_repr(cls, resource, client):
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
:returns: Job parsed from ``resource``.
"""
job_id, config = cls._get_resource_config(resource)
dest_config = config['destinationTable']
ds_ref = DatasetReference(dest_config['projectId'],
dest_config['datasetId'],)
dataset = Dataset(ds_ref)
table_ref = TableReference(dataset, dest_config['tableId'])
destination = Table(table_ref, client=client)
job_id, config_resource = cls._get_resource_config(resource)
config = CopyJobConfig.from_api_repr(config_resource)
destination = TableReference.from_api_repr(
config_resource['destinationTable'])
sources = []
source_configs = config.get('sourceTables')
source_configs = config_resource.get('sourceTables')
if source_configs is None:
single = config.get('sourceTable')
single = config_resource.get('sourceTable')
if single is None:
raise KeyError(
"Resource missing 'sourceTables' / 'sourceTable'")
source_configs = [single]
for source_config in source_configs:
ds_ref = DatasetReference(source_config['projectId'],
source_config['datasetId'])
table_ref = ds_ref.table(source_config['tableId'])
sources.append(Table(table_ref, client=client))
job = cls(job_id, destination, sources, client=client)
table_ref = TableReference.from_api_repr(source_config)
sources.append(table_ref)
job = cls(
job_id, sources, destination, client=client, job_config=config)
job._set_properties(resource)
return job
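Round-trip sketch for the rewritten factory, where resource is a job representation as returned by the BigQuery jobs API (names hypothetical):

    job = CopyJob.from_api_repr(resource, client)
    job.sources            # -> list of TableReference objects
    job.destination        # -> TableReference
    job.write_disposition  # -> read through the attached CopyJobConfig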

@@ -1017,7 +1053,7 @@ def __init__(self):
"""

def to_api_repr(self):
"""Build an API representation of the extact job config.
"""Build an API representation of the extract job config.
:rtype: dict
:returns: A dictionary in the format used by the BigQuery API.
Expand All @@ -1033,7 +1069,7 @@ def from_api_repr(cls, resource):
An extract job configuration in the same representation as is
returned from the API.
:rtype: :class:`google.cloud.bigquery.job.ExtractJobConfig`
:rtype: :class:`google.cloud.bigquery.job.CopyJobConfig`
:returns: Configuration parsed from ``resource``.
"""
config = cls()
@@ -1243,7 +1279,8 @@ def __init__(self, job_id, query, client,
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.allowLargeResults
"""

create_disposition = CreateDisposition('create_disposition')
create_disposition = CreateDisposition('create_disposition',
'createDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.createDisposition
"""
@@ -1289,7 +1326,8 @@ def __init__(self, job_id, query, client,
reference/rest/v2/jobs#configuration.dryRun
"""

write_disposition = WriteDisposition('write_disposition')
write_disposition = WriteDisposition('write_disposition',
'writeDisposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.writeDisposition
"""
