
Commit

simplifying the xlang configuration from python perspective, will add callable on a future PR
prodriguezdefino committed Oct 2, 2024
1 parent 3e5e31c commit 0073348
Showing 4 changed files with 104 additions and 176 deletions.
@@ -91,6 +91,14 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
   private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
   // magic string that tells us to write to dynamic destinations
   protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
+  protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info";
+  protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type";
+  protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number";
+  protected static final Schema ROW_SCHEMA_MUTATION_INFO =
+      Schema.builder()
+          .addStringField("mutation_type")
+          .addStringField("change_sequence_number")
+          .build();
 
   @Override
   protected SchemaTransform from(
@@ -264,16 +272,16 @@ public static Builder builder() {
     @SchemaFieldDescription(
         "This option enables the use of BigQuery CDC functionality. The expected PCollection"
             + " should contain Beam Rows with a schema wrapping the record to be inserted and"
-            + " adding the CDC info similar to: {cdc_info: {mutation_type:\"...\", "
+            + " adding the CDC info similar to: {row_mutation_info: {mutation_type:\"...\", "
             + "change_sequence_number:\"...\"}, record: {...}}")
     @Nullable
     public abstract Boolean getUseCdcWrites();
 
     @SchemaFieldDescription(
-        "In the case of using CDC writes and setting CREATE_IF_NEEDED mode for the tables"
-            + " a primary key is required.")
+        "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this"
+            + " primary key. Required when CDC writes are enabled with CREATE_IF_NEEDED.")
     @Nullable
-    public abstract List<String> getCdcWritesPrimaryKey();
+    public abstract List<String> getPrimaryKey();
 
     /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
     @AutoValue.Builder
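
For reference, a minimal sketch (not part of this diff) of a Row matching the wrapper shape that this description and the new ROW_SCHEMA_MUTATION_INFO constant call for; the record schema and its values (name, value) are hypothetical stand-ins:

// Sketch only: builds one CDC-wrapped element, assuming a hypothetical record schema.
Schema recordSchema =
    Schema.builder().addStringField("name").addInt64Field("value").build();
Schema mutationInfoSchema =
    Schema.builder()
        .addStringField("mutation_type")
        .addStringField("change_sequence_number")
        .build();
Schema cdcSchema =
    Schema.builder()
        .addRowField("row_mutation_info", mutationInfoSchema)
        .addRowField("record", recordSchema)
        .build();
Row cdcRow =
    Row.withSchema(cdcSchema)
        .withFieldValue(
            "row_mutation_info",
            Row.withSchema(mutationInfoSchema)
                .withFieldValue("mutation_type", "UPSERT")
                .withFieldValue("change_sequence_number", "AAA0")
                .build())
        .withFieldValue(
            "record",
            Row.withSchema(recordSchema)
                .withFieldValue("name", "cdc_test")
                .withFieldValue("value", 5L)
                .build())
        .build();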
@@ -297,7 +305,7 @@ public abstract static class Builder {
 
       public abstract Builder setUseCdcWrites(Boolean cdcWrites);
 
-      public abstract Builder setCdcWritesPrimaryKey(List<String> pkColumns);
+      public abstract Builder setPrimaryKey(List<String> pkColumns);
 
       /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
       public abstract BigQueryStorageWriteApiSchemaTransformProvider
@@ -366,28 +374,41 @@ public void process(ProcessContext c) {}
 
   private static class RowDynamicDestinations extends DynamicDestinations<Row, String> {
     Schema schema;
-    String fixedDestination = null;
-    List<String> primaryKey = null;
 
     RowDynamicDestinations(Schema schema) {
       this.schema = schema;
     }
 
-    RowDynamicDestinations withFixedDestination(String destination) {
-      this.fixedDestination = destination;
-      return this;
-    }
-
-    RowDynamicDestinations withPrimaryKey(List<String> primaryKey) {
-      this.primaryKey = primaryKey;
-      return this;
-    }
-
     @Override
     public String getDestination(ValueInSingleWindow<Row> element) {
-      return fixedDestination != null
-          ? fixedDestination
-          : element.getValue().getString("destination");
+      return element.getValue().getString("destination");
+    }
+
+    @Override
+    public TableDestination getTable(String destination) {
+      return new TableDestination(destination, null);
+    }
+
+    @Override
+    public TableSchema getSchema(String destination) {
+      return BigQueryUtils.toTableSchema(schema);
+    }
+  }
+
+  private static class CdcWritesDynamicDestination extends RowDynamicDestinations {
+    final String fixedDestination;
+    final List<String> primaryKey;
+
+    public CdcWritesDynamicDestination(
+        Schema schema, String fixedDestination, List<String> primaryKey) {
+      super(schema);
+      this.fixedDestination = fixedDestination;
+      this.primaryKey = primaryKey;
+    }
+
+    @Override
+    public String getDestination(ValueInSingleWindow<Row> element) {
+      return Optional.ofNullable(fixedDestination).orElseGet(() -> super.getDestination(element));
     }
 
     @Override
@@ -400,16 +421,6 @@ public TableConstraints getTableConstraints(String destination) {
                   .setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk)))
           .orElse(null);
     }
-
-    @Override
-    public TableDestination getTable(String destination) {
-      return new TableDestination(destination, null);
-    }
-
-    @Override
-    public TableSchema getSchema(String destination) {
-      return BigQueryUtils.toTableSchema(schema);
-    }
   }
 
   @Override
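
To make the split of responsibilities concrete, a hedged sketch of how the two destination classes resolve a table spec (they are private to the provider, so this is illustrative only; recordSchema and the table specs are hypothetical):

// Sketch only: the base class routes each element on its "destination" field,
// while the CDC subclass short-circuits to a fixed table spec when one is set.
DynamicDestinations<Row, String> cdcFixed =
    new CdcWritesDynamicDestination(recordSchema, "project:dataset.table", Arrays.asList("name"));
DynamicDestinations<Row, String> cdcDynamic =
    new CdcWritesDynamicDestination(recordSchema, null, Arrays.asList("name"));
// cdcFixed.getDestination(element) always yields "project:dataset.table";
// cdcDynamic.getDestination(element) falls back to super.getDestination(element),
// i.e. element.getValue().getString("destination").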
@@ -500,6 +511,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       }
     }
 
+    void validateDynamicDestinationsExpectedSchema(Schema schema) {
+      checkArgument(
+          schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
+          "When writing to dynamic destinations, we expect Row Schema with a "
+              + "\"destination\" string field and a \"record\" Row field.");
+    }
+
     BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
       Method writeMethod =
           configuration.getUseAtLeastOnceSemantics() != null
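
A minimal sketch (not from this diff) of an input Row that satisfies the check above; the table spec and the record Row are hypothetical:

// Sketch only: a schema with the required "destination" string field and "record" Row field.
Schema dynamicSchema =
    Schema.builder()
        .addStringField("destination")
        .addRowField("record", recordSchema) // recordSchema as in the earlier sketch
        .build();
Row element =
    Row.withSchema(dynamicSchema)
        .withFieldValue("destination", "project:dataset.table_1") // hypothetical table spec
        .withFieldValue("record", recordRow) // any Row matching recordSchema
        .build();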
@@ -513,17 +531,15 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
               .withFormatFunction(BigQueryUtils.toTableRow())
               .withWriteDisposition(WriteDisposition.WRITE_APPEND);
 
-      if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
-        checkArgument(
-            schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
-            "When writing to dynamic destinations, we expect Row Schema with a "
-                + "\"destination\" string field and a \"record\" Row field.");
+      // in case CDC writes are configured we validate and include them in the configuration
+      if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) {
+        write = validateAndIncludeCDCInformation(write, schema);
+      } else if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
+        validateDynamicDestinationsExpectedSchema(schema);
         write =
             write
                 .to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))
                 .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")));
-      } else if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) {
-        write = validateAndIncludeCDCInformation(write, schema);
       } else {
         write = write.to(configuration.getTable()).useBeamSchema();
       }
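
In short, this hunk gives CDC precedence over the other routes (a restatement for reference, not additional code in the commit):

// Routing order after this change (sketch):
// 1. useCdcWrites == true            -> validateAndIncludeCDCInformation (fixed or dynamic table)
// 2. table == "DYNAMIC_DESTINATIONS" -> RowDynamicDestinations over the "record" schema
// 3. otherwise                       -> write.to(table).useBeamSchema()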
@@ -534,6 +550,7 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
                 configuration.getCreateDisposition().toUpperCase());
         write = write.withCreateDisposition(createDisposition);
       }
+
       if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
         WriteDisposition writeDisposition =
             BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(
Expand All @@ -550,57 +567,49 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {

BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
BigQueryIO.Write<Row> write, Schema schema) {
if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
checkArgument(
BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS
.get(configuration.getCreateDisposition().toUpperCase())
.equals(CreateDisposition.CREATE_IF_NEEDED)
&& !Optional.ofNullable(configuration.getCdcWritesPrimaryKey())
.orElse(ImmutableList.of())
.isEmpty(),
"When using CDC writes into BigQuery, alongside with CREATE_IF_NEEDED mode,"
+ " a primary key should be provided.");
}
if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
checkArgument(
schema.getFieldNames().contains("destination"),
"When writing to dynamic destinations, we expect Row Schema with a "
+ "\"destination\" string field.");
}
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList("cdc_info", "record")),
schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, "record")),
"When writing using CDC functionality, we expect Row Schema with a "
+ "\"cdc_info\" Row field and a \"record\" Row field.");
+ "\""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" Row field and a \"record\" Row field.");
checkArgument(
schema
.getField("cdc_info")
.getField(ROW_PROPERTY_MUTATION_INFO)
.getType()
.getRowSchema()
.equals(
Schema.builder()
.addStringField("mutation_type")
.addStringField("change_sequence_number")
.build()),
"When writing using CDC functionality, we expect a \"cdc_info\" field of Row type "
+ "with fields \"mutation_type\" and \"change_sequence_number\" of type string.");

RowDynamicDestinations destinations =
new RowDynamicDestinations(schema.getField("record").getType().getRowSchema())
.withPrimaryKey(configuration.getCdcWritesPrimaryKey());
if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
destinations = destinations.withFixedDestination(configuration.getTable());
.equals(ROW_SCHEMA_MUTATION_INFO),
"When writing using CDC functionality, we expect a \""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" field of Row type with fields \""
+ ROW_PROPERTY_MUTATION_TYPE
+ "\" and \""
+ ROW_PROPERTY_MUTATION_SQN
+ "\" both of type string.");

String tableDestination = null;

if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
validateDynamicDestinationsExpectedSchema(schema);
} else {
tableDestination = configuration.getTable();
}

return write
.to(destinations)
.to(
new CdcWritesDynamicDestination(
schema.getField("record").getType().getRowSchema(),
tableDestination,
configuration.getPrimaryKey()))
.withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")))
.withPrimaryKey(configuration.getCdcWritesPrimaryKey())
.withPrimaryKey(configuration.getPrimaryKey())
.withRowMutationInformationFn(
row ->
RowMutationInformation.of(
RowMutationInformation.MutationType.valueOf(
row.getRow("cdc_info").getString("mutation_type")),
row.getRow("cdc_info").getString("change_sequence_number")));
row.getRow(ROW_PROPERTY_MUTATION_INFO)
.getString(ROW_PROPERTY_MUTATION_TYPE)),
row.getRow(ROW_PROPERTY_MUTATION_INFO).getString(ROW_PROPERTY_MUTATION_SQN)));
}
}
}
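
As a hedged illustration of what the mutation-information mapping above computes per element (cdcRow as in the earlier sketch; UPSERT and DELETE are the mutation types BigQuery CDC distinguishes):

// Sketch only: the function passed to withRowMutationInformationFn, applied by hand.
Row info = cdcRow.getRow("row_mutation_info");
RowMutationInformation mutation =
    RowMutationInformation.of(
        // the string must name a RowMutationInformation.MutationType constant, e.g. UPSERT or DELETE
        RowMutationInformation.MutationType.valueOf(info.getString("mutation_type")),
        info.getString("change_sequence_number"));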
@@ -225,13 +225,13 @@ public void testWriteToDynamicDestinations() throws Exception {
   }
 
   List<Row> createCDCUpsertRows(List<Row> rows, boolean dynamicDestination, String tablePrefix) {
-    Schema cdcInfoSchema =
-        Schema.builder()
-            .addStringField("mutation_type")
-            .addStringField("change_sequence_number")
-            .build();
-
     Schema.Builder schemaBuilder =
-        Schema.builder().addRowField("record", SCHEMA).addRowField("cdc_info", cdcInfoSchema);
+        Schema.builder()
+            .addRowField("record", SCHEMA)
+            .addRowField(
+                BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_INFO,
+                BigQueryStorageWriteApiSchemaTransformProvider.ROW_SCHEMA_MUTATION_INFO);
 
     if (dynamicDestination) {
       schemaBuilder = schemaBuilder.addStringField("destination");
@@ -245,10 +245,18 @@ List<Row> createCDCUpsertRows(List<Row> rows, boolean dynamicDestination, String tablePrefix) {
       Row.FieldValueBuilder rowBuilder =
           Row.withSchema(schemaWithCDC)
               .withFieldValue(
-                  "cdc_info",
-                  Row.withSchema(cdcInfoSchema)
-                      .withFieldValue("mutation_type", "UPSERT")
-                      .withFieldValue("change_sequence_number", "AAA" + idx)
+                  BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_INFO,
+                  Row.withSchema(
+                          BigQueryStorageWriteApiSchemaTransformProvider
+                              .ROW_SCHEMA_MUTATION_INFO)
+                      .withFieldValue(
+                          BigQueryStorageWriteApiSchemaTransformProvider
+                              .ROW_PROPERTY_MUTATION_TYPE,
+                          "UPSERT")
+                      .withFieldValue(
+                          BigQueryStorageWriteApiSchemaTransformProvider
+                              .ROW_PROPERTY_MUTATION_SQN,
+                          "AAA" + idx)
                       .build())
               .withFieldValue("record", row);
       if (dynamicDestination) {
@@ -270,7 +278,7 @@ public void testCDCWrites() throws Exception {
             .setUseAtLeastOnceSemantics(true)
             .setTable(tableSpec)
             .setUseCdcWrites(true)
-            .setCdcWritesPrimaryKey(primaryKeyColumns)
+            .setPrimaryKey(primaryKeyColumns)
             .build();
 
     List<Row> rowsDuplicated =
@@ -299,9 +307,10 @@ public void testCDCWriteToDynamicDestinations() throws Exception {
     String dynamic = BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS;
     BigQueryStorageWriteApiSchemaTransformConfiguration config =
         BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+            .setUseAtLeastOnceSemantics(true)
            .setTable(dynamic)
            .setUseCdcWrites(true)
-            .setCdcWritesPrimaryKey(primaryKeyColumns)
+            .setPrimaryKey(primaryKeyColumns)
            .build();
 
     String baseTableSpec = "project:dataset.cdc_dynamic_write_";
33 changes: 0 additions & 33 deletions sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -284,39 +284,6 @@ def test_write_with_beam_rows_cdc(self):
             cdc_writes_primary_key=["name"]))
     hamcrest_assert(p, bq_matcher)
 
-  def test_write_dicts_cdc(self):
-    table = 'write_dicts_cdc'
-    table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
-
-    expected_data_on_bq = [
-        # (name, value)
-        {
-            "name": "cdc_test",
-            "value": 5,
-        }
-    ]
-
-    schema = {
-        "fields": [{
-            "name": "name", "type": "STRING"
-        }, {
-            "name": "value", "type": "INTEGER"
-        }]
-    }
-
-    dicts = [{
-        "name": "cdc_test", "value": 3
-    }, {
-        "name": "cdc_test", "value": 5
-    }, {
-        "name": "cdc_test", "value": 4
-    }]
-
-    bq_matcher = BigqueryFullResultMatcher(
-        project=self.project,
-        query="SELECT * FROM {}.{}".format(self.dataset_id, table),
-        data=self.parse_expected_data(expected_data_on_bq))
-
     with beam.Pipeline(argv=self.args) as p:
       _ = (
           p