Enable BigQuery CDC configuration for Python BigQuery sink #32529

Status: Open. prodriguezdefino wants to merge 36 commits into master from python_xlang_cdc_bigquery.

Commits (36, all by prodriguezdefino):
745cd58  include CDC configuration on the storage write transform provider (Sep 20, 2024)
504d2a4  adding the primary key configuration for CDC and tests (Sep 20, 2024)
a2eabf9  fixing List.of references to use ImmutableList (Sep 20, 2024)
01703c1  fixing test, missing calling the cdc info row builder() method (Sep 20, 2024)
51360d5  fix test, add config validations (Sep 21, 2024)
542a49e  added the xlang params to storage write python wrapper (Sep 21, 2024)
fe36fe8  adding missing comma (Sep 21, 2024)
1004f91  shortening property name (Sep 21, 2024)
de9e948  changing xlang config property (Sep 21, 2024)
ab40dd9  set use cdc schema property as nullable, added safe retrieval method (Sep 21, 2024)
27e5634  fixes property name reference and argument type definition (Sep 22, 2024)
e804618  python format fix (Sep 22, 2024)
7b7255b  adding xlang IT with BQ (Sep 22, 2024)
2b901eb  adding missing primary key column to test (Sep 22, 2024)
bb16979  python format fix (Sep 22, 2024)
12dce1b  format xlang test (Sep 22, 2024)
27a86db  more format xlang test fixes (Sep 22, 2024)
e5766e7  and more format xlang test fixes (Sep 22, 2024)
09f298a  adding missing import (Sep 22, 2024)
e972c89  missing self reference (Sep 22, 2024)
ec3373b  enabled create if needed functionality for CDC python integration, im… (Sep 24, 2024)
23ea4c3  Update bigquery.py (Sep 24, 2024)
143566a  triggering the xlang tests (Sep 24, 2024)
4e390c7  fixing lint (Sep 24, 2024)
51bae40  addressing few comments (Sep 26, 2024)
6331691  cdc info is added after row transformation now (Sep 27, 2024)
56fe2b6  remove not used param (Sep 27, 2024)
ce85c88  Merge branch 'master' into python_xlang_cdc_bigquery (Sep 27, 2024)
bd324c5  removed typing information for callable (Sep 27, 2024)
3e5e31c  adding test for cdc using dicts as input and cdc write callable (Sep 27, 2024)
0073348  simplifying the xlang configuration from python perspective, will add… (Oct 2, 2024)
9dd4cfa  spotless apply (Oct 2, 2024)
4208517  wrong property passed to xlang builder (Oct 2, 2024)
cd7b6cd  missing self (Oct 2, 2024)
72d1bf4  fixing xlang it (Oct 2, 2024)
1487acc  fixes wrong property reference (Oct 2, 2024)
CI trigger file (JSON):
@@ -1,4 +1,5 @@
 {
-  "comment": "Modify this file in a trivial way to cause this test suite to run"
+  "comment": "Modify this file in a trivial way to cause this test suite to run",
+  "modification": 1
 }
Second CI trigger file (JSON):
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
+  "modification": 2
 }
BigQueryIO.java:
@@ -2276,6 +2276,7 @@ public static Write<RowMutation> applyRowMutations() {
         .withFormatFunction(RowMutation::getTableRow)
         .withRowMutationInformationFn(RowMutation::getMutationInformation);
   }
+
  /**
   * A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord
   * GenericRecords} to a BigQuery table.
@@ -2384,8 +2385,10 @@ public enum Method {
    abstract WriteDisposition getWriteDisposition();

    abstract Set<SchemaUpdateOption> getSchemaUpdateOptions();
+
    /** Table description. Default is empty. */
    abstract @Nullable String getTableDescription();
+
    /** An option to indicate if table validation is desired. Default is true. */
    abstract boolean getValidate();

@@ -3472,7 +3475,10 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
        LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition.");
      }
      if (getRowMutationInformationFn() != null) {
-       checkArgument(getMethod() == Method.STORAGE_API_AT_LEAST_ONCE);
+       checkArgument(
+           getMethod() == Method.STORAGE_API_AT_LEAST_ONCE,
+           "When using row updates on BigQuery, StorageWrite API should execute using"
+               + " \"at least once\" mode.");
        checkArgument(
            getCreateDisposition() == CreateDisposition.CREATE_NEVER || getPrimaryKey() != null,
            "If specifying CREATE_IF_NEEDED along with row updates, a primary key needs to be specified");
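The two validations above pin down the contract the rest of this PR builds on: row updates must run with at-least-once semantics, and CREATE_IF_NEEDED requires a primary key. A minimal sketch of how this surfaces through the Python wrapper added here; the use_cdc_writes and primary_key parameter names are assumptions mirroring the Java configuration fields useCdcWrites and primaryKey, and the table and schema values are illustrative:

# Sketch only: use_cdc_writes and primary_key are assumed parameter names;
# check the merged wrapper for the final spelling.
from apache_beam.io.gcp.bigquery import WriteToBigQuery

write = WriteToBigQuery(
    table='project:dataset.cdc_table',
    schema='id:INTEGER,name:STRING',
    method=WriteToBigQuery.Method.STORAGE_WRITE_API,
    use_at_least_once=True,         # CDC row updates require at-least-once mode
    use_cdc_writes=True,            # routes writes through the CDC path
    primary_key=['id'],             # required when paired with CREATE_IF_NEEDED
    create_disposition='CREATE_IF_NEEDED')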
BigQueryStorageWriteApiSchemaTransformProvider.java:
@@ -20,13 +20,15 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

+import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
@@ -37,6 +39,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
+import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
@@ -87,6 +90,14 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
   private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
   // magic string that tells us to write to dynamic destinations
   protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
+  protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info";
+  protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type";
+  protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number";
+  protected static final Schema ROW_SCHEMA_MUTATION_INFO =
+      Schema.builder()
+          .addStringField("mutation_type")
+          .addStringField("change_sequence_number")
+          .build();

   @Override
   protected SchemaTransform from(
@@ -257,6 +268,20 @@ public static Builder builder() {
     @Nullable
     public abstract ErrorHandling getErrorHandling();

+    @SchemaFieldDescription(
+        "This option enables the use of BigQuery CDC functionality. The expected PCollection"
+            + " should contain Beam Rows with a schema wrapping the record to be inserted and"
+            + " adding the CDC info similar to: {row_mutation_info: {mutation_type:\"...\", "
+            + "change_sequence_number:\"...\"}, record: {...}}")
+    @Nullable
+    public abstract Boolean getUseCdcWrites();
+
+    @SchemaFieldDescription(
+        "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this"
+            + " primary key. Required when CDC writes are enabled with CREATE_IF_NEEDED.")
+    @Nullable
+    public abstract List<String> getPrimaryKey();
+
     /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
     @AutoValue.Builder
     public abstract static class Builder {
@@ -277,6 +302,10 @@ public abstract static class Builder {

       public abstract Builder setErrorHandling(ErrorHandling errorHandling);

+      public abstract Builder setUseCdcWrites(Boolean cdcWrites);
+
+      public abstract Builder setPrimaryKey(List<String> pkColumns);
+
       /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
       public abstract BigQueryStorageWriteApiSchemaTransformProvider
           .BigQueryStorageWriteApiSchemaTransformConfiguration
@@ -365,6 +394,34 @@ public TableSchema getSchema(String destination) {
     }
   }

+  private static class CdcWritesDynamicDestination extends RowDynamicDestinations {
+    final String fixedDestination;
+    final List<String> primaryKey;
+
+    public CdcWritesDynamicDestination(
+        Schema schema, String fixedDestination, List<String> primaryKey) {
+      super(schema);
+      this.fixedDestination = fixedDestination;
+      this.primaryKey = primaryKey;
+    }
+
+    @Override
+    public String getDestination(ValueInSingleWindow<Row> element) {
+      return Optional.ofNullable(fixedDestination).orElseGet(() -> super.getDestination(element));
+    }
+
+    @Override
+    public TableConstraints getTableConstraints(String destination) {
+      return Optional.ofNullable(this.primaryKey)
+          .filter(pk -> !pk.isEmpty())
+          .map(
+              pk ->
+                  new TableConstraints()
+                      .setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk)))
+          .orElse(null);
Comment on lines +413 to +421

Contributor: Note that BigQueryIO already attaches TableConstraints to the input destinations object:

if (getPrimaryKey() != null) {
  dynamicDestinations =
      new DynamicDestinationsHelpers.ConstantTableConstraintsDestinations<>(
          (DynamicDestinations<T, TableDestination>) dynamicDestinations,
          new TableConstraints()
              .setPrimaryKey(
                  new TableConstraints.PrimaryKey().setColumns(getPrimaryKey())));
}

To avoid duplicate logic, can we rely on the above and do away with the changes to RowDynamicDestinations?

Author: The PTransform instantiated by the provider does not execute this code; that is why I modified the row dynamic destinations. I also considered using a delegating dynamic destination, but sadly that type has package-private access, so I had only three options: introduce a larger change in the provider to use the BigQueryIO expansion directly, change the visibility of the delegating dynamic destination, or subclass or modify the row dynamic destinations. The implemented option seemed the least disruptive from the framework's perspective.

Author (prodriguezdefino, Sep 27, 2024): @ahmedabu98 I kept the modifications on RowDynamicDestinations given the access constraints on the delegating dynamic destination, and because the shared check only applies when a dynamic destination is not passed into BigQueryIO. Let me know if you want me to make more extensive changes to include the expand logic for BigQueryIO.

Contributor:
> the PTransform instantiated by the provider does not execute this code
Ahh, the code block is executed only when dynamic destinations == null. Agreed, a lil unfortunate, but that's okay; passing the primary key to RowDynamicDestinations and implementing getTableConstraints() should be enough.
+    }
+  }
+
   @Override
   public PCollectionRowTuple expand(PCollectionRowTuple input) {
     // Check that the input exists
@@ -453,6 +510,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       }
     }

+    void validateDynamicDestinationsExpectedSchema(Schema schema) {
+      checkArgument(
+          schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
+          "When writing to dynamic destinations, we expect Row Schema with a "
+              + "\"destination\" string field and a \"record\" Row field.");
+    }
+
     BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
       Method writeMethod =
           configuration.getUseAtLeastOnceSemantics() != null
@@ -466,11 +530,11 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
              .withFormatFunction(BigQueryUtils.toTableRow())
              .withWriteDisposition(WriteDisposition.WRITE_APPEND);

-     if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
-       checkArgument(
-           schema.getFieldNames().equals(Arrays.asList("destination", "record")),
-           "When writing to dynamic destinations, we expect Row Schema with a "
-               + "\"destination\" string field and a \"record\" Row field.");
+     // in case CDC writes are configured we validate and include them in the configuration
+     if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) {
+       write = validateAndIncludeCDCInformation(write, schema);
+     } else if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
+       validateDynamicDestinationsExpectedSchema(schema);
        write =
            write
                .to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))
Contributor: Can we just instantiate new RowDynamicDestinations(<row schema>, <primary key>) here, and avoid instantiating it again in validateAndIncludeCDCInformation?

@@ -485,6 +549,7 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
              configuration.getCreateDisposition().toUpperCase());
      write = write.withCreateDisposition(createDisposition);
    }
+
    if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
      WriteDisposition writeDisposition =
          BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(
@@ -498,5 +563,52 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {

    return write;
  }
+
+  BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
+      BigQueryIO.Write<Row> write, Schema schema) {
+    checkArgument(
+        schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, "record")),
+        "When writing using CDC functionality, we expect Row Schema with a "
+            + "\""
+            + ROW_PROPERTY_MUTATION_INFO
+            + "\" Row field and a \"record\" Row field.");
+    checkArgument(
+        schema
+            .getField(ROW_PROPERTY_MUTATION_INFO)
+            .getType()
+            .getRowSchema()
+            .equals(ROW_SCHEMA_MUTATION_INFO),
+        "When writing using CDC functionality, we expect a \""
+            + ROW_PROPERTY_MUTATION_INFO
+            + "\" field of Row type with fields \""
+            + ROW_PROPERTY_MUTATION_TYPE
+            + "\" and \""
+            + ROW_PROPERTY_MUTATION_SQN
+            + "\" both of type string.");
+
+    String tableDestination = null;
+
+    if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
+      validateDynamicDestinationsExpectedSchema(schema);
+    } else {
+      tableDestination = configuration.getTable();
+    }
+
+    return write
+        .to(
+            new CdcWritesDynamicDestination(
+                schema.getField("record").getType().getRowSchema(),
+                tableDestination,
+                configuration.getPrimaryKey()))
+        .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")))
+        .withPrimaryKey(configuration.getPrimaryKey())
+        .withRowMutationInformationFn(
+            row ->
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.valueOf(
+                        row.getRow(ROW_PROPERTY_MUTATION_INFO)
+                            .getString(ROW_PROPERTY_MUTATION_TYPE)),
+                    row.getRow(ROW_PROPERTY_MUTATION_INFO).getString(ROW_PROPERTY_MUTATION_SQN)));
+  }
 }
}
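To make the expected element shape concrete, here is a sketch of CDC input from the Python side, matching the {row_mutation_info: {mutation_type, change_sequence_number}, record: {...}} schema that validateAndIncludeCDCInformation enforces. The UPSERT and DELETE values come from RowMutationInformation.MutationType; the field values, and the reuse of the "write" transform sketched earlier, are illustrative:

import apache_beam as beam

# Each element carries the payload under "record" and the CDC metadata under
# "row_mutation_info"; BigQuery applies changes in change_sequence_number order.
elements = [
    {'row_mutation_info': {'mutation_type': 'UPSERT',
                           'change_sequence_number': 'AAA/2'},
     'record': {'id': 1, 'name': 'alice'}},
    {'row_mutation_info': {'mutation_type': 'DELETE',
                           'change_sequence_number': 'AAA/3'},
     'record': {'id': 1, 'name': 'alice'}},
]

with beam.Pipeline() as p:
    _ = p | beam.Create(elements) | write  # the transform sketched above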
TableContainer.java:
@@ -18,11 +18,13 @@
 package org.apache.beam.sdk.io.gcp.testing;

 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableRow;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
@@ -51,12 +53,24 @@ class TableContainer {
     this.keyedRows = Maps.newHashMap();
     this.ids = new ArrayList<>();
     this.sizeBytes = 0L;
+    // extract primary key information from Table if present
+    List<String> pkColumns = primaryKeyColumns(table);
+    this.primaryKeyColumns = pkColumns;
+    this.primaryKeyColumnIndices = primaryColumnFieldIndices(pkColumns, table);
   }

-  // Only top-level columns supported.
-  void setPrimaryKeyColumns(List<String> primaryKeyColumns) {
-    this.primaryKeyColumns = primaryKeyColumns;
+  static @Nullable List<String> primaryKeyColumns(Table table) {
+    return Optional.ofNullable(table.getTableConstraints())
+        .flatMap(constraints -> Optional.ofNullable(constraints.getPrimaryKey()))
+        .map(TableConstraints.PrimaryKey::getColumns)
+        .orElse(null);
+  }
+
+  static @Nullable List<Integer> primaryColumnFieldIndices(
+      @Nullable List<String> primaryKeyColumns, Table table) {
+    if (primaryKeyColumns == null) {
+      return null;
+    }
     Map<String, Integer> indices =
         IntStream.range(0, table.getSchema().getFields().size())
             .boxed()
@@ -65,7 +79,13 @@ void setPrimaryKeyColumns(List<String> primaryKeyColumns) {
     for (String columnName : primaryKeyColumns) {
       primaryKeyColumnIndices.add(Preconditions.checkStateNotNull(indices.get(columnName)));
     }
-    this.primaryKeyColumnIndices = primaryKeyColumnIndices;
+    return primaryKeyColumnIndices;
+  }
+
+  // Only top-level columns supported.
+  void setPrimaryKeyColumns(List<String> primaryKeyColumns) {
+    this.primaryKeyColumns = primaryKeyColumns;
+    this.primaryKeyColumnIndices = primaryColumnFieldIndices(primaryKeyColumns, table);
   }

   @Nullable
@@ -80,7 +100,7 @@ List<Object> getPrimaryKey(TableRow tableRow) {
             .stream()
             .map(cell -> Preconditions.checkStateNotNull(cell.get("v")))
             .collect(Collectors.toList());
-    ;
+
     return Preconditions.checkStateNotNull(primaryKeyColumnIndices).stream()
         .map(cellValues::get)
         .collect(Collectors.toList());
@@ -91,7 +111,7 @@ List<Object> getPrimaryKey(TableRow tableRow) {

   long addRow(TableRow row, String id) {
     List<Object> primaryKey = getPrimaryKey(row);
-    if (primaryKey != null) {
+    if (primaryKey != null && !primaryKey.isEmpty()) {
       if (keyedRows.putIfAbsent(primaryKey, row) != null) {
         throw new RuntimeException(
             "Primary key validation error! Multiple inserts with the same primary key.");