Commit 3e5e31c
Adding test for CDC using dicts as input and CDC write callable
prodriguezdefino committed Sep 27, 2024
1 parent bd324c5 commit 3e5e31c
Showing 2 changed files with 82 additions and 10 deletions.
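For context, the user-facing pattern this commit adds coverage for looks roughly like the sketch below: use_cdc_writes takes a callable that maps each element to a beam.Row carrying the CDC mutation info. This is a minimal sketch distilled from the new test; the table spec and pipeline options are illustrative, not part of the change.

import apache_beam as beam

with beam.Pipeline() as p:  # options omitted; a real run needs GCP settings
  _ = (
      p
      | beam.Create([{"name": "cdc_test", "value": 3},
                     {"name": "cdc_test", "value": 5}])
      | beam.io.WriteToBigQuery(
          table="my-project:my_dataset.my_table",  # hypothetical table spec
          method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
          schema={"fields": [{"name": "name", "type": "STRING"},
                             {"name": "value", "type": "INTEGER"}]},
          use_at_least_once=True,
          # the callable is invoked per element to build the CDC info row
          use_cdc_writes=lambda row: beam.Row(
              mutation_type="UPSERT",
              change_sequence_number="AAA/" + str(row.value)),
          cdc_writes_primary_key=["name"]))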
48 changes: 48 additions & 0 deletions sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -284,6 +284,54 @@ def test_write_with_beam_rows_cdc(self):
               cdc_writes_primary_key=["name"]))
     hamcrest_assert(p, bq_matcher)

+  def test_write_dicts_cdc(self):
+    table = 'write_dicts_cdc'
+    table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
+
+    expected_data_on_bq = [
+        # (name, value)
+        {
+            "name": "cdc_test",
+            "value": 5,
+        }
+    ]
+
+    schema = {
+        "fields": [{
+            "name": "name", "type": "STRING"
+        }, {
+            "name": "value", "type": "INTEGER"
+        }]
+    }
+
+    dicts = [{
+        "name": "cdc_test", "value": 3
+    }, {
+        "name": "cdc_test", "value": 5
+    }, {
+        "name": "cdc_test", "value": 4
+    }]
+
+    bq_matcher = BigqueryFullResultMatcher(
+        project=self.project,
+        query="SELECT * FROM {}.{}".format(self.dataset_id, table),
+        data=self.parse_expected_data(expected_data_on_bq))
+
+    with beam.Pipeline(argv=self.args) as p:
+      _ = (
+          p
+          | beam.Create(dicts)
+          | beam.io.WriteToBigQuery(
+              table=table_id,
+              method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
+              schema=schema,
+              use_at_least_once=True,
+              use_cdc_writes=lambda row: beam.Row(
+                  mutation_type="UPSERT",
+                  change_sequence_number="AAA/" + str(row.value)),
+              cdc_writes_primary_key=["name"]))
+    hamcrest_assert(p, bq_matcher)
+
   def test_write_to_dynamic_destinations(self):
     base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id)
     spec_with_project = '{}:{}'.format(self.project, base_table_spec)
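Why the matcher expects a single row with value 5: all three dicts share the primary key value "cdc_test", and BigQuery CDC resolves UPSERTs on the same key by keeping the change with the (lexicographically) highest change sequence number. A toy emulation of that resolution, purely to make the expectation concrete (not Beam or BigQuery code):

rows = [{"name": "cdc_test", "value": v} for v in (3, 5, 4)]
# emulate CDC upsert resolution: for one key, the highest sequence number wins
winner = max(rows, key=lambda r: "AAA/" + str(r["value"]))
assert winner == {"name": "cdc_test", "value": 5}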
44 changes: 34 additions & 10 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2526,6 +2526,8 @@ class StorageWriteToBigQuery(PTransform):
   RECORD = "record"
   # field for rows sent to Storage API for CDC functionality
   CDC_INFO = "cdc_info"
+  CDC_MUTATION_TYPE = "mutation_type"
+  CDC_SQN = "change_sequence_number"
   # magic string to tell Java that these rows are going to dynamic destinations
   DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"

@@ -2564,11 +2566,11 @@ def expand(self, input):
         is_rows = True
       except TypeError as exn:
         raise ValueError(
-            "A schema is required in order to prepare rows"
+            "A schema is required in order to prepare rows "
             "for writing with STORAGE_WRITE_API.") from exn
     elif callable(self._schema):
       raise NotImplementedError(
-          "Writing with dynamic schemas is not"
+          "Writing with dynamic schemas is not "
          "supported for this write method.")
     elif isinstance(self._schema, vp.ValueProvider):
       schema = self._schema.get()
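The two one-character changes in this hunk fix implicit string concatenation: adjacent Python string literals join with no separator, so the old messages rendered as "…prepare rowsfor writing…" and "…is notsupported…". A quick demonstration of the pitfall:

# adjacent string literals concatenate with no separator between them
msg = (
    "A schema is required in order to prepare rows"
    "for writing with STORAGE_WRITE_API.")
assert "rowsfor" in msg  # the missing trailing space produced this typo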
@@ -2628,31 +2630,53 @@ def expand(self, input):
     # been passed to extract MutationInfo from the rows to be written
     if callable(self._use_cdc_writes):
       use_cdc_writes = True
+      type_hints_beam_rows = bigquery_tools.get_beam_typehints_from_tableschema(
+          schema)
+      cdc_info_fn = self._use_cdc_writes
       # if we have dynamic destinations we just need to copy the
       # destination and record properties while adding the cdc_info
       if callable(table):
         input_beam_rows = (
             input_beam_rows
-            | "Include Row Mutation Info in the wrapping record" >> beam.Map(
+            | "Include CDC info in Row" >> beam.Map(
                 lambda row: beam.Row(
                     **{
                         StorageWriteToBigQuery.DESTINATION: row[0],
                         StorageWriteToBigQuery.RECORD: row[1],
-                        StorageWriteToBigQuery.CDC_INFO: self._use_cdc_writes(
-                            row[1])
-                    })))
+                        StorageWriteToBigQuery.CDC_INFO: cdc_info_fn(row[1])
+                    })).
+                with_output_types(
+                    RowTypeConstraint.from_fields(
+                        [(StorageWriteToBigQuery.DESTINATION, str),
+                         (
+                             StorageWriteToBigQuery.RECORD,
+                             RowTypeConstraint.from_fields(type_hints_beam_rows)),
+                         (
+                             StorageWriteToBigQuery.CDC_INFO,
+                             RowTypeConstraint.from_fields(
+                                 [(StorageWriteToBigQuery.CDC_MUTATION_TYPE, str),
+                                  (StorageWriteToBigQuery.CDC_SQN, str)]))])))
       # otherwise, we create the wrapping Row with the record and
       # cdc_info properties in it
       else:
         input_beam_rows = (
             input_beam_rows
-            | "Create a wrapping Row including CDC info" >> beam.Map(
+            | "Wrap in Row with CDC info" >> beam.Map(
                 lambda row: beam.Row(
                     **{
                         StorageWriteToBigQuery.RECORD: row,
-                        StorageWriteToBigQuery.CDC_INFO: self._use_cdc_writes(
-                            row)
-                    })))
+                        StorageWriteToBigQuery.CDC_INFO: cdc_info_fn(row)
+                    })).
+                with_output_types(
+                    RowTypeConstraint.from_fields(
+                        [(
+                            StorageWriteToBigQuery.RECORD,
+                            RowTypeConstraint.from_fields(type_hints_beam_rows)),
+                         (
+                             StorageWriteToBigQuery.CDC_INFO,
+                             RowTypeConstraint.from_fields(
+                                 [(StorageWriteToBigQuery.CDC_MUTATION_TYPE, str),
+                                  (StorageWriteToBigQuery.CDC_SQN, str)]))])))
     # otherwise we extract the configured boolean value
     else:
       use_cdc_writes = self._use_cdc_writes
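To make the wrapping above concrete, here is a standalone sketch of what the non-dynamic branch produces for one element. The record fields are this example's assumption; the record/cdc_info field names mirror the RECORD and CDC_INFO constants added to StorageWriteToBigQuery.

import apache_beam as beam

# a hypothetical element, already converted to a Beam Row
record = beam.Row(name="cdc_test", value=5)
cdc_info_fn = lambda row: beam.Row(
    mutation_type="UPSERT",
    change_sequence_number="AAA/" + str(row.value))

# the Map above emits a wrapping Row with "record" and "cdc_info"
# fields, which the cross-language Java transform then consumes
wrapped = beam.Row(record=record, cdc_info=cdc_info_fn(record))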
