From 22fefebacabad865ead0f09d75bd0ab672d44cbb Mon Sep 17 00:00:00 2001 From: GStravinsky <52568826+GStravinsky@users.noreply.github.com> Date: Tue, 30 Jan 2024 22:35:53 +0100 Subject: [PATCH] BigQuery: Decouple clustering from time partitioning when writing (#30094) * Decouple clustering from time partitioning when writing * Update sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java Co-authored-by: Michel Davit * Refactor the tests, remove redundant input validations and reuse existing variables * add to CHANGES.md * add PR and not issue in CHANGES.md --------- Co-authored-by: Michel Davit --- CHANGES.md | 3 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 10 ++--- .../io/gcp/bigquery/CreateTableHelpers.java | 10 +++-- .../bigquery/DynamicDestinationsHelpers.java | 42 ++++++++++--------- .../gcp/bigquery/UpdateSchemaDestination.java | 9 ++-- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 38 +++++++++-------- 6 files changed, 62 insertions(+), 50 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 2d9c249bf6b3..c63464a6e01e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,7 +65,8 @@ ## New Features / Improvements -* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* [Enrichment Transform](https://s.apache.org/enrichment-transform) along with GCP BigTable handler added to Python SDK ([#30001](https://github.com/apache/beam/pull/30001)). +* Allow writing clustered BigQuery tables that are not time-partitioned (Java) ([#30094](https://github.com/apache/beam/pull/30094)). 
## Breaking Changes diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3d38ef0e83ff..cd62c5810d81 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -96,7 +96,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.ExtractResult; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations; -import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningClusteringDestinations; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations; import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation; @@ -2744,8 +2744,7 @@ public Write withJsonTimePartitioning(ValueProvider partitioning) { } /** - * Specifies the clustering fields to use when writing to a single output table. Can only be - * used when {@link#withTimePartitioning(TimePartitioning)} is set. If {@link + * Specifies the clustering fields to use when writing to a single output table. If {@link * #to(SerializableFunction)} or {@link #to(DynamicDestinations)} is used to write to dynamic * tables, the fields here will be ignored; call {@link #withClustering()} instead. */ @@ -3357,9 +3356,10 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) { } // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning. 
- if (getJsonTimePartitioning() != null) { + if (getJsonTimePartitioning() != null + || Optional.ofNullable(getClustering()).map(Clustering::getFields).isPresent()) { dynamicDestinations = - new ConstantTimePartitioningDestinations<>( + new ConstantTimePartitioningClusteringDestinations<>( (DynamicDestinations) dynamicDestinations, getJsonTimePartitioning(), StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering()))); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java index 6edd3f71cc71..7a94657107ec 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java @@ -179,11 +179,13 @@ private static void tryCreateTable( TimePartitioning timePartitioning = tableDestination.getTimePartitioning(); if (timePartitioning != null) { table.setTimePartitioning(timePartitioning); - Clustering clustering = tableDestination.getClustering(); - if (clustering != null) { - table.setClustering(clustering); - } } + + Clustering clustering = tableDestination.getClustering(); + if (clustering != null) { + table.setClustering(clustering); + } + if (kmsKey != null) { table.setEncryptionConfiguration(new EncryptionConfiguration().setKmsKeyName(kmsKey)); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 62355fd9417d..1f042a81eb9d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -272,23 +272,28 @@ public String toString() { } } - static class ConstantTimePartitioningDestinations + static class ConstantTimePartitioningClusteringDestinations extends DelegatingDynamicDestinations { - private final ValueProvider jsonTimePartitioning; + private final @Nullable ValueProvider jsonTimePartitioning; private final @Nullable ValueProvider jsonClustering; - ConstantTimePartitioningDestinations( + ConstantTimePartitioningClusteringDestinations( DynamicDestinations inner, ValueProvider jsonTimePartitioning, ValueProvider jsonClustering) { super(inner); - Preconditions.checkArgumentNotNull( - jsonTimePartitioning, "jsonTimePartitioning provider can not be null"); - if (jsonTimePartitioning.isAccessible()) { - Preconditions.checkArgumentNotNull( - jsonTimePartitioning.get(), "jsonTimePartitioning can not be null"); - } + + checkArgument( + (jsonTimePartitioning != null + && jsonTimePartitioning.isAccessible() + && jsonTimePartitioning.get() != null) + || (jsonClustering != null + && jsonClustering.isAccessible() + && jsonClustering.get() != null), + "at least one of jsonTimePartitioning or jsonClustering must be non-null, accessible " + + "and present"); + this.jsonTimePartitioning = jsonTimePartitioning; this.jsonClustering = jsonClustering; } @@ -296,13 +301,12 @@ static class ConstantTimePartitioningDestinations @Override public TableDestination getDestination(@Nullable ValueInSingleWindow element) { TableDestination destination = super.getDestination(element); - String partitioning = this.jsonTimePartitioning.get(); - checkArgument(partitioning != null, "jsonTimePartitioning can not be null"); + String partitioning = + Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null); + String clustering = Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null); + return new TableDestination( - 
destination.getTableSpec(), - destination.getTableDescription(), - partitioning, - Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null)); + destination.getTableSpec(), destination.getTableDescription(), partitioning, clustering); } @Override @@ -316,10 +320,10 @@ public Coder getDestinationCoder() { @Override public String toString() { - MoreObjects.ToStringHelper helper = - MoreObjects.toStringHelper(this) - .add("inner", inner) - .add("jsonTimePartitioning", jsonTimePartitioning); + MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this).add("inner", inner); + if (jsonTimePartitioning != null) { + helper.add("jsonTimePartitioning", jsonTimePartitioning); + } if (jsonClustering != null) { helper.add("jsonClustering", jsonClustering); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 51e61fe41953..65bb3bf11b1b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -288,11 +288,12 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( } if (timePartitioning != null) { loadConfig.setTimePartitioning(timePartitioning); - // only set clustering if timePartitioning is set - if (clustering != null) { - loadConfig.setClustering(clustering); - } } + + if (clustering != null) { + loadConfig.setClustering(clustering); + } + if (kmsKey != null) { loadConfig.setDestinationEncryptionConfiguration( new EncryptionConfiguration().setKmsKeyName(kmsKey)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 89cbc2cd24b8..21d3e53a0701 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -500,7 +500,7 @@ private void verifySideInputs() { } } - void testTimePartitioningClustering( + void testTimePartitioningAndClustering( BigQueryIO.Write.Method insertMethod, boolean enablePartitioning, boolean enableClustering) throws Exception { TableRow row1 = new TableRow().set("date", "2018-01-01").set("number", "1"); @@ -545,16 +545,8 @@ void testTimePartitioningClustering( } } - void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Exception { - testTimePartitioningClustering(insertMethod, true, false); - } - - void testClustering(BigQueryIO.Write.Method insertMethod) throws Exception { - testTimePartitioningClustering(insertMethod, true, true); - } - - @Test - public void testTimePartitioning() throws Exception { + void testTimePartitioningAndClusteringWithAllMethods( + Boolean enablePartitioning, Boolean enableClustering) throws Exception { BigQueryIO.Write.Method method; if (useStorageApi) { method = @@ -564,15 +556,27 @@ public void testTimePartitioning() throws Exception { } else { method = Method.FILE_LOADS; } - testTimePartitioning(method); + testTimePartitioningAndClustering(method, enablePartitioning, enableClustering); } @Test - public void testClusteringStorageApi() throws Exception { - if (useStorageApi) { - testClustering( - useStorageApiApproximate ? 
Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API); - } + public void testTimePartitioningWithoutClustering() throws Exception { + testTimePartitioningAndClusteringWithAllMethods(true, false); + } + + @Test + public void testTimePartitioningWithClustering() throws Exception { + testTimePartitioningAndClusteringWithAllMethods(true, true); + } + + @Test + public void testClusteringWithoutPartitioning() throws Exception { + testTimePartitioningAndClusteringWithAllMethods(false, true); + } + + @Test + public void testNoClusteringNoPartitioning() throws Exception { + testTimePartitioningAndClusteringWithAllMethods(false, false); } @Test