Skip to content

Commit

Permalink
bigquery: modify CopyJob
Browse files Browse the repository at this point in the history
Update CopyJob and CopyJobConfig to conform to the new design
for jobs.
  • Loading branch information
jba committed Sep 25, 2017
1 parent 8a65026 commit f6db150
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 127 deletions.
2 changes: 2 additions & 0 deletions bigquery/google/cloud/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.dataset import AccessEntry
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.job import CopyJobConfig
from google.cloud.bigquery.job import ExtractJobConfig
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import Table
Expand All @@ -42,6 +43,7 @@
'ArrayQueryParameter',
'Client',
'Dataset',
'CopyJobConfig',
'ExtractJobConfig',
'ScalarQueryParameter',
'SchemaField',
Expand Down
24 changes: 0 additions & 24 deletions bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,14 +527,6 @@ def __init__(self, name, type_, value):
self.type_ = type_
self.value = value

def __eq__(self, other):
if not isinstance(other, ScalarQueryParameter):
return NotImplemented
return(
self.name == other.name and
self.type_ == other.type_ and
self.value == other.value)

@classmethod
def positional(cls, type_, value):
"""Factory for positional paramater.
Expand Down Expand Up @@ -637,14 +629,6 @@ def __init__(self, name, array_type, values):
self.array_type = array_type
self.values = values

def __eq__(self, other):
if not isinstance(other, ArrayQueryParameter):
return NotImplemented
return(
self.name == other.name and
self.array_type == other.array_type and
self.values == other.values)

@classmethod
def positional(cls, array_type, values):
"""Factory for positional parameters.
Expand Down Expand Up @@ -789,14 +773,6 @@ def __init__(self, name, *sub_params):
types[sub.name] = sub.type_
values[sub.name] = sub.value

def __eq__(self, other):
if not isinstance(other, StructQueryParameter):
return NotImplemented
return(
self.name == other.name and
self.struct_types == other.struct_types and
self.struct_values == other.struct_values)

@classmethod
def positional(cls, *sub_params):
"""Factory for positional parameters.
Expand Down
49 changes: 38 additions & 11 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import absolute_import

import collections
import uuid

from google.api.core import page_iterator
Expand Down Expand Up @@ -492,25 +493,39 @@ def load_table_from_storage(self, job_id, destination, *source_uris):
"""
return LoadJob(job_id, destination, source_uris, client=self)

def copy_table(self, job_id, destination, *sources):
"""Construct a job for copying one or more tables into another table.
def copy_table(self, sources, destination, job_id=None, job_config=None):
"""Start a job for copying one or more tables into another table.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy
:type job_id: str
:param job_id: Name of the job.
:type sources: One of:
:class:`~google.cloud.bigquery.table.TableReference`
sequence of
:class:`~google.cloud.bigquery.table.TableReference`
:param sources: Table or tables to be copied.
:type destination: :class:`google.cloud.bigquery.table.Table`
:type destination: :class:`google.cloud.bigquery.table.TableReference`
:param destination: Table into which data is to be copied.
:type sources: sequence of :class:`google.cloud.bigquery.table.Table`
:param sources: tables to be copied.
:type job_id: str
:param job_id: (Optional) The ID of the job.
:type job_config: :class:`google.cloud.bigquery.job.CopyJobConfig`
:param job_config: (Optional) Extra configuration options for the job.
:rtype: :class:`google.cloud.bigquery.job.CopyJob`
:returns: a new ``CopyJob`` instance
"""
return CopyJob(job_id, destination, sources, client=self)
job_id = _make_job_id(job_id)

if not isinstance(sources, collections.Sequence):
sources = [sources]
job = CopyJob(job_id, sources, destination, client=self,
job_config=job_config)
job.begin()
return job

def extract_table(self, source, *destination_uris, **kwargs):
"""Start a job to extract a table into Cloud Storage files.
Expand Down Expand Up @@ -541,9 +556,7 @@ def extract_table(self, source, *destination_uris, **kwargs):
:returns: a new ``ExtractJob`` instance
"""
job_config = kwargs.get('job_config')
job_id = kwargs.get('job_id')
if job_id is None:
job_id = str(uuid.uuid4())
job_id = _make_job_id(kwargs.get('job_id'))

job = ExtractJob(
job_id, source, list(destination_uris), client=self,
Expand Down Expand Up @@ -667,3 +680,17 @@ def _item_to_table(iterator, resource):
:returns: The next table in the page.
"""
return Table.from_api_repr(resource, iterator.client)


def _make_job_id(job_id):
"""Construct an ID for a new job.
:type job_id: str or ``NoneType``
:param job_id: the user-provided job ID
:rtype: str
:returns: A job ID
"""
if job_id is None:
return str(uuid.uuid4())
return job_id
Loading

0 comments on commit f6db150

Please sign in to comment.