Skip to content

Commit

Permalink
fix(bigquery): preserve job config passed to Client methods (#9735)
Browse files Browse the repository at this point in the history
This commit ensures that the Client's methods that accept a job config as
an argument operate on deep copies, and do not modify the original
job config instances passed to them.
  • Loading branch information
plamut authored and tswast committed Nov 12, 2019
1 parent 96baa3d commit e97771e
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 8 deletions.
12 changes: 10 additions & 2 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def __init__(

self._connection = Connection(self, **kw_args)
self._location = location
self._default_query_job_config = default_query_job_config
self._default_query_job_config = copy.deepcopy(default_query_job_config)

@property
def location(self):
Expand Down Expand Up @@ -1381,6 +1381,7 @@ def load_table_from_uri(
destination = _table_arg_to_table_ref(destination, default_project=self.project)

if job_config:
job_config = copy.deepcopy(job_config)
_verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)

load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config)
Expand Down Expand Up @@ -1465,6 +1466,7 @@ def load_table_from_file(
destination = _table_arg_to_table_ref(destination, default_project=self.project)
job_ref = job._JobReference(job_id, project=project, location=location)
if job_config:
job_config = copy.deepcopy(job_config)
_verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
load_job = job.LoadJob(job_ref, None, destination, self, job_config)
job_resource = load_job.to_api_repr()
Expand Down Expand Up @@ -1969,6 +1971,8 @@ def copy_table(

if job_config:
_verify_job_config_type(job_config, google.cloud.bigquery.job.CopyJobConfig)
job_config = copy.deepcopy(job_config)

copy_job = job.CopyJob(
job_ref, sources, destination, client=self, job_config=job_config
)
Expand Down Expand Up @@ -2049,6 +2053,8 @@ def extract_table(
_verify_job_config_type(
job_config, google.cloud.bigquery.job.ExtractJobConfig
)
job_config = copy.deepcopy(job_config)

extract_job = job.ExtractJob(
job_ref, source, destination_uris, client=self, job_config=job_config
)
Expand Down Expand Up @@ -2112,6 +2118,8 @@ def query(
if location is None:
location = self.location

job_config = copy.deepcopy(job_config)

if self._default_query_job_config:
if job_config:
_verify_job_config_type(
Expand All @@ -2129,7 +2137,7 @@ def query(
self._default_query_job_config,
google.cloud.bigquery.job.QueryJobConfig,
)
job_config = self._default_query_job_config
job_config = copy.deepcopy(self._default_query_job_config)

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
Expand Down
135 changes: 129 additions & 6 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2997,6 +2997,8 @@ def test_load_table_from_uri(self):
creds = _make_credentials()
http = object()
job_config = LoadJobConfig()
original_config_copy = copy.deepcopy(job_config)

client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
conn = client._connection = make_connection(RESOURCE)
destination = client.dataset(self.DS_ID).table(DESTINATION)
Expand All @@ -3010,6 +3012,9 @@ def test_load_table_from_uri(self):
method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE
)

# the original config object should not have been modified
self.assertEqual(job_config.to_api_repr(), original_config_copy.to_api_repr())

self.assertIsInstance(job, LoadJob)
self.assertIsInstance(job._configuration, LoadJobConfig)
self.assertIs(job._client, client)
Expand Down Expand Up @@ -3496,19 +3501,24 @@ def test_copy_table_w_valid_job_config(self):
creds = _make_credentials()
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
job_config = CopyJobConfig()
conn = client._connection = make_connection(RESOURCE)
dataset = client.dataset(self.DS_ID)
source = dataset.table(SOURCE)
destination = dataset.table(DESTINATION)

job_config = CopyJobConfig()
original_config_copy = copy.deepcopy(job_config)
job = client.copy_table(source, destination, job_id=JOB, job_config=job_config)

# Check that copy_table actually starts the job.
conn.api_request.assert_called_once_with(
method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE
)
self.assertIsInstance(job._configuration, CopyJobConfig)

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_extract_table(self):
from google.cloud.bigquery.job import ExtractJob

Expand Down Expand Up @@ -3679,6 +3689,7 @@ def test_extract_table_generated_job_id(self):
source = dataset.table(SOURCE)
job_config = ExtractJobConfig()
job_config.destination_format = DestinationFormat.NEWLINE_DELIMITED_JSON
original_config_copy = copy.deepcopy(job_config)

job = client.extract_table(source, DESTINATION, job_config=job_config)

Expand All @@ -3695,6 +3706,9 @@ def test_extract_table_generated_job_id(self):
self.assertEqual(job.source, source)
self.assertEqual(list(job.destination_uris), [DESTINATION])

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_extract_table_w_destination_uris(self):
from google.cloud.bigquery.job import ExtractJob

Expand Down Expand Up @@ -3840,6 +3854,7 @@ def test_query_w_explicit_job_config(self):
job_config = QueryJobConfig()
job_config.use_query_cache = True
job_config.maximum_bytes_billed = 2000
original_config_copy = copy.deepcopy(job_config)

client.query(
query, job_id=job_id, location=self.LOCATION, job_config=job_config
Expand All @@ -3850,6 +3865,105 @@ def test_query_w_explicit_job_config(self):
method="POST", path="/projects/PROJECT/jobs", data=resource
)

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_query_preserving_explicit_job_config(self):
    """An explicit job config passed to ``Client.query`` must not be mutated."""
    from google.cloud.bigquery import QueryJobConfig

    job_id = "some-job-id"
    query = "select count(*) from persons"
    expected_resource = {
        "jobReference": {
            "jobId": job_id,
            "projectId": self.PROJECT,
            "location": self.LOCATION,
        },
        "configuration": {
            "query": {
                "query": query,
                "useLegacySql": False,
                "useQueryCache": True,
                "maximumBytesBilled": "2000",
            }
        },
    }

    client = self._make_one(
        project=self.PROJECT, credentials=_make_credentials(), _http=object()
    )
    conn = client._connection = make_connection(expected_resource)

    config = QueryJobConfig()
    config.use_query_cache = True
    config.maximum_bytes_billed = 2000
    config_snapshot = copy.deepcopy(config)

    client.query(query, job_id=job_id, location=self.LOCATION, job_config=config)

    # The query call must actually start the job with the expected payload.
    conn.api_request.assert_called_once_with(
        method="POST", path="/projects/PROJECT/jobs", data=expected_resource
    )

    # The caller's config instance must be left untouched.
    assert config.to_api_repr() == config_snapshot.to_api_repr()

def test_query_preserving_explicit_default_job_config(self):
    """The client's default query job config must not be mutated by ``query``."""
    from google.cloud.bigquery import QueryJobConfig, DatasetReference

    job_id = "some-job-id"
    query = "select count(*) from persons"
    expected_resource = {
        "jobReference": {
            "jobId": job_id,
            "projectId": self.PROJECT,
            "location": self.LOCATION,
        },
        "configuration": {
            "query": {
                "query": query,
                "defaultDataset": {
                    "projectId": self.PROJECT,
                    "datasetId": "some-dataset",
                },
                "useLegacySql": False,
                "maximumBytesBilled": "1000",
            }
        },
    }

    default_config = QueryJobConfig()
    default_config.default_dataset = DatasetReference(self.PROJECT, "some-dataset")
    default_config.maximum_bytes_billed = 1000
    config_snapshot = copy.deepcopy(default_config)

    client = self._make_one(
        project=self.PROJECT,
        credentials=_make_credentials(),
        _http=object(),
        default_query_job_config=default_config,
    )
    conn = client._connection = make_connection(expected_resource)

    client.query(query, job_id=job_id, location=self.LOCATION, job_config=None)

    # The query call must actually start the job with the expected payload.
    conn.api_request.assert_called_once_with(
        method="POST", path="/projects/PROJECT/jobs", data=expected_resource
    )

    # The client-level default config instance must be left untouched.
    assert default_config.to_api_repr() == config_snapshot.to_api_repr()

def test_query_w_invalid_job_config(self):
from google.cloud.bigquery import QueryJobConfig, DatasetReference
from google.cloud.bigquery import job
Expand Down Expand Up @@ -5429,22 +5543,24 @@ def test_load_table_from_file_resumable(self):

client = self._make_client()
file_obj = self._make_file_obj()
job_config = self._make_config()
original_config_copy = copy.deepcopy(job_config)

do_upload_patch = self._make_do_upload_patch(
client, "_do_resumable_upload", self.EXPECTED_CONFIGURATION
)
with do_upload_patch as do_upload:
client.load_table_from_file(
file_obj,
self.TABLE_REF,
job_id="job_id",
job_config=self._make_config(),
file_obj, self.TABLE_REF, job_id="job_id", job_config=job_config,
)

do_upload.assert_called_once_with(
file_obj, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES
)

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_load_table_from_file_w_explicit_project(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES

Expand Down Expand Up @@ -5790,6 +5906,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
job_config = job.LoadJobConfig(
write_disposition=job.WriteDisposition.WRITE_TRUNCATE
)
original_config_copy = copy.deepcopy(job_config)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
Expand Down Expand Up @@ -5826,6 +5943,9 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
assert sent_config.source_format == job.SourceFormat.PARQUET
assert sent_config.write_disposition == job.WriteDisposition.WRITE_TRUNCATE

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_automatic_schema(self):
Expand Down Expand Up @@ -6466,6 +6586,7 @@ def test_load_table_from_json_non_default_args(self):
]
job_config = job.LoadJobConfig(schema=schema)
job_config._properties["load"]["unknown_field"] = "foobar"
original_config_copy = copy.deepcopy(job_config)

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
Expand Down Expand Up @@ -6493,13 +6614,15 @@ def test_load_table_from_json_non_default_args(self):
)

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert job_config.source_format is None # the original was not modified
assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON
assert sent_config.schema == schema
assert not sent_config.autodetect
# all properties should have been cloned and sent to the backend
assert sent_config._properties.get("load", {}).get("unknown_field") == "foobar"

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_load_table_from_json_w_invalid_job_config(self):
from google.cloud.bigquery import job

Expand Down

0 comments on commit e97771e

Please sign in to comment.