From c5e1d106cb5e56afef947097eff67977b75a3ec0 Mon Sep 17 00:00:00 2001 From: dengwe1 <159199800+dengwe1@users.noreply.github.com> Date: Wed, 28 Feb 2024 18:49:18 -0800 Subject: [PATCH] Support passing credentials from pipeline options into SpannerIO.readChangeStream (#30361) * Support credentials in SpannerConfig * Support passing credentials from pipeline options in SpannerIO --- CHANGES.md | 2 + .../sdk/io/gcp/spanner/SpannerAccessor.java | 15 +- .../sdk/io/gcp/spanner/SpannerConfig.java | 15 ++ .../beam/sdk/io/gcp/spanner/SpannerIO.java | 77 +++++++--- .../MetadataSpannerConfigFactory.java | 6 + .../io/gcp/spanner/SpannerAccessorTest.java | 23 +++ .../SpannerIOReadChangeStreamTest.java | 143 ++++++++++++++++++ 7 files changed, 255 insertions(+), 26 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java diff --git a/CHANGES.md b/CHANGES.md index fbbc12ef0a93a..57d729ac5f5db 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -94,6 +94,8 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed SpannerIO.readChangeStream to support propagating credentials from pipeline options + to the getDialect calls for authenticating with Spanner (Java) ([#30361](https://github.com/apache/beam/pull/30361)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index bb73a1c30872a..dc38445042183 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -23,6 +23,7 @@ import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.auth.Credentials; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceFactory; import com.google.cloud.spanner.BatchClient; @@ -41,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.ReleaseInfo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +102,8 @@ public static SpannerAccessor getOrCreate(SpannerConfig spannerConfig) { } } - private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { + @VisibleForTesting + static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { SpannerOptions.Builder builder = SpannerOptions.newBuilder(); Set retryableCodes = new HashSet<>(); @@ -222,8 +225,16 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { if (databaseRole != null && databaseRole.get() != null && !databaseRole.get().isEmpty()) { builder.setDatabaseRole(databaseRole.get()); } - SpannerOptions options = builder.build(); + ValueProvider credentials = spannerConfig.getCredentials(); + if (credentials != null && credentials.get() != null) { + builder.setCredentials(credentials.get()); + } + return builder.build(); + } + + private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) { + SpannerOptions options = buildSpannerOptions(spannerConfig); Spanner spanner = options.getService(); String instanceId = spannerConfig.getInstanceId().get(); String databaseId = spannerConfig.getDatabaseId().get(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index 4f2b9c8bd0987..01747299d6498 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -21,6 +21,7 @@ import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.auth.Credentials; import com.google.auto.value.AutoValue; import com.google.cloud.ServiceFactory; import com.google.cloud.spanner.Options.RpcPriority; @@ -84,6 +85,8 @@ public abstract class SpannerConfig implements Serializable { public abstract @Nullable ValueProvider getDataBoostEnabled(); + public abstract @Nullable ValueProvider getCredentials(); + abstract Builder toBuilder(); public static SpannerConfig create() { @@ -161,6 +164,8 @@ abstract Builder setExecuteStreamingSqlRetrySettings( abstract Builder setPartitionReadTimeout(ValueProvider partitionReadTimeout); + abstract Builder setCredentials(ValueProvider credentials); + public abstract SpannerConfig build(); } @@ -302,4 +307,14 @@ public SpannerConfig withPartitionReadTimeout(Duration partitionReadTimeout) { public SpannerConfig withPartitionReadTimeout(ValueProvider partitionReadTimeout) { return toBuilder().setPartitionReadTimeout(partitionReadTimeout).build(); } + + /** Specifies the credentials. */ + public SpannerConfig withCredentials(Credentials credentials) { + return withCredentials(ValueProvider.StaticValueProvider.of(credentials)); + } + + /** Specifies the credentials. */ + public SpannerConfig withCredentials(ValueProvider credentials) { + return toBuilder().setCredentials(credentials).build(); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 6db79ab69b47a..5b676561bf8ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -31,6 +31,7 @@ import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.auth.Credentials; import com.google.auto.value.AutoValue; import com.google.cloud.ServiceFactory; import com.google.cloud.Timestamp; @@ -68,6 +69,7 @@ import org.apache.beam.runners.core.metrics.ServiceCallMetric; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants; import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory; @@ -86,6 +88,7 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.Schema; @@ -1667,31 +1670,15 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta getSpannerConfig().getProjectId().get(), partitionMetadataInstanceId, partitionMetadataDatabaseId); - SpannerConfig changeStreamSpannerConfig = getSpannerConfig(); - // Set default retryable errors for ReadChangeStream - if (changeStreamSpannerConfig.getRetryableCodes() == null) { - ImmutableSet defaultRetryableCodes = ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED); - changeStreamSpannerConfig = - changeStreamSpannerConfig.toBuilder().setRetryableCodes(defaultRetryableCodes).build(); - } - // Set default retry timeouts for ReadChangeStream - if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) { - changeStreamSpannerConfig = - changeStreamSpannerConfig - .toBuilder() - .setExecuteStreamingSqlRetrySettings( - RetrySettings.newBuilder() - .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5)) - .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1)) - .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1)) - .build()) - .build(); - } + + final SpannerConfig changeStreamSpannerConfig = buildChangeStreamSpannerConfig(); final SpannerConfig partitionMetadataSpannerConfig = MetadataSpannerConfigFactory.create( changeStreamSpannerConfig, partitionMetadataInstanceId, partitionMetadataDatabaseId); - Dialect changeStreamDatabaseDialect = getDialect(changeStreamSpannerConfig); - Dialect metadataDatabaseDialect = getDialect(partitionMetadataSpannerConfig); + final Dialect changeStreamDatabaseDialect = + getDialect(changeStreamSpannerConfig, input.getPipeline().getOptions()); + final Dialect metadataDatabaseDialect = + getDialect(partitionMetadataSpannerConfig, input.getPipeline().getOptions()); LOG.info( "The Spanner database " + changeStreamDatabaseId @@ -1773,10 +1760,52 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta .apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory))); return dataChangeRecordsOut; } + + @VisibleForTesting + SpannerConfig buildChangeStreamSpannerConfig() { + SpannerConfig changeStreamSpannerConfig = getSpannerConfig(); + // Set default retryable errors for ReadChangeStream + if (changeStreamSpannerConfig.getRetryableCodes() == null) { + ImmutableSet defaultRetryableCodes = ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED); + changeStreamSpannerConfig = + changeStreamSpannerConfig.toBuilder().setRetryableCodes(defaultRetryableCodes).build(); + } + // Set default retry timeouts for ReadChangeStream + if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) { + changeStreamSpannerConfig = + changeStreamSpannerConfig + .toBuilder() + .setExecuteStreamingSqlRetrySettings( + RetrySettings.newBuilder() + .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5)) + .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .build()) + .build(); + } + return changeStreamSpannerConfig; + } + } + + /** If credentials are not set in spannerConfig, uses the credentials from pipeline options. */ + @VisibleForTesting + static SpannerConfig buildSpannerConfigWithCredential( + SpannerConfig spannerConfig, PipelineOptions pipelineOptions) { + if (spannerConfig.getCredentials() == null && pipelineOptions != null) { + final Credentials credentials = pipelineOptions.as(GcpOptions.class).getGcpCredential(); + if (credentials != null) { + spannerConfig = spannerConfig.withCredentials(credentials); + } + } + return spannerConfig; } - private static Dialect getDialect(SpannerConfig spannerConfig) { - DatabaseClient databaseClient = SpannerAccessor.getOrCreate(spannerConfig).getDatabaseClient(); + private static Dialect getDialect(SpannerConfig spannerConfig, PipelineOptions pipelineOptions) { + // Allow passing the credential from pipeline options to the getDialect() call. + SpannerConfig spannerConfigWithCredential = + buildSpannerConfigWithCredential(spannerConfig, pipelineOptions); + DatabaseClient databaseClient = + SpannerAccessor.getOrCreate(spannerConfigWithCredential).getDatabaseClient(); return databaseClient.getDialect(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java index 30b5043f54e8a..7132d4deb0308 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java @@ -21,6 +21,7 @@ import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.auth.Credentials; import com.google.cloud.spanner.Options.RpcPriority; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.options.ValueProvider; @@ -113,6 +114,11 @@ public static SpannerConfig create( config = config.withRpcPriority(StaticValueProvider.of(rpcPriority.get())); } + ValueProvider credentials = primaryConfig.getCredentials(); + if (credentials != null) { + config = config.withCredentials(credentials); + } + return config; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessorTest.java index df38d22f5c13b..b80fba31d3a21 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessorTest.java @@ -17,11 +17,14 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.SpannerOptions; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.junit.Before; import org.junit.Test; @@ -141,4 +144,24 @@ public void testCreateWithEmptyDatabaseRole() { .getDatabaseClient(DatabaseId.of("project", "test1", "test1")); verify(serviceFactory.mockSpanner(), times(1)).close(); } + + @Test + public void testBuildSpannerOptionsWithCredential() { + TestCredential testCredential = new TestCredential(); + SpannerConfig config1 = + SpannerConfig.create() + .toBuilder() + .setServiceFactory(serviceFactory) + .setProjectId(StaticValueProvider.of("project")) + .setInstanceId(StaticValueProvider.of("test-instance")) + .setDatabaseId(StaticValueProvider.of("test-db")) + .setDatabaseRole(StaticValueProvider.of("test-role")) + .setCredentials(StaticValueProvider.of(testCredential)) + .build(); + + SpannerOptions options = SpannerAccessor.buildSpannerOptions(config1); + assertEquals("project", options.getProjectId()); + assertEquals("test-role", options.getDatabaseRole()); + assertEquals(testCredential, options.getCredentials()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java new file mode 100644 index 0000000000000..5fd3548a30045 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import com.google.auth.Credentials; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.RpcPriority; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class SpannerIOReadChangeStreamTest { + + private static final String TEST_PROJECT = "my-project"; + private static final String TEST_INSTANCE = "my-instance"; + private static final String TEST_DATABASE = "my-database"; + private static final String TEST_METADATA_INSTANCE = "my-metadata-instance"; + private static final String TEST_METADATA_DATABASE = "my-metadata-database"; + private static final String TEST_METADATA_TABLE = "my-metadata-table"; + private static final String TEST_CHANGE_STREAM = "my-change-stream"; + + @Rule public final transient TestPipeline testPipeline = TestPipeline.create(); + + private SpannerConfig spannerConfig; + private SpannerIO.ReadChangeStream readChangeStream; + + @Before + public void setUp() throws Exception { + spannerConfig = + SpannerConfig.create() + .withProjectId(TEST_PROJECT) + .withInstanceId(TEST_INSTANCE) + .withDatabaseId(TEST_DATABASE); + + Timestamp startTimestamp = Timestamp.now(); + Timestamp endTimestamp = + Timestamp.ofTimeSecondsAndNanos( + startTimestamp.getSeconds() + 10, startTimestamp.getNanos()); + readChangeStream = + SpannerIO.readChangeStream() + .withSpannerConfig(spannerConfig) + .withChangeStreamName(TEST_CHANGE_STREAM) + .withMetadataInstance(TEST_METADATA_INSTANCE) + .withMetadataDatabase(TEST_METADATA_DATABASE) + .withMetadataTable(TEST_METADATA_TABLE) + .withRpcPriority(RpcPriority.MEDIUM) + .withInclusiveStartAt(startTimestamp) + .withInclusiveEndAt(endTimestamp); + } + + @Test + public void testSetPipelineCredential() { + TestCredential testCredential = new TestCredential(); + // Set the credential in the pipeline options. + testPipeline.getOptions().as(GcpOptions.class).setGcpCredential(testCredential); + SpannerConfig changeStreamSpannerConfig = readChangeStream.buildChangeStreamSpannerConfig(); + SpannerConfig metadataSpannerConfig = + MetadataSpannerConfigFactory.create( + changeStreamSpannerConfig, TEST_METADATA_INSTANCE, TEST_METADATA_DATABASE); + assertNull(changeStreamSpannerConfig.getCredentials()); + assertNull(metadataSpannerConfig.getCredentials()); + + SpannerConfig changeStreamSpannerConfigWithCredential = + SpannerIO.buildSpannerConfigWithCredential( + changeStreamSpannerConfig, testPipeline.getOptions()); + SpannerConfig metadataSpannerConfigWithCredential = + SpannerIO.buildSpannerConfigWithCredential( + metadataSpannerConfig, testPipeline.getOptions()); + assertEquals(testCredential, changeStreamSpannerConfigWithCredential.getCredentials().get()); + assertEquals(testCredential, metadataSpannerConfigWithCredential.getCredentials().get()); + } + + @Test + public void testSetSpannerConfigCredential() { + TestCredential testCredential = new TestCredential(); + // Set the credential in the SpannerConfig. + spannerConfig = spannerConfig.withCredentials(testCredential); + readChangeStream = readChangeStream.withSpannerConfig(spannerConfig); + SpannerConfig changeStreamSpannerConfig = readChangeStream.buildChangeStreamSpannerConfig(); + SpannerConfig metadataSpannerConfig = + MetadataSpannerConfigFactory.create( + changeStreamSpannerConfig, TEST_METADATA_INSTANCE, TEST_METADATA_DATABASE); + assertEquals(testCredential, changeStreamSpannerConfig.getCredentials().get()); + assertEquals(testCredential, metadataSpannerConfig.getCredentials().get()); + + SpannerConfig changeStreamSpannerConfigWithCredential = + SpannerIO.buildSpannerConfigWithCredential( + changeStreamSpannerConfig, testPipeline.getOptions()); + SpannerConfig metadataSpannerConfigWithCredential = + SpannerIO.buildSpannerConfigWithCredential( + metadataSpannerConfig, testPipeline.getOptions()); + assertEquals(testCredential, changeStreamSpannerConfigWithCredential.getCredentials().get()); + assertEquals(testCredential, metadataSpannerConfigWithCredential.getCredentials().get()); + } + + @Test + public void testWithDefaultCredential() { + // Get the default credential, without setting any credentials in the pipeline options or + // SpannerConfig. + Credentials defaultCredential = + testPipeline.getOptions().as(GcpOptions.class).getGcpCredential(); + SpannerConfig changeStreamSpannerConfig = readChangeStream.buildChangeStreamSpannerConfig(); + SpannerConfig metadataSpannerConfig = + MetadataSpannerConfigFactory.create( + changeStreamSpannerConfig, TEST_METADATA_INSTANCE, TEST_METADATA_DATABASE); + assertNull(changeStreamSpannerConfig.getCredentials()); + assertNull(metadataSpannerConfig.getCredentials()); + + SpannerConfig changeStreamSpannerConfigWithCredential = + SpannerIO.buildSpannerConfigWithCredential( + changeStreamSpannerConfig, testPipeline.getOptions()); + SpannerConfig metadataSpannerConfigWithCredential = + SpannerIO.buildSpannerConfigWithCredential( + metadataSpannerConfig, testPipeline.getOptions()); + assertEquals(defaultCredential, changeStreamSpannerConfigWithCredential.getCredentials().get()); + assertEquals(defaultCredential, metadataSpannerConfigWithCredential.getCredentials().get()); + } +}