BigQuery: Decouple clustering from time partitioning when writing (#30094)

* Decouple clustering from time partitioning when writing

* Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java

Co-authored-by: Michel Davit <michel@davit.fr>

* Refactor the tests, remove redundant input validations and reuse existing variables

* Add entry to CHANGES.md

* Reference the PR rather than the issue in CHANGES.md

---------

Co-authored-by: Michel Davit <michel@davit.fr>
GStravinsky and RustedBones authored Jan 30, 2024
1 parent b9fd39c commit 22fefeb
Showing 6 changed files with 62 additions and 50 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
@@ -65,7 +65,8 @@

## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Enrichment Transform](https://s.apache.org/enrichment-transform) along with GCP BigTable handler added to Python SDK ([#30001](https://github.com/apache/beam/pull/30001)).
* Allow writing clustered and not time partitioned BigQuery tables (Java) ([#30094](https://github.com/apache/beam/pull/30094)).

## Breaking Changes

@@ -96,7 +96,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.ExtractResult;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningClusteringDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
@@ -2744,8 +2744,7 @@ public Write<T> withJsonTimePartitioning(ValueProvider<String> partitioning) {
}

/**
* Specifies the clustering fields to use when writing to a single output table. Can only be
* used when {@link#withTimePartitioning(TimePartitioning)} is set. If {@link
* Specifies the clustering fields to use when writing to a single output table. If {@link
* #to(SerializableFunction)} or {@link #to(DynamicDestinations)} is used to write to dynamic
* tables, the fields here will be ignored; call {@link #withClustering()} instead.
*/
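
With that restriction removed, a single-table write can ask for clustering on its own. A minimal usage sketch, assuming a hypothetical project, dataset, table and schema (none of these identifiers come from the patch):

```java
import java.util.Arrays;

import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

class ClusteredWriteExample {
  // Writes rows to a clustered, non-time-partitioned table; identifiers are illustrative only.
  static WriteResult writeClusteredOnly(PCollection<TableRow> rows, TableSchema schema) {
    return rows.apply(
        "WriteClusteredOnly",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.clustered_table")
            .withSchema(schema)
            // No withTimePartitioning(...) call is required any more.
            .withClustering(new Clustering().setFields(Arrays.asList("date", "number")))
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
  }
}
```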
@@ -3357,9 +3356,10 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
}

// Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
if (getJsonTimePartitioning() != null) {
if (getJsonTimePartitioning() != null
|| Optional.ofNullable(getClustering()).map(Clustering::getFields).isPresent()) {
dynamicDestinations =
new ConstantTimePartitioningDestinations<>(
new ConstantTimePartitioningClusteringDestinations<>(
(DynamicDestinations<T, TableDestination>) dynamicDestinations,
getJsonTimePartitioning(),
StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering())));
@@ -179,11 +179,13 @@ private static void tryCreateTable(
TimePartitioning timePartitioning = tableDestination.getTimePartitioning();
if (timePartitioning != null) {
table.setTimePartitioning(timePartitioning);
Clustering clustering = tableDestination.getClustering();
if (clustering != null) {
table.setClustering(clustering);
}
}

Clustering clustering = tableDestination.getClustering();
if (clustering != null) {
table.setClustering(clustering);
}

if (kmsKey != null) {
table.setEncryptionConfiguration(new EncryptionConfiguration().setKmsKeyName(kmsKey));
}
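
Pulled out of the partitioning branch, clustering can now be attached to the created table on its own. A rough sketch of such a table definition, using the com.google.api.services.bigquery.model types and made-up identifiers:

```java
import java.util.Arrays;

import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;

class ClusteredTableSketch {
  // A table definition with clustering but deliberately no time partitioning.
  static Table clusteredOnlyTable(TableSchema schema) {
    return new Table()
        .setTableReference(
            new TableReference()
                .setProjectId("my-project")
                .setDatasetId("my_dataset")
                .setTableId("clustered_table"))
        .setSchema(schema)
        .setClustering(new Clustering().setFields(Arrays.asList("date")));
  }
}
```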
@@ -272,37 +272,41 @@ public String toString() {
}
}

static class ConstantTimePartitioningDestinations<T>
static class ConstantTimePartitioningClusteringDestinations<T>
extends DelegatingDynamicDestinations<T, TableDestination> {

private final ValueProvider<String> jsonTimePartitioning;
private final @Nullable ValueProvider<String> jsonTimePartitioning;
private final @Nullable ValueProvider<String> jsonClustering;

ConstantTimePartitioningDestinations(
ConstantTimePartitioningClusteringDestinations(
DynamicDestinations<T, TableDestination> inner,
ValueProvider<String> jsonTimePartitioning,
ValueProvider<String> jsonClustering) {
super(inner);
Preconditions.checkArgumentNotNull(
jsonTimePartitioning, "jsonTimePartitioning provider can not be null");
if (jsonTimePartitioning.isAccessible()) {
Preconditions.checkArgumentNotNull(
jsonTimePartitioning.get(), "jsonTimePartitioning can not be null");
}

checkArgument(
(jsonTimePartitioning != null
&& jsonTimePartitioning.isAccessible()
&& jsonTimePartitioning.get() != null)
|| (jsonClustering != null
&& jsonClustering.isAccessible()
&& jsonClustering.get() != null),
"at least one of jsonTimePartitioning or jsonClustering must be non-null, accessible "
+ "and present");

this.jsonTimePartitioning = jsonTimePartitioning;
this.jsonClustering = jsonClustering;
}

@Override
public TableDestination getDestination(@Nullable ValueInSingleWindow<T> element) {
TableDestination destination = super.getDestination(element);
String partitioning = this.jsonTimePartitioning.get();
checkArgument(partitioning != null, "jsonTimePartitioning can not be null");
String partitioning =
Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null);
String clustering = Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null);

return new TableDestination(
destination.getTableSpec(),
destination.getTableDescription(),
partitioning,
Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null));
destination.getTableSpec(), destination.getTableDescription(), partitioning, clustering);
}

@Override
@@ -316,10 +320,10 @@ public Coder<TableDestination> getDestinationCoder() {

@Override
public String toString() {
MoreObjects.ToStringHelper helper =
MoreObjects.toStringHelper(this)
.add("inner", inner)
.add("jsonTimePartitioning", jsonTimePartitioning);
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this).add("inner", inner);
if (jsonTimePartitioning != null) {
helper.add("jsonTimePartitioning", jsonTimePartitioning);
}
if (jsonClustering != null) {
helper.add("jsonClustering", jsonClustering);
}
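
For illustration only, wiring the renamed wrapper with a clustering spec and no time partitioning could look like the sketch below. The helper name and the JSON literal are made up, and the class is package-private, so this only compiles from within org.apache.beam.sdk.io.gcp.bigquery:

```java
// Sketch: wrap an existing DynamicDestinations with constant clustering only.
static DynamicDestinations<TableRow, TableDestination> withConstantClustering(
    DynamicDestinations<TableRow, TableDestination> inner) {
  return new DynamicDestinationsHelpers.ConstantTimePartitioningClusteringDestinations<>(
      inner,
      /* jsonTimePartitioning= */ null, // now allowed, as long as clustering is given
      StaticValueProvider.of("{\"fields\":[\"date\"]}"));
}
```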
@@ -288,11 +288,12 @@ private BigQueryHelpers.PendingJob startZeroLoadJob(
}
if (timePartitioning != null) {
loadConfig.setTimePartitioning(timePartitioning);
// only set clustering if timePartitioning is set
if (clustering != null) {
loadConfig.setClustering(clustering);
}
}

if (clustering != null) {
loadConfig.setClustering(clustering);
}

if (kmsKey != null) {
loadConfig.setDestinationEncryptionConfiguration(
new EncryptionConfiguration().setKmsKeyName(kmsKey));
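
The load-job path gets the same decoupling. A minimal sketch of a load configuration that clusters but does not partition, assuming the com.google.api.services.bigquery.model API and hypothetical names:

```java
import java.util.Arrays;

import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;

class ClusteredLoadConfigSketch {
  // A load configuration that sets clustering without any timePartitioning.
  static JobConfigurationLoad clusteredOnlyLoad(TableReference destination, TableSchema schema) {
    return new JobConfigurationLoad()
        .setDestinationTable(destination)
        .setSchema(schema)
        .setWriteDisposition("WRITE_APPEND")
        .setClustering(new Clustering().setFields(Arrays.asList("date")));
  }
}
```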
@@ -500,7 +500,7 @@ private void verifySideInputs() {
}
}

void testTimePartitioningClustering(
void testTimePartitioningAndClustering(
BigQueryIO.Write.Method insertMethod, boolean enablePartitioning, boolean enableClustering)
throws Exception {
TableRow row1 = new TableRow().set("date", "2018-01-01").set("number", "1");
@@ -545,16 +545,8 @@ void testTimePartitioningClustering(
}
}

void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Exception {
testTimePartitioningClustering(insertMethod, true, false);
}

void testClustering(BigQueryIO.Write.Method insertMethod) throws Exception {
testTimePartitioningClustering(insertMethod, true, true);
}

@Test
public void testTimePartitioning() throws Exception {
void testTimePartitioningAndClusteringWithAllMethods(
Boolean enablePartitioning, Boolean enableClustering) throws Exception {
BigQueryIO.Write.Method method;
if (useStorageApi) {
method =
@@ -564,15 +556,27 @@ public void testTimePartitioning() throws Exception {
} else {
method = Method.FILE_LOADS;
}
testTimePartitioning(method);
testTimePartitioningAndClustering(method, enablePartitioning, enableClustering);
}

@Test
public void testClusteringStorageApi() throws Exception {
if (useStorageApi) {
testClustering(
useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API);
}
public void testTimePartitioningWithoutClustering() throws Exception {
testTimePartitioningAndClusteringWithAllMethods(true, false);
}

@Test
public void testTimePartitioningWithClustering() throws Exception {
testTimePartitioningAndClusteringWithAllMethods(true, true);
}

@Test
public void testClusteringWithoutPartitioning() throws Exception {
testTimePartitioningAndClusteringWithAllMethods(false, true);
}

@Test
public void testNoClusteringNoPartitioning() throws Exception {
testTimePartitioningAndClusteringWithAllMethods(false, false);
}

@Test
Expand Down
