
Enable BigQuery CDC configuration for Python BigQuery sink #32529

Merged · 40 commits · Oct 15, 2024

Commits (40)
745cd58
include CDC configuration on the storage write transform provider
prodriguezdefino Sep 20, 2024
504d2a4
adding the primary key configuration for CDC and tests
prodriguezdefino Sep 20, 2024
a2eabf9
fixing List.of references to use ImmutableList
prodriguezdefino Sep 20, 2024
01703c1
fixing test, missing calling the cdc info row builder() method
prodriguezdefino Sep 20, 2024
51360d5
fix test, add config validations
prodriguezdefino Sep 21, 2024
542a49e
added the xlang params to storage write python wrapper
prodriguezdefino Sep 21, 2024
fe36fe8
adding missing comma
prodriguezdefino Sep 21, 2024
1004f91
shortening property name
prodriguezdefino Sep 21, 2024
de9e948
changing xlang config property
prodriguezdefino Sep 21, 2024
ab40dd9
set use cdc schema property as nullable, added safe retrieval method
prodriguezdefino Sep 21, 2024
27e5634
fixes property name reference and argument type definition
prodriguezdefino Sep 22, 2024
e804618
python format fix
prodriguezdefino Sep 22, 2024
7b7255b
adding xlang IT with BQ
prodriguezdefino Sep 22, 2024
2b901eb
adding missing primary key column to test
prodriguezdefino Sep 22, 2024
bb16979
python format fix
prodriguezdefino Sep 22, 2024
12dce1b
format xlang test
prodriguezdefino Sep 22, 2024
27a86db
more format xlang test fixes
prodriguezdefino Sep 22, 2024
e5766e7
and more format xlang test fixes
prodriguezdefino Sep 22, 2024
09f298a
adding missing import
prodriguezdefino Sep 22, 2024
e972c89
missing self reference
prodriguezdefino Sep 22, 2024
ec3373b
enabled create if needed functionality for CDC python integration, im…
prodriguezdefino Sep 24, 2024
23ea4c3
Update bigquery.py
prodriguezdefino Sep 24, 2024
143566a
triggering the xlang tests
prodriguezdefino Sep 24, 2024
4e390c7
fixing lint
prodriguezdefino Sep 24, 2024
51bae40
addressing few comments
prodriguezdefino Sep 26, 2024
6331691
cdc info is added after row transformation now
prodriguezdefino Sep 27, 2024
56fe2b6
remove not used param
prodriguezdefino Sep 27, 2024
ce85c88
Merge branch 'master' into python_xlang_cdc_bigquery
prodriguezdefino Sep 27, 2024
bd324c5
removed typing information for callable
prodriguezdefino Sep 27, 2024
3e5e31c
adding test for cdc using dicts as input and cdc write callable
prodriguezdefino Sep 27, 2024
0073348
simplifying the xlang configuration from python perspective, will add…
prodriguezdefino Oct 2, 2024
9dd4cfa
spotless apply
prodriguezdefino Oct 2, 2024
4208517
wrong property passed to xlang builder
prodriguezdefino Oct 2, 2024
cd7b6cd
missing self
prodriguezdefino Oct 2, 2024
72d1bf4
fixing xlang it
prodriguezdefino Oct 2, 2024
1487acc
fixes wrong property reference
prodriguezdefino Oct 2, 2024
96882c5
change cdc xlang test to use beam.io.WriteToBigQuery
prodriguezdefino Oct 3, 2024
538ef3c
force another build
prodriguezdefino Oct 3, 2024
1774f8f
modifying comment to trigger build.
prodriguezdefino Oct 4, 2024
1d99fc6
addressing PR comments, included new dicts based test for xlang pytho…
prodriguezdefino Oct 14, 2024
@@ -1,4 +1,5 @@
 
 {
-  "comment": "Modify this file in a trivial way to cause this test suite to run"
+  "comment": "Modify this file in a trivial way to cause this test suite to run",
+  "modification": 1
 }
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
+  "modification": 2
 }
@@ -2276,6 +2276,7 @@ public static Write<RowMutation> applyRowMutations() {
.withFormatFunction(RowMutation::getTableRow)
.withRowMutationInformationFn(RowMutation::getMutationInformation);
}

/**
* A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord
* GenericRecords} to a BigQuery table.
@@ -2384,8 +2385,10 @@ public enum Method {
abstract WriteDisposition getWriteDisposition();

abstract Set<SchemaUpdateOption> getSchemaUpdateOptions();

/** Table description. Default is empty. */
abstract @Nullable String getTableDescription();

/** An option to indicate if table validation is desired. Default is true. */
abstract boolean getValidate();

@@ -3472,7 +3475,10 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition.");
}
if (getRowMutationInformationFn() != null) {
checkArgument(getMethod() == Method.STORAGE_API_AT_LEAST_ONCE);
checkArgument(
getMethod() == Method.STORAGE_API_AT_LEAST_ONCE,
"When using row updates on BigQuery, StorageWrite API should execute using"
+ " \"at least once\" mode.");
checkArgument(
getCreateDisposition() == CreateDisposition.CREATE_NEVER || getPrimaryKey() != null,
"If specifying CREATE_IF_NEEDED along with row updates, a primary key needs to be specified");
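The two checkArgument calls in this hunk boil down to a pair of preconditions on the write configuration. A minimal Python sketch of those rules (hypothetical validate_cdc_write helper, not part of the Beam API):

```python
# Sketch of the CDC validation rules added above (hypothetical helper):
# row-mutation (CDC) writes must run the Storage Write API in
# at-least-once mode, and CREATE_IF_NEEDED additionally requires a
# non-empty primary key.
def validate_cdc_write(method, create_disposition, primary_key):
    if method != "STORAGE_API_AT_LEAST_ONCE":
        raise ValueError(
            "When using row updates on BigQuery, StorageWrite API should "
            'execute using "at least once" mode.')
    if create_disposition == "CREATE_IF_NEEDED" and not primary_key:
        raise ValueError(
            "If specifying CREATE_IF_NEEDED along with row updates, "
            "a primary key needs to be specified")
```

Note that CREATE_NEVER needs no primary key here: the table already exists, so its constraints are taken as-is.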
@@ -20,13 +20,15 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
@@ -37,6 +39,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
@@ -62,6 +65,7 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;

@@ -257,6 +261,20 @@ public static Builder builder() {
@Nullable
public abstract ErrorHandling getErrorHandling();

@SchemaFieldDescription(
"This option enables the use of BigQuery CDC functionality. The expected PCollection"
+ " should contain Beam Rows with a schema wrapping the record to be inserted and"
+ " adding the CDC info similar to: {cdc_info: {mutation_type:\"...\", "
+ "change_sequence_number:\"...\"}, record: {...}}")
@Nullable
public abstract Boolean getUseCdcWrites();

@SchemaFieldDescription(
"In the case of using CDC writes and setting CREATE_IF_NEEDED mode for the tables"
+ " a primary key is required.")
@Nullable
public abstract List<String> getCdcWritesPrimaryKey();

/** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
@@ -277,6 +295,10 @@ public abstract static class Builder {

public abstract Builder setErrorHandling(ErrorHandling errorHandling);

public abstract Builder setUseCdcWrites(Boolean cdcWrites);

public abstract Builder setCdcWritesPrimaryKey(List<String> pkColumns);

/** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
public abstract BigQueryStorageWriteApiSchemaTransformProvider
.BigQueryStorageWriteApiSchemaTransformConfiguration
@@ -344,14 +366,39 @@ public void process(ProcessContext c) {}

private static class RowDynamicDestinations extends DynamicDestinations<Row, String> {
Schema schema;
String fixedDestination = null;
List<String> primaryKey = null;

RowDynamicDestinations(Schema schema) {
this.schema = schema;
}

RowDynamicDestinations withFixedDestination(String destination) {
this.fixedDestination = destination;
return this;
}

RowDynamicDestinations withPrimaryKey(List<String> primaryKey) {
this.primaryKey = primaryKey;
return this;
}

@Override
public String getDestination(ValueInSingleWindow<Row> element) {
return element.getValue().getString("destination");
return fixedDestination != null
? fixedDestination
: element.getValue().getString("destination");
}

@Override
public TableConstraints getTableConstraints(String destination) {
return Optional.ofNullable(this.primaryKey)
.filter(pk -> !pk.isEmpty())
.map(
pk ->
new TableConstraints()
.setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk)))
.orElse(null);
}

@Override
@@ -468,13 +515,15 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {

if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
checkArgument(
schema.getFieldNames().equals(Arrays.asList("destination", "record")),
schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
"When writing to dynamic destinations, we expect Row Schema with a "
+ "\"destination\" string field and a \"record\" Row field.");
write =
write
.to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))
Review comment (Contributor): Can we just instantiate new RowDynamicDestinations(<row schema>, <primary key>) here? And avoid instantiating it again in validateAndIncludeCDCInformation?
.withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")));
} else if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) {
write = validateAndIncludeCDCInformation(write, schema);
Review comment (Contributor): Can we make this a separate if block, outside of this if/else chain? We should be able to apply this method to both dynamic destination and single table cases. The only factor should be whether or not useCdcWrites is true.
} else {
write = write.to(configuration.getTable()).useBeamSchema();
}
@@ -498,5 +547,60 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {

return write;
}

BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
BigQueryIO.Write<Row> write, Schema schema) {
if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
checkArgument(
BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS
.get(configuration.getCreateDisposition().toUpperCase())
.equals(CreateDisposition.CREATE_IF_NEEDED)
&& !Optional.ofNullable(configuration.getCdcWritesPrimaryKey())
.orElse(ImmutableList.of())
.isEmpty(),
"When using CDC writes into BigQuery, alongside with CREATE_IF_NEEDED mode,"
+ " a primary key should be provided.");
}
if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
checkArgument(
schema.getFieldNames().contains("destination"),
"When writing to dynamic destinations, we expect Row Schema with a "
+ "\"destination\" string field.");
}
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList("cdc_info", "record")),
"When writing using CDC functionality, we expect Row Schema with a "
+ "\"cdc_info\" Row field and a \"record\" Row field.");
checkArgument(
schema
.getField("cdc_info")
.getType()
.getRowSchema()
.equals(
Schema.builder()
.addStringField("mutation_type")
.addStringField("change_sequence_number")
.build()),
"When writing using CDC functionality, we expect a \"cdc_info\" field of Row type "
+ "with fields \"mutation_type\" and \"change_sequence_number\" of type string.");

RowDynamicDestinations destinations =
new RowDynamicDestinations(schema.getField("record").getType().getRowSchema())
.withPrimaryKey(configuration.getCdcWritesPrimaryKey());
if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
destinations = destinations.withFixedDestination(configuration.getTable());
}
Review comment (Contributor): Can we remove this duplicated code? (see previous comment)

return write
.to(destinations)
.withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")))
Review comment (Contributor): Also a duplicated line that we should remove.
.withPrimaryKey(configuration.getCdcWritesPrimaryKey())
.withRowMutationInformationFn(
row ->
RowMutationInformation.of(
RowMutationInformation.MutationType.valueOf(
row.getRow("cdc_info").getString("mutation_type")),
row.getRow("cdc_info").getString("change_sequence_number")));
}
}
}
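Taken together, the schema checks and the withRowMutationInformationFn mapping above fix the element shape the Python wrapper must produce. A sketch of that shape with plain dicts (field names come from the checks above; everything else is illustrative):

```python
# Element shape expected for CDC writes, per the schema checks above:
# a wrapper row with a "cdc_info" sub-row plus the payload under "record".
cdc_row = {
    "cdc_info": {
        "mutation_type": "UPSERT",
        "change_sequence_number": "AAA/1",
    },
    "record": {"id": 1, "name": "alice"},
}

def extract_mutation_info(row):
    # Mirrors the withRowMutationInformationFn lambda in the Java provider:
    # pull the mutation type and change sequence number out of cdc_info.
    info = row["cdc_info"]
    return (info["mutation_type"], info["change_sequence_number"])
```

The "record" sub-row is what actually lands in the table; cdc_info is consumed by the sink and never written.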
@@ -18,11 +18,13 @@
package org.apache.beam.sdk.io.gcp.testing;

import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableRow;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
@@ -51,12 +53,24 @@ class TableContainer {
this.keyedRows = Maps.newHashMap();
this.ids = new ArrayList<>();
this.sizeBytes = 0L;
// extract primary key information from Table if present
List<String> pkColumns = primaryKeyColumns(table);
this.primaryKeyColumns = pkColumns;
this.primaryKeyColumnIndices = primaryColumnFieldIndices(pkColumns, table);
}

// Only top-level columns supported.
void setPrimaryKeyColumns(List<String> primaryKeyColumns) {
this.primaryKeyColumns = primaryKeyColumns;
static @Nullable List<String> primaryKeyColumns(Table table) {
return Optional.ofNullable(table.getTableConstraints())
.flatMap(constraints -> Optional.ofNullable(constraints.getPrimaryKey()))
.map(TableConstraints.PrimaryKey::getColumns)
.orElse(null);
}

static @Nullable List<Integer> primaryColumnFieldIndices(
@Nullable List<String> primaryKeyColumns, Table table) {
if (primaryKeyColumns == null) {
return null;
}
Map<String, Integer> indices =
IntStream.range(0, table.getSchema().getFields().size())
.boxed()
@@ -65,7 +79,13 @@ void setPrimaryKeyColumns(List<String> primaryKeyColumns) {
for (String columnName : primaryKeyColumns) {
primaryKeyColumnIndices.add(Preconditions.checkStateNotNull(indices.get(columnName)));
}
this.primaryKeyColumnIndices = primaryKeyColumnIndices;
return primaryKeyColumnIndices;
}

// Only top-level columns supported.
void setPrimaryKeyColumns(List<String> primaryKeyColumns) {
this.primaryKeyColumns = primaryKeyColumns;
this.primaryKeyColumnIndices = primaryColumnFieldIndices(primaryKeyColumns, table);
}

@Nullable
@@ -80,7 +100,7 @@ List<Object> getPrimaryKey(TableRow tableRow) {
.stream()
.map(cell -> Preconditions.checkStateNotNull(cell.get("v")))
.collect(Collectors.toList());
;

return Preconditions.checkStateNotNull(primaryKeyColumnIndices).stream()
.map(cellValues::get)
.collect(Collectors.toList());
@@ -91,7 +111,7 @@ List<Object> getPrimaryKey(TableRow tableRow) {

long addRow(TableRow row, String id) {
List<Object> primaryKey = getPrimaryKey(row);
if (primaryKey != null) {
if (primaryKey != null && !primaryKey.isEmpty()) {
if (keyedRows.putIfAbsent(primaryKey, row) != null) {
throw new RuntimeException(
"Primary key validation error! Multiple inserts with the same primary key.");
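The TableContainer changes above do two things: extract primary-key columns null-safely from the table metadata, and make addRow reject a second insert with the same non-empty key. A plain-Python sketch (hypothetical FakeTable using dicts, not the Beam test class or the google-api-client models):

```python
# Sketch of the fake-table bookkeeping above: primary-key columns are
# read null-safely from the table constraints, and add_row rejects a
# duplicate non-empty key (mirroring keyedRows.putIfAbsent != null).
class FakeTable:
    def __init__(self, table):
        constraints = table.get("tableConstraints") or {}
        pk = constraints.get("primaryKey") or {}
        self.primary_key_columns = pk.get("columns") or []
        self.keyed_rows = {}
        self.rows = []

    def add_row(self, row):
        key = tuple(row[c] for c in self.primary_key_columns)
        if key:  # mirrors `primaryKey != null && !primaryKey.isEmpty()`
            if key in self.keyed_rows:
                raise RuntimeError(
                    "Primary key validation error! Multiple inserts with "
                    "the same primary key.")
            self.keyed_rows[key] = row
        else:
            self.rows.append(row)
```

Without constraints the key tuple is empty, so every row takes the plain-append path, matching the `!primaryKey.isEmpty()` guard added in addRow.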