diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
index a0b25cb64c0..0f4310f9b4d 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
@@ -42,6 +42,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
+import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
@@ -72,6 +73,7 @@ abstract static class Builder, T extends AbstractReadCon
private Span span = Tracing.getTracer().getCurrentSpan();
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
+ private DirectedReadOptions defaultDirectedReadOption;
private ExecutorProvider executorProvider;
private Clock clock = new Clock();
@@ -117,6 +119,11 @@ B setClock(Clock clock) {
return self();
}
+ B setDefaultDirectedReadOptions(DirectedReadOptions directedReadOptions) {
+ this.defaultDirectedReadOption = directedReadOptions;
+ return self();
+ }
+
abstract T build();
}
@@ -399,6 +406,7 @@ void initTransaction() {
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;
+ private final DirectedReadOptions defaultDirectedReadOptions;
private final Clock clock;
@GuardedBy("lock")
@@ -423,6 +431,7 @@ void initTransaction() {
this.rpc = builder.rpc;
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
+ this.defaultDirectedReadOptions = builder.defaultDirectedReadOption;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
@@ -623,6 +632,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
if (options.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(options.dataBoostEnabled());
}
+ if (options.hasDirectedReadOptions()) {
+ builder.setDirectedReadOptions(options.directedReadOptions());
+ } else if (defaultDirectedReadOptions != null) {
+ builder.setDirectedReadOptions(defaultDirectedReadOptions);
+ }
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
@@ -811,6 +825,11 @@ ResultSet readInternalWithOptions(
if (readOptions.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(readOptions.dataBoostEnabled());
}
+ if (readOptions.hasDirectedReadOptions()) {
+ builder.setDirectedReadOptions(readOptions.directedReadOptions());
+ } else if (defaultDirectedReadOptions != null) {
+ builder.setDirectedReadOptions(defaultDirectedReadOptions);
+ }
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
index 0191a11be1c..eab90a266c9 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
@@ -60,7 +60,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
- .setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
+ .setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
+ .setDefaultDirectedReadOptions(
+ sessionClient.getSpanner().getOptions().getDirectedReadOptions()),
checkNotNull(bound));
}
@@ -77,7 +79,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
- .setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
+ .setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
+ .setDefaultDirectedReadOptions(
+ sessionClient.getSpanner().getOptions().getDirectedReadOptions()),
batchTransactionId);
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
index 2bd35ec7853..dda12b60d64 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
@@ -17,6 +17,7 @@
package com.google.cloud.spanner;
import com.google.common.base.Preconditions;
+import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
import java.util.Objects;
@@ -224,6 +225,18 @@ public static CreateUpdateDeleteAdminApiOption validateOnly(Boolean validateOnly
return new ValidateOnlyOption(validateOnly);
}
+ /**
+ * Option to request DirectedRead for ReadOnlyTransaction and SingleUseTransaction.
+ *
+ *
The DirectedReadOptions can be used to indicate which replicas or regions should be used for
+ * non-transactional reads or queries. Not all requests can be sent to non-leader replicas. In
+ * particular, some requests such as reads within read-write transactions must be sent to a
+ * designated leader replica. These requests ignore DirectedReadOptions.
+ */
+ public static ReadAndQueryOption directedRead(DirectedReadOptions directedReadOptions) {
+ return new DirectedReadOption(directedReadOptions);
+ }
+
/** Option to request {@link CommitStats} for read/write transactions. */
static final class CommitStatsOption extends InternalOption implements TransactionOption {
@Override
@@ -325,6 +338,21 @@ void appendToOptions(Options options) {
}
}
+ static final class DirectedReadOption extends InternalOption implements ReadAndQueryOption {
+ private final DirectedReadOptions directedReadOptions;
+
+ DirectedReadOption(DirectedReadOptions directedReadOptions) {
+ this.directedReadOptions =
+ Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null");
+ ;
+ }
+
+ @Override
+ void appendToOptions(Options options) {
+ options.directedReadOptions = directedReadOptions;
+ }
+ }
+
private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
@@ -338,6 +366,7 @@ void appendToOptions(Options options) {
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean dataBoostEnabled;
+ private DirectedReadOptions directedReadOptions;
// Construction is via factory methods below.
private Options() {}
@@ -438,6 +467,14 @@ Boolean dataBoostEnabled() {
return dataBoostEnabled;
}
+ boolean hasDirectedReadOptions() {
+ return directedReadOptions != null;
+ }
+
+ DirectedReadOptions directedReadOptions() {
+ return directedReadOptions;
+ }
+
@Override
public String toString() {
StringBuilder b = new StringBuilder();
@@ -477,6 +514,9 @@ public String toString() {
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
+ if (directedReadOptions != null) {
+ b.append("directedReadOptions: ").append(directedReadOptions).append(' ');
+ }
return b.toString();
}
@@ -512,7 +552,8 @@ public boolean equals(Object o) {
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
- && Objects.equals(dataBoostEnabled(), that.dataBoostEnabled());
+ && Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
+ && Objects.equals(directedReadOptions(), that.directedReadOptions());
}
@Override
@@ -557,6 +598,9 @@ public int hashCode() {
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
+ if (directedReadOptions != null) {
+ result = 31 * result + directedReadOptions.hashCode();
+ }
return result;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
index 0e763dbc93d..53bf37feb05 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
@@ -255,6 +255,7 @@ public ReadContext singleUse(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
@@ -274,6 +275,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.buildSingleUseReadOnlyTransaction());
@@ -293,6 +295,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
index ba22ec54487..877ea72e467 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
@@ -33,6 +33,7 @@
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
+import com.google.cloud.spanner.Options.DirectedReadOption;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.admin.database.v1.DatabaseAdminSettings;
@@ -50,6 +51,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.SpannerGrpc;
@@ -137,6 +139,7 @@ public class SpannerOptions extends ServiceOptions {
private final String compressorName;
private final boolean leaderAwareRoutingEnabled;
private final boolean attemptDirectPath;
+ private final DirectedReadOptions directedReadOptions;
/** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */
public interface CallCredentialsProvider {
@@ -627,6 +630,7 @@ private SpannerOptions(Builder builder) {
compressorName = builder.compressorName;
leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled;
attemptDirectPath = builder.attemptDirectPath;
+ directedReadOptions = builder.directedReadOptions;
}
/**
@@ -729,6 +733,7 @@ public static class Builder
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
private boolean leaderAwareRoutingEnabled = true;
private boolean attemptDirectPath = true;
+ private DirectedReadOptions directedReadOptions;
private static String createCustomClientLibToken(String token) {
return token + " " + ServiceOptions.getGoogApiClientLibName();
@@ -789,6 +794,7 @@ private Builder() {
this.channelConfigurator = options.channelConfigurator;
this.interceptorProvider = options.interceptorProvider;
this.attemptDirectPath = options.attemptDirectPath;
+ this.directedReadOptions = options.directedReadOptions;
}
@Override
@@ -1153,6 +1159,32 @@ public Builder setAsyncExecutorProvider(CloseableExecutorProvider provider) {
return this;
}
+ /**
+ * Sets the {@link DirectedReadOption} that specify which replicas or regions should be used for
+ * non-transactional reads or queries.
+ *
+ * DirectedReadOptions set at the request level will take precedence over the options set
+ * using this method.
+ *
+ *
An example below of how {@link DirectedReadOptions} can be constructed by including a
+ * replica.
+ *
+ *
+ * DirectedReadOptions.newBuilder()
+ * .setIncludeReplicas(
+ * IncludeReplicas.newBuilder()
+ * .addReplicaSelections(
+ * ReplicaSelection.newBuilder().setLocation("us-east1").build()))
+ * .build();
+ * }
+ *
+ */
+ public Builder setDirectedReadOptions(DirectedReadOptions directedReadOptions) {
+ this.directedReadOptions =
+ Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null");
+ return this;
+ }
+
/**
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
@@ -1371,6 +1403,10 @@ public boolean isLeaderAwareRoutingEnabled() {
return leaderAwareRoutingEnabled;
}
+ public DirectedReadOptions getDirectedReadOptions() {
+ return directedReadOptions;
+ }
+
@BetaApi
public boolean isAttemptDirectPath() {
return attemptDirectPath;
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java
index 31b73581f6b..16e4aa9600d 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java
@@ -25,6 +25,9 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
+import com.google.spanner.v1.DirectedReadOptions;
+import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas;
+import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
@@ -45,6 +48,14 @@
@RunWith(Parameterized.class)
public class AbstractReadContextTest {
+ private static final DirectedReadOptions DIRECTED_READ_OPTIONS =
+ DirectedReadOptions.newBuilder()
+ .setIncludeReplicas(
+ IncludeReplicas.newBuilder()
+ .addReplicaSelections(
+ ReplicaSelection.newBuilder().setLocation("us-west1").build()))
+ .build();
+
@Parameter(0)
public QueryOptions defaultQueryOptions;
@@ -250,4 +261,15 @@ public void executeSqlRequestBuilderWithRequestOptionsWithTxnTag() {
.isEqualTo("app=spanner,env=test,action=query");
assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test");
}
+
+ @Test
+ public void testGetExecuteSqlRequestBuilderWithDirectedReadOptions() {
+ ExecuteSqlRequest.Builder request =
+ context.getExecuteSqlRequestBuilder(
+ Statement.of("SELECT * FROM FOO"),
+ QueryMode.NORMAL,
+ Options.fromQueryOptions(Options.directedRead(DIRECTED_READ_OPTIONS)),
+ false);
+ assertEquals(DIRECTED_READ_OPTIONS, request.getDirectedReadOptions());
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
index aea8a4dcb64..e6263568fe5 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
@@ -21,6 +21,7 @@
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT;
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_TABLE_NAME;
import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1;
+import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1_FROM_TABLE;
import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1_RESULTSET;
import static com.google.cloud.spanner.SpannerApiFutures.get;
import static com.google.common.truth.Truth.assertThat;
@@ -75,6 +76,9 @@
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.DeleteSessionRequest;
+import com.google.spanner.v1.DirectedReadOptions;
+import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas;
+import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
@@ -169,6 +173,20 @@ public class DatabaseClientImplTest {
.setStatus(STATUS_OK)
.addAllIndexes(ImmutableList.of(2, 3))
.build());
+ private static final DirectedReadOptions DIRECTED_READ_OPTIONS1 =
+ DirectedReadOptions.newBuilder()
+ .setIncludeReplicas(
+ IncludeReplicas.newBuilder()
+ .addReplicaSelections(
+ ReplicaSelection.newBuilder().setLocation("us-west1").build()))
+ .build();
+ private static final DirectedReadOptions DIRECTED_READ_OPTIONS2 =
+ DirectedReadOptions.newBuilder()
+ .setIncludeReplicas(
+ IncludeReplicas.newBuilder()
+ .addReplicaSelections(
+ ReplicaSelection.newBuilder().setLocation("us-east1").build()))
+ .build();
private Spanner spanner;
private Spanner spannerWithEmptySessionPool;
private static ExecutorService executor;
@@ -186,6 +204,8 @@ public static void startStaticServer() throws IOException {
StatementResult.exception(
INVALID_UPDATE_STATEMENT,
Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException()));
+ mockSpanner.putStatementResult(
+ StatementResult.query(SELECT1_FROM_TABLE, MockSpannerTestUtil.SELECT1_RESULTSET));
mockSpanner.setBatchWriteResult(BATCH_WRITE_RESPONSES);
executor = Executors.newSingleThreadExecutor();
@@ -1518,6 +1538,69 @@ public void testExecuteQueryWithTag() {
assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
}
+ @Test
+ public void testExecuteQuery_withDirectedReadOptionsViaRequest() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ try (ResultSet resultSet =
+ client.singleUse().executeQuery(SELECT1, Options.directedRead(DIRECTED_READ_OPTIONS1))) {
+ while (resultSet.next()) {}
+ }
+
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertEquals(1, requests.size());
+ ExecuteSqlRequest request = requests.get(0);
+ assertTrue(request.hasDirectedReadOptions());
+ assertEquals(DIRECTED_READ_OPTIONS1, request.getDirectedReadOptions());
+ }
+
+ @Test
+ public void testExecuteQuery_withDirectedReadOptionsViaSpannerOptions() {
+ Spanner spannerWithDirectedReadOptions =
+ spanner
+ .getOptions()
+ .toBuilder()
+ .setDirectedReadOptions(DIRECTED_READ_OPTIONS2)
+ .build()
+ .getService();
+ DatabaseClient client =
+ spannerWithDirectedReadOptions.getDatabaseClient(
+ DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) {
+ while (resultSet.next()) {}
+ }
+
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertEquals(requests.size(), 1);
+ ExecuteSqlRequest request = requests.get(0);
+ assertTrue(request.hasDirectedReadOptions());
+ assertEquals(DIRECTED_READ_OPTIONS2, request.getDirectedReadOptions());
+ }
+
+ @Test
+ public void testExecuteQuery_whenMultipleDirectedReadsOptions_preferRequestOption() {
+ Spanner spannerWithDirectedReadOptions =
+ spanner
+ .getOptions()
+ .toBuilder()
+ .setDirectedReadOptions(DIRECTED_READ_OPTIONS2)
+ .build()
+ .getService();
+ DatabaseClient client =
+ spannerWithDirectedReadOptions.getDatabaseClient(
+ DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ try (ResultSet resultSet =
+ client.singleUse().executeQuery(SELECT1, Options.directedRead(DIRECTED_READ_OPTIONS1))) {
+ while (resultSet.next()) {}
+ }
+
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertEquals(requests.size(), 1);
+ ExecuteSqlRequest request = requests.get(0);
+ assertTrue(request.hasDirectedReadOptions());
+ assertEquals(DIRECTED_READ_OPTIONS1, request.getDirectedReadOptions());
+ }
+
@Test
public void testExecuteReadWithTag() {
DatabaseClient client =
@@ -1542,6 +1625,79 @@ public void testExecuteReadWithTag() {
assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
}
+ @Test
+ public void testExecuteReadWithDirectedReadOptions() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ try (ResultSet resultSet =
+ client
+ .singleUse()
+ .read(
+ READ_TABLE_NAME,
+ KeySet.singleKey(Key.of(1L)),
+ READ_COLUMN_NAMES,
+ Options.directedRead(DIRECTED_READ_OPTIONS1))) {
+ while (resultSet.next()) {}
+ }
+
+ List requests = mockSpanner.getRequestsOfType(ReadRequest.class);
+ assertEquals(1, requests.size());
+ ReadRequest request = requests.get(0);
+ assertTrue(request.hasDirectedReadOptions());
+ assertEquals(DIRECTED_READ_OPTIONS1, request.getDirectedReadOptions());
+ }
+
+ @Test
+ public void testExecuteReadWithDirectedReadOptionsViaSpannerOptions() {
+ Spanner spannerWithDirectedReadOptions =
+ spanner
+ .getOptions()
+ .toBuilder()
+ .setDirectedReadOptions(DIRECTED_READ_OPTIONS2)
+ .build()
+ .getService();
+ DatabaseClient client =
+ spannerWithDirectedReadOptions.getDatabaseClient(
+ DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ try (ResultSet resultSet =
+ client.singleUse().read(READ_TABLE_NAME, KeySet.singleKey(Key.of(1L)), READ_COLUMN_NAMES)) {
+ while (resultSet.next()) {}
+ }
+
+ List requests = mockSpanner.getRequestsOfType(ReadRequest.class);
+ assertEquals(requests.size(), 1);
+ ReadRequest request = requests.get(0);
+ assertTrue(request.hasDirectedReadOptions());
+ assertEquals(DIRECTED_READ_OPTIONS2, request.getDirectedReadOptions());
+ }
+
+ @Test
+ public void testReadWriteExecuteQueryWithDirectedReadOptionsViaSpannerOptions() {
+ Spanner spannerWithDirectedReadOptions =
+ spanner
+ .getOptions()
+ .toBuilder()
+ .setDirectedReadOptions(DIRECTED_READ_OPTIONS2)
+ .build()
+ .getService();
+ DatabaseClient client =
+ spannerWithDirectedReadOptions.getDatabaseClient(
+ DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ TransactionRunner runner = client.readWriteTransaction();
+ runner.run(
+ transaction -> {
+ try (ResultSet resultSet = transaction.executeQuery(SELECT1)) {
+ while (resultSet.next()) {}
+ }
+ return null;
+ });
+
+ List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
+ assertEquals(requests.size(), 1);
+ ExecuteSqlRequest request = requests.get(0);
+ assertFalse(request.hasDirectedReadOptions());
+ }
+
@Test
public void testReadWriteExecuteQueryWithTag() {
DatabaseClient client =
@@ -2728,7 +2884,7 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio
@Test
public void testBackendQueryOptions() {
- // Use a Spanner instance with MinSession=0 and WriteFraction=0.0 to prevent background requests
+ // Use a Spanner instance with MinSession=0 to prevent background requests
// from the session pool interfering with the test case.
try (Spanner spanner =
SpannerOptions.newBuilder()
@@ -2769,7 +2925,7 @@ public void testBackendQueryOptions() {
@Test
public void testBackendQueryOptionsWithAnalyzeQuery() {
- // Use a Spanner instance with MinSession=0 and WriteFraction=0.0 to prevent background requests
+ // Use a Spanner instance with MinSession=0 to prevent background requests
// from the session pool interfering with the test case.
try (Spanner spanner =
SpannerOptions.newBuilder()
@@ -2812,7 +2968,7 @@ public void testBackendQueryOptionsWithAnalyzeQuery() {
@Test
public void testBackendPartitionQueryOptions() {
- // Use a Spanner instance with MinSession=0 and WriteFraction=0.0 to prevent background requests
+ // Use a Spanner instance with MinSession=0 to prevent background requests
// from the session pool interfering with the test case.
try (Spanner spanner =
SpannerOptions.newBuilder()
@@ -2820,6 +2976,58 @@ public void testBackendPartitionQueryOptions() {
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build())
+ .setDirectedReadOptions(DIRECTED_READ_OPTIONS2)
+ .build()
+ .getService()) {
+ BatchClient client =
+ spanner.getBatchClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE"));
+ BatchReadOnlyTransaction transaction =
+ client.batchReadOnlyTransaction(TimestampBound.strong());
+ List partitions =
+ transaction.partitionQuery(
+ PartitionOptions.newBuilder().setMaxPartitions(10L).build(),
+ Statement.newBuilder(SELECT1.getSql())
+ .withQueryOptions(
+ QueryOptions.newBuilder()
+ .setOptimizerVersion("1")
+ .setOptimizerStatisticsPackage("custom-package")
+ .build())
+ .build(),
+ Options.directedRead(DIRECTED_READ_OPTIONS1));
+ try (ResultSet rs = transaction.execute(partitions.get(0))) {
+ // Just iterate over the results to execute the query.
+ while (rs.next()) {}
+ } finally {
+ transaction.cleanup();
+ }
+ // Check if the last query executed is a DeleteSessionRequest and the second last query
+ // executed is a ExecuteSqlRequest and was executed using a custom optimizer version,
+ // statistics package and directed read options.
+ List requests = mockSpanner.getRequests();
+ assert requests.size() >= 2 : "required to have at least 2 requests";
+ assertThat(requests.get(requests.size() - 1)).isInstanceOf(DeleteSessionRequest.class);
+ assertThat(requests.get(requests.size() - 2)).isInstanceOf(ExecuteSqlRequest.class);
+ ExecuteSqlRequest executeSqlRequest = (ExecuteSqlRequest) requests.get(requests.size() - 2);
+ assertThat(executeSqlRequest.getQueryOptions()).isNotNull();
+ assertThat(executeSqlRequest.getQueryOptions().getOptimizerVersion()).isEqualTo("1");
+ assertThat(executeSqlRequest.getQueryOptions().getOptimizerStatisticsPackage())
+ .isEqualTo("custom-package");
+ assertThat(executeSqlRequest.getDirectedReadOptions()).isEqualTo(DIRECTED_READ_OPTIONS1);
+ }
+ }
+
+ @Test
+ public void
+ testBackendPartitionQueryOptions_whenDirectedReadOptionsViaSpannerOptions_assertOptions() {
+ // Use a Spanner instance with MinSession=0 to prevent background requests
+ // from the session pool interfering with the test case.
+ try (Spanner spanner =
+ SpannerOptions.newBuilder()
+ .setProjectId("[PROJECT]")
+ .setChannelProvider(channelProvider)
+ .setCredentials(NoCredentials.getInstance())
+ .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build())
+ .setDirectedReadOptions(DIRECTED_READ_OPTIONS2)
.build()
.getService()) {
BatchClient client =
@@ -2843,8 +3051,8 @@ public void testBackendPartitionQueryOptions() {
transaction.cleanup();
}
// Check if the last query executed is a DeleteSessionRequest and the second last query
- // executed is a ExecuteSqlRequest and was executed using a custom optimizer version and
- // statistics package.
+ // executed is a ExecuteSqlRequest and was executed using a custom optimizer version,
+ // statistics package and directed read options.
List requests = mockSpanner.getRequests();
assert requests.size() >= 2 : "required to have at least 2 requests";
assertThat(requests.get(requests.size() - 1)).isInstanceOf(DeleteSessionRequest.class);
@@ -2854,6 +3062,91 @@ public void testBackendPartitionQueryOptions() {
assertThat(executeSqlRequest.getQueryOptions().getOptimizerVersion()).isEqualTo("1");
assertThat(executeSqlRequest.getQueryOptions().getOptimizerStatisticsPackage())
.isEqualTo("custom-package");
+ assertThat(executeSqlRequest.getDirectedReadOptions()).isEqualTo(DIRECTED_READ_OPTIONS2);
+ }
+ }
+
+ @Test
+ public void testBackendPartitionReadOptions() {
+ // Use a Spanner instance with MinSession=0 to prevent background requests
+ // from the session pool interfering with the test case.
+ try (Spanner spanner =
+ SpannerOptions.newBuilder()
+ .setProjectId("[PROJECT]")
+ .setChannelProvider(channelProvider)
+ .setCredentials(NoCredentials.getInstance())
+ .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build())
+ .setDirectedReadOptions(DIRECTED_READ_OPTIONS2)
+ .build()
+ .getService()) {
+ BatchClient client =
+ spanner.getBatchClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE"));
+ BatchReadOnlyTransaction transaction =
+ client.batchReadOnlyTransaction(TimestampBound.strong());
+ List partitions =
+ transaction.partitionRead(
+ PartitionOptions.newBuilder().setMaxPartitions(10L).build(),
+ "FOO",
+ KeySet.all(),
+ Lists.newArrayList("1"),
+ Options.directedRead(DIRECTED_READ_OPTIONS1));
+ try (ResultSet rs = transaction.execute(partitions.get(0))) {
+ // Just iterate over the results to execute the query.
+ while (rs.next()) {}
+ } finally {
+ transaction.cleanup();
+ }
+ // Check if the last query executed is a DeleteSessionRequest and the second last query
+ // executed is a ExecuteSqlRequest and was executed using a custom optimizer version,
+ // statistics package and directed read options.
+ List requests = mockSpanner.getRequests();
+ assert requests.size() >= 2 : "required to have at least 2 requests";
+ assertThat(requests.get(requests.size() - 1)).isInstanceOf(DeleteSessionRequest.class);
+ assertThat(requests.get(requests.size() - 2)).isInstanceOf(ReadRequest.class);
+ ReadRequest readRequest = (ReadRequest) requests.get(requests.size() - 2);
+ assertThat(readRequest.getDirectedReadOptions()).isEqualTo(DIRECTED_READ_OPTIONS1);
+ }
+ }
+
+ @Test
+ public void
+ testBackendPartitionReadOptions_whenDirectedReadOptionsViaSpannerOptions_assertOptions() {
+ // Use a Spanner instance with MinSession=0 to prevent background requests
+ // from the session pool interfering with the test case.
+ try (Spanner spanner =
+ SpannerOptions.newBuilder()
+ .setProjectId("[PROJECT]")
+ .setChannelProvider(channelProvider)
+ .setCredentials(NoCredentials.getInstance())
+ .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build())
+ .setDirectedReadOptions(DIRECTED_READ_OPTIONS2)
+ .build()
+ .getService()) {
+ BatchClient client =
+ spanner.getBatchClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE"));
+ BatchReadOnlyTransaction transaction =
+ client.batchReadOnlyTransaction(TimestampBound.strong());
+ List partitions =
+ transaction.partitionRead(
+ PartitionOptions.newBuilder().setMaxPartitions(10L).build(),
+ "FOO",
+ KeySet.all(),
+ Lists.newArrayList("1"));
+ try (ResultSet rs = transaction.execute(partitions.get(0))) {
+ // Just iterate over the results to execute the query.
+ while (rs.next()) {}
+ } finally {
+ transaction.cleanup();
+ }
+ // Check if the last query executed is a DeleteSessionRequest and the second last query
+ // executed is a ExecuteSqlRequest and was executed using a custom optimizer version,
+ // statistics package and directed read options.
+ List requests = mockSpanner.getRequests();
+ assert requests.size() >= 2 : "required to have at least 2 requests";
+ assertThat(requests.get(requests.size() - 1)).isInstanceOf(DeleteSessionRequest.class);
+ assertThat(requests.get(requests.size() - 2)).isInstanceOf(ReadRequest.class);
+ ReadRequest readRequest = (ReadRequest) requests.get(requests.size() - 2);
+ assertThat(readRequest.getDirectedReadOptions()).isEqualTo(DIRECTED_READ_OPTIONS2);
}
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java
index af336d3f582..83bb1728ac0 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java
@@ -49,6 +49,7 @@ public class MockSpannerTestUtil {
.build())
.setMetadata(SELECT1_METADATA)
.build();
+ public static final Statement SELECT1_FROM_TABLE = Statement.of("SELECT 1 FROM FOO WHERE 1=1");
static final String TEST_PROJECT = "my-project";
static final String TEST_INSTANCE = "my-instance";
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java
index d40f9b39ea1..e0bbf81f297 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java
@@ -24,6 +24,9 @@
import static org.junit.Assert.assertTrue;
import com.google.cloud.spanner.Options.RpcPriority;
+import com.google.spanner.v1.DirectedReadOptions;
+import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas;
+import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection;
import com.google.spanner.v1.RequestOptions.Priority;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -32,6 +35,13 @@
/** Unit tests for {@link Options}. */
@RunWith(JUnit4.class)
public class OptionsTest {
+ private static final DirectedReadOptions DIRECTED_READ_OPTIONS =
+ DirectedReadOptions.newBuilder()
+ .setIncludeReplicas(
+ IncludeReplicas.newBuilder()
+ .addReplicaSelections(
+ ReplicaSelection.newBuilder().setLocation("us-west1").build()))
+ .build();
@Test
public void negativeLimitsNotAllowed() {
@@ -65,13 +75,18 @@ public void zeroPrefetchChunksNotAllowed() {
public void allOptionsPresent() {
Options options =
Options.fromReadOptions(
- Options.limit(10), Options.prefetchChunks(1), Options.dataBoostEnabled(true));
+ Options.limit(10),
+ Options.prefetchChunks(1),
+ Options.dataBoostEnabled(true),
+ Options.directedRead(DIRECTED_READ_OPTIONS));
assertThat(options.hasLimit()).isTrue();
assertThat(options.limit()).isEqualTo(10);
assertThat(options.hasPrefetchChunks()).isTrue();
assertThat(options.prefetchChunks()).isEqualTo(1);
assertThat(options.hasDataBoostEnabled()).isTrue();
assertTrue(options.dataBoostEnabled());
+ assertTrue(options.hasDirectedReadOptions());
+ assertEquals(DIRECTED_READ_OPTIONS, options.directedReadOptions());
}
@Test
@@ -84,6 +99,7 @@ public void allOptionsAbsent() {
assertThat(options.hasPriority()).isFalse();
assertThat(options.hasTag()).isFalse();
assertThat(options.hasDataBoostEnabled()).isFalse();
+ assertThat(options.hasDirectedReadOptions()).isFalse();
assertThat(options.toString()).isEqualTo("");
assertThat(options.equals(options)).isTrue();
assertThat(options.equals(null)).isFalse();
@@ -161,14 +177,28 @@ public void readOptionsTest() {
boolean dataBoost = true;
Options options =
Options.fromReadOptions(
- Options.limit(limit), Options.tag(tag), Options.dataBoostEnabled(true));
+ Options.limit(limit),
+ Options.tag(tag),
+ Options.dataBoostEnabled(true),
+ Options.directedRead(DIRECTED_READ_OPTIONS));
assertThat(options.toString())
.isEqualTo(
- "limit: " + limit + " " + "tag: " + tag + " " + "dataBoostEnabled: " + dataBoost + " ");
+ "limit: "
+ + limit
+ + " "
+ + "tag: "
+ + tag
+ + " "
+ + "dataBoostEnabled: "
+ + dataBoost
+ + " "
+ + "directedReadOptions: "
+ + DIRECTED_READ_OPTIONS
+ + " ");
assertThat(options.tag()).isEqualTo(tag);
assertEquals(dataBoost, options.dataBoostEnabled());
- assertThat(options.hashCode()).isEqualTo(-96091607);
+ assertEquals(DIRECTED_READ_OPTIONS, options.directedReadOptions());
}
@Test
@@ -199,7 +229,10 @@ public void queryOptionsTest() {
boolean dataBoost = true;
Options options =
Options.fromQueryOptions(
- Options.prefetchChunks(chunks), Options.tag(tag), Options.dataBoostEnabled(true));
+ Options.prefetchChunks(chunks),
+ Options.tag(tag),
+ Options.dataBoostEnabled(true),
+ Options.directedRead(DIRECTED_READ_OPTIONS));
assertThat(options.toString())
.isEqualTo(
"prefetchChunks: "
@@ -210,11 +243,14 @@ public void queryOptionsTest() {
+ " "
+ "dataBoostEnabled: "
+ dataBoost
+ + " "
+ + "directedReadOptions: "
+ + DIRECTED_READ_OPTIONS
+ " ");
assertThat(options.prefetchChunks()).isEqualTo(chunks);
assertThat(options.tag()).isEqualTo(tag);
assertEquals(dataBoost, options.dataBoostEnabled());
- assertThat(options.hashCode()).isEqualTo(1274581983);
+ assertEquals(DIRECTED_READ_OPTIONS, options.directedReadOptions());
}
@Test
@@ -630,4 +666,29 @@ public void optimisticLockHashCode() {
assertEquals(option1.hashCode(), option2.hashCode());
assertNotEquals(option1.hashCode(), option3.hashCode());
}
+
+ @Test
+ public void directedReadEquality() {
+ Options option1 = Options.fromReadOptions(Options.directedRead(DIRECTED_READ_OPTIONS));
+ Options option2 = Options.fromReadOptions(Options.directedRead(DIRECTED_READ_OPTIONS));
+ Options option3 = Options.fromTransactionOptions();
+
+ assertEquals(option1, option2);
+ assertNotEquals(option1, option3);
+ }
+
+ @Test
+ public void directedReadHashCode() {
+ Options option1 = Options.fromReadOptions(Options.directedRead(DIRECTED_READ_OPTIONS));
+ Options option2 = Options.fromReadOptions(Options.directedRead(DIRECTED_READ_OPTIONS));
+ Options option3 = Options.fromTransactionOptions();
+
+ assertEquals(option1.hashCode(), option2.hashCode());
+ assertNotEquals(option1.hashCode(), option3.hashCode());
+ }
+
+ @Test
+ public void directedReadsNullNotAllowed() {
+ assertThrows(NullPointerException.class, () -> Options.directedRead(null));
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java
index 635838512c7..42e53a6b8e7 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java
@@ -48,6 +48,9 @@
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
+import com.google.spanner.v1.DirectedReadOptions;
+import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas;
+import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
@@ -698,6 +701,27 @@ public void testLeaderAwareRoutingEnablement() {
.isLeaderAwareRoutingEnabled());
}
+ @Test
+ public void testSetDirectedReadOptions() {
+ final DirectedReadOptions directedReadOptions =
+ DirectedReadOptions.newBuilder()
+ .setIncludeReplicas(
+ IncludeReplicas.newBuilder()
+ .addReplicaSelections(
+ ReplicaSelection.newBuilder().setLocation("us-west1").build())
+ .build())
+ .build();
+ SpannerOptions options =
+ SpannerOptions.newBuilder()
+ .setProjectId("[PROJECT]")
+ .setDirectedReadOptions(directedReadOptions)
+ .build();
+ assertEquals(options.getDirectedReadOptions(), directedReadOptions);
+ assertThrows(
+ NullPointerException.class,
+ () -> SpannerOptions.newBuilder().setDirectedReadOptions(null).build());
+ }
+
@Test
public void testSpannerCallContextTimeoutConfigurator_NullValues() {
SpannerCallContextTimeoutConfigurator configurator =
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectedReadsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectedReadsTest.java
new file mode 100644
index 00000000000..217da5f4bc5
--- /dev/null
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectedReadsTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner.it;
+
+import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1;
+import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+
+import com.google.cloud.spanner.AbortedException;
+import com.google.cloud.spanner.Database;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.IntegrationTestEnv;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Options;
+import com.google.cloud.spanner.ParallelIntegrationTest;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.TransactionContext;
+import com.google.cloud.spanner.TransactionManager;
+import com.google.cloud.spanner.TransactionRunner;
+import com.google.common.collect.Lists;
+import com.google.spanner.v1.DirectedReadOptions;
+import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas;
+import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@Category(ParallelIntegrationTest.class)
+@RunWith(JUnit4.class)
+public class ITDirectedReadsTest {
+
+ private static final DirectedReadOptions DIRECTED_READ_OPTIONS =
+ DirectedReadOptions.newBuilder()
+ .setIncludeReplicas(
+ IncludeReplicas.newBuilder()
+ .addReplicaSelections(
+ ReplicaSelection.newBuilder().setLocation("us-west1").build()))
+ .build();
+
+ @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
+ private static Database db;
+
+ @BeforeClass
+ public static void setUp() {
+ db =
+ env.getTestHelper()
+ .createTestDatabase("CREATE TABLE TEST (ID INT64, NAME STRING(100)) PRIMARY KEY (ID)");
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ db.drop();
+ }
+
+ @Test
+ public void testReadWriteTransactionRunner_queryWithDirectedReadOptionsViaRequest_throwsError() {
+ // Directed Read Options set at an RPC level is not acceptable for RW transaction
+
+ assumeFalse("Emulator does not support directed reads", isUsingEmulator());
+ SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
+ try (Spanner spanner = options.getService()) {
+ DatabaseClient client = spanner.getDatabaseClient(db.getId());
+ TransactionRunner runner = client.readWriteTransaction();
+ SpannerException e =
+ assertThrows(
+ SpannerException.class,
+ () ->
+ runner.run(
+ transaction -> {
+ try (ResultSet resultSet =
+ transaction.executeQuery(
+ SELECT1, Options.directedRead(DIRECTED_READ_OPTIONS))) {
+ while (resultSet.next()) {}
+ }
+ return null;
+ }));
+
+ assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+ assertTrue(
+ e.getMessage()
+ .contains("Directed reads can only be performed in a read-only transaction."));
+ }
+ }
+
+ @Test
+ public void testReadWriteTransactionRunner_readWithDirectedReadOptionsViaRequest_throwsError() {
+ // Directed Read Options set at an RPC level is not acceptable for RW transaction
+
+ assumeFalse("Emulator does not support directed reads", isUsingEmulator());
+ SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
+ try (Spanner spanner = options.getService()) {
+ DatabaseClient client = spanner.getDatabaseClient(db.getId());
+ TransactionRunner runner = client.readWriteTransaction();
+ SpannerException e =
+ assertThrows(
+ SpannerException.class,
+ () ->
+ runner.run(
+ transaction -> {
+ try (ResultSet resultSet =
+ transaction.read(
+ "TEST",
+ KeySet.singleKey(Key.of(1L)),
+ Lists.newArrayList("NAME"),
+ Options.directedRead(DIRECTED_READ_OPTIONS))) {
+ while (resultSet.next()) {}
+ }
+ return null;
+ }));
+
+ assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+ assertTrue(
+ e.getMessage()
+ .contains("Directed reads can only be performed in a read-only transaction."));
+ }
+ }
+
+ @Test
+ public void testReadWriteTransactionManager_readWithDirectedReadOptionsViaRequest_throwsError() {
+ // Directed Read Options set at an RPC level is not acceptable for RW transaction
+
+ assumeFalse("Emulator does not support directed reads", isUsingEmulator());
+ SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build();
+ try (Spanner spanner = options.getService()) {
+ DatabaseClient client = spanner.getDatabaseClient(db.getId());
+ try (TransactionManager manager = client.transactionManager()) {
+ SpannerException e =
+ assertThrows(
+ SpannerException.class,
+ () -> {
+ TransactionContext transaction = manager.begin();
+ try {
+ while (true) {
+
+ ResultSet resultSet =
+ transaction.read(
+ "TEST",
+ KeySet.singleKey(Key.of(1L)),
+ Lists.newArrayList("NAME"),
+ Options.directedRead(DIRECTED_READ_OPTIONS));
+ while (resultSet.next()) {}
+
+ manager.commit();
+ assertNotNull(manager.getCommitTimestamp());
+ break;
+ }
+ } catch (AbortedException ex) {
+ transaction = manager.resetForRetry();
+ }
+ });
+ assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+ assertTrue(
+ e.getMessage()
+ .contains("Directed reads can only be performed in a read-only transaction."));
+ }
+ }
+ }
+}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadTest.java
index c28b48c529a..2d888996465 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadTest.java
@@ -21,6 +21,9 @@
import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.cloud.spanner.Database;
@@ -42,6 +45,9 @@
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.connection.ConnectionOptions;
import com.google.cloud.spanner.testing.RemoteSpannerHelper;
+import com.google.spanner.v1.DirectedReadOptions;
+import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas;
+import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection;
import io.grpc.Context;
import java.util.ArrayList;
import java.util.Arrays;
@@ -77,6 +83,17 @@ public class ITReadTest {
private static final Type TABLE_TYPE =
Type.struct(
StructField.of("key", Type.string()), StructField.of("stringvalue", Type.string()));
+ private static DirectedReadOptions DIRECTED_READ_OPTIONS =
+ DirectedReadOptions.newBuilder()
+ .setIncludeReplicas(
+ IncludeReplicas.newBuilder()
+ .addReplicaSelections(
+ ReplicaSelection.newBuilder()
+ .setLocation("us-west1")
+ .setType(ReplicaSelection.Type.READ_ONLY)
+ .build())
+ .setAutoFailoverDisabled(true))
+ .build();
private static DatabaseClient googleStandardSQLClient;
private static DatabaseClient postgreSQLClient;
@@ -336,6 +353,23 @@ public void rowsAreSnapshots() {
assertThat(rows.get(2).getString(1)).isEqualTo("v4");
}
+ @Test
+ public void pointReadWithDirectedReadOptions() {
+ try (ResultSet rs =
+ getClient(dialect.dialect)
+ .singleUse()
+ .read(
+ TABLE_NAME,
+ KeySet.singleKey(Key.of("k1")),
+ ALL_COLUMNS,
+ Options.directedRead(DIRECTED_READ_OPTIONS))) {
+ assertTrue(rs.next());
+ assertEquals("k1", rs.getString(0));
+ assertEquals("v1", rs.getString(1));
+ assertFalse(rs.next());
+ }
+ }
+
@Test
public void invalidDatabase() {
RemoteSpannerHelper helper = env.getTestHelper();