Fix failing python BQ test (#30099)
ahmedabu98 authored Jan 25, 2024
1 parent 772cf0a commit 3ae8518
Showing 5 changed files with 14 additions and 9 deletions.
Empty file.
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1854,6 +1854,7 @@ def __init__(
       kms_key=None,
       batch_size=None,
       max_file_size=None,
+      max_partition_size=None,
       max_files_per_bundle=None,
       test_client=None,
       custom_gcs_temp_location=None,
@@ -1934,6 +1935,8 @@ def __init__(
      max_file_size (int): The maximum size for a file to be written and then
        loaded into BigQuery. The default value is 4TB, which is 80% of the
        limit of 5TB for BigQuery to load any file.
+     max_partition_size (int): Maximum byte size for each load job to
+       BigQuery. Defaults to 15TB. Applicable to FILE_LOADS only.
      max_files_per_bundle(int): The maximum number of files to be concurrently
        written by a worker. The default here is 20. Larger values will allow
        writing to multiple destinations without having to reshard - but they
@@ -2059,6 +2062,7 @@ def __init__(
    # TODO(pabloem): Consider handling ValueProvider for this location.
    self.custom_gcs_temp_location = custom_gcs_temp_location
    self.max_file_size = max_file_size
+   self.max_partition_size = max_partition_size
    self.max_files_per_bundle = max_files_per_bundle
    self.method = method or WriteToBigQuery.Method.DEFAULT
    self.triggering_frequency = triggering_frequency
@@ -2202,6 +2206,7 @@ def find_in_nested_dict(schema):
        with_auto_sharding=self.with_auto_sharding,
        temp_file_format=self._temp_file_format,
        max_file_size=self.max_file_size,
+       max_partition_size=self.max_partition_size,
        max_files_per_bundle=self.max_files_per_bundle,
        custom_gcs_temp_location=self.custom_gcs_temp_location,
        test_client=self.test_client,
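For readers skimming the diff, a minimal usage sketch of the new knob alongside max_file_size follows. It is not part of the commit: the table spec, schema, sample element, and byte limits are assumptions for illustration.

# Hypothetical usage sketch (not from this commit); the table spec, schema,
# sample element, and size values are illustrative assumptions.
import apache_beam as beam
from apache_beam.io.gcp import bigquery

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{"name": "beam", "language": "py"}])
        | bigquery.WriteToBigQuery(
            table="my-project:my_dataset.my_table",  # assumed table spec
            schema="name:STRING,language:STRING",    # assumed schema
            method=bigquery.WriteToBigQuery.Method.FILE_LOADS,
            # Per-file cap for the files staged before loading (default ~4TB).
            max_file_size=4 * (1 << 40),
            # New in this commit: cap on total bytes per load job. Oversized
            # partitions are split across several load jobs and stitched back
            # together with copy jobs. FILE_LOADS only; defaults to 15TB.
            max_partition_size=15 * (1 << 40),
        )
    )

Shrinking both limits is exactly how the updated test below forces the copy-job path (max_file_size=10, max_partition_size=20).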
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -747,6 +747,7 @@ def process(
      )
    if not self.bq_io_metadata:
      self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+
    job_reference = self.bq_wrapper.perform_load_job(
        destination=table_reference,
        source_uris=files,
15 changes: 7 additions & 8 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -925,9 +925,6 @@ def test_batch_copy_jobs_with_no_input_schema(self):
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND')

-   # reduce load job size to induce copy jobs
-   bqfl._DEFAULT_MAX_FILE_SIZE = 10
-   bqfl._MAXIMUM_LOAD_SIZE = 20
    verifiers = [
        BigqueryFullResultMatcher(
            project=self.project,
@@ -949,8 +946,7 @@ def callable_table(el: dict):
        dest += "_2"
      return dest

-   args = self.test_pipeline.get_full_options_as_args(
-       on_success_matcher=all_of(verifiers))
+   args = self.test_pipeline.get_full_options_as_args()

    with beam.Pipeline(argv=args) as p:
      # 0...4 going to table 1
@@ -961,7 +957,10 @@ def callable_table(el: dict):
      p | beam.Create(items) | bigquery.WriteToBigQuery(
          table=callable_table,
          create_disposition="CREATE_NEVER",
-         write_disposition="WRITE_APPEND"))
+         write_disposition="WRITE_APPEND",
+         # reduce load job size to induce copy jobs
+         max_file_size=10,
+         max_partition_size=20))

    hamcrest_assert(p, all_of(*verifiers))

@@ -1001,8 +1000,7 @@ def test_multiple_destinations_transform(self):
          if 'foundation' in d])
    ]

-   args = self.test_pipeline.get_full_options_as_args(
-       on_success_matcher=all_of(*pipeline_verifiers))
+   args = self.test_pipeline.get_full_options_as_args()

    with beam.Pipeline(argv=args) as p:
      input = p | beam.Create(_ELEMENTS, reshuffle=False)
@@ -1044,6 +1042,7 @@ def test_multiple_destinations_transform(self):
              write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
              max_file_size=20,
              max_files_per_bundle=-1))
+   hamcrest_assert(p, all_of(*pipeline_verifiers))

  @pytest.mark.it_postcommit
  def test_bqfl_streaming(self):
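The test edits above drop the on_success_matcher pipeline option in favor of a direct hamcrest assertion once the pipeline has run. A minimal sketch of that pattern, with a stand-in matcher (the real tests use BigqueryFullResultMatcher):

# Sketch of the verification pattern, assuming a stand-in matcher.
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from hamcrest import all_of, instance_of
from hamcrest.core import assert_that as hamcrest_assert

# No on_success_matcher here: verification no longer rides on pipeline options.
args = TestPipeline().get_full_options_as_args()

verifiers = [instance_of(beam.Pipeline)]  # stand-in for BigqueryFullResultMatcher

with beam.Pipeline(argv=args) as p:
    _ = p | beam.Create([1, 2, 3])  # pipeline under test

# Assert on the pipeline object after the with block has run it to completion.
hamcrest_assert(p, all_of(*verifiers))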
2 changes: 1 addition & 1 deletion sdks/python/test-suites/dataflow/common.gradle
@@ -133,7 +133,7 @@ task postCommitIT {
        "test_opts": testOpts,
        "sdk_location": project.ext.sdkLocation,
        "suite": "postCommitIT-df${pythonVersionSuffix}",
-       "collect": "it_postcommit"
+       "collect": "it_postcommit",
    ]
    def cmdArgs = mapToArgString(argMap)
    exec {
