diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml
index 67e9663747c..f158f62ea81 100644
--- a/google-cloud-spanner/clirr-ignored-differences.xml
+++ b/google-cloud-spanner/clirr-ignored-differences.xml
@@ -222,6 +222,105 @@
com/google/cloud/spanner/connection/Connection
com.google.cloud.spanner.ResultSet analyzeUpdateStatement(com.google.cloud.spanner.Statement, com.google.cloud.spanner.ReadContext$QueryAnalyzeMode, com.google.cloud.spanner.Options$UpdateOption[])
+
+ 7004
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)
+ com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map, boolean)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)
+
+
+ 7005
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean)
+
+
+ 7006
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)
+ com.google.spanner.v1.ResultSet
+
+
+ 7006
+ com/google/cloud/spanner/spi/v1/GapicSpannerRpc
+ com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)
+
+
+ 7004
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)
+
+
+ 7005
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean)
+
+
+ 7006
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)
+ com.google.spanner.v1.ResultSet
+
+
+ 7006
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)
+ com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall
+
+
7012
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
index 7facd19c826..e19ace944a5 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
@@ -163,6 +163,11 @@ private SingleReadContext(Builder builder) {
this.bound = builder.bound;
}
+ @Override
+ protected boolean isRouteToLeader() {
+ return false;
+ }
+
@GuardedBy("lock")
@Override
void beforeReadOrQueryLocked() {
@@ -293,6 +298,11 @@ static Builder newBuilder() {
}
}
+ @Override
+ protected boolean isRouteToLeader() {
+ return false;
+ }
+
@Override
void beforeReadOrQuery() {
super.beforeReadOrQuery();
@@ -347,7 +357,8 @@ void initTransaction() {
.setSession(session.getName())
.setOptions(options)
.build();
- Transaction transaction = rpc.beginTransaction(request, session.getOptions());
+ Transaction transaction =
+ rpc.beginTransaction(request, session.getOptions(), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
@@ -416,6 +427,10 @@ long getSeqNo() {
return seqNo.incrementAndGet();
}
+ protected boolean isRouteToLeader() {
+ return false;
+ }
+
@Override
public final ResultSet read(
String table, KeySet keys, Iterable columns, ReadOption... options) {
@@ -667,7 +682,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
- rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
+ rpc.executeQuery(
+ request.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
@@ -798,7 +814,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken
}
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
- rpc.read(builder.build(), stream.consumer(), session.getOptions());
+ rpc.read(
+ builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
index 976f6136db7..36991b18c3d 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
@@ -202,7 +202,7 @@ private ByteString initTransaction() {
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.build();
- Transaction tx = rpc.beginTransaction(request, session.getOptions());
+ Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
index df6163e93e9..2bef8e3ada4 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
@@ -275,7 +275,7 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
@Override
public void prepareReadWriteTransaction() {
setActive(null);
- readyTransactionId = beginTransaction();
+ readyTransactionId = beginTransaction(true);
}
@Override
@@ -296,9 +296,9 @@ public void close() {
}
}
- ByteString beginTransaction() {
+ ByteString beginTransaction(boolean routeToLeader) {
try {
- return beginTransactionAsync().get();
+ return beginTransactionAsync(routeToLeader).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
} catch (InterruptedException e) {
@@ -306,11 +306,11 @@ ByteString beginTransaction() {
}
}
- ApiFuture beginTransactionAsync() {
- return beginTransactionAsync(Options.fromTransactionOptions());
+ ApiFuture beginTransactionAsync(boolean routeToLeader) {
+ return beginTransactionAsync(Options.fromTransactionOptions(), routeToLeader);
}
- ApiFuture beginTransactionAsync(Options transactionOptions) {
+ ApiFuture beginTransactionAsync(Options transactionOptions, boolean routeToLeader) {
final SettableApiFuture res = SettableApiFuture.create();
final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
final BeginTransactionRequest request =
@@ -319,7 +319,7 @@ ApiFuture beginTransactionAsync(Options transactionOptions) {
.setOptions(createReadWriteTransactionOptions(transactionOptions))
.build();
final ApiFuture requestFuture =
- spanner.getRpc().beginTransactionAsync(request, options);
+ spanner.getRpc().beginTransactionAsync(request, options, routeToLeader);
requestFuture.addListener(
tracer.withSpan(
span,
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
index d6243f59592..6663d3f8e68 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
@@ -132,6 +132,7 @@ public class SpannerOptions extends ServiceOptions {
private final CallCredentialsProvider callCredentialsProvider;
private final CloseableExecutorProvider asyncExecutorProvider;
private final String compressorName;
+ private final boolean leaderAwareRoutingEnabled;
/**
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
@@ -600,6 +601,7 @@ private SpannerOptions(Builder builder) {
callCredentialsProvider = builder.callCredentialsProvider;
asyncExecutorProvider = builder.asyncExecutorProvider;
compressorName = builder.compressorName;
+ leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled;
}
/**
@@ -700,6 +702,7 @@ public static class Builder
private CloseableExecutorProvider asyncExecutorProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
+ private boolean leaderAwareRoutingEnabled = false;
private Builder() {
// Manually set retry and polling settings that work.
@@ -1155,6 +1158,24 @@ public Builder setEmulatorHost(String emulatorHost) {
return this;
}
+ /**
+ * Enable leader aware routing. Leader aware routing would route all requests in RW/PDML
+ * transactions to the leader region.
+ */
+ public Builder enableLeaderAwareRouting() {
+ this.leaderAwareRoutingEnabled = true;
+ return this;
+ }
+
+ /**
+ * Disable leader aware routing. Disabling leader aware routing would route all requests in
+ * RW/PDML transactions to any region.
+ */
+ public Builder disableLeaderAwareRouting() {
+ this.leaderAwareRoutingEnabled = false;
+ return this;
+ }
+
@SuppressWarnings("rawtypes")
@Override
public SpannerOptions build() {
@@ -1291,6 +1312,10 @@ public String getCompressorName() {
return compressorName;
}
+ public boolean isLeaderAwareRoutingEnabled() {
+ return leaderAwareRoutingEnabled;
+ }
+
/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
index 3d3b34c4c39..ef937e993bd 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
@@ -198,6 +198,11 @@ private TransactionContextImpl(Builder builder) {
this.finishedAsyncOperations.set(null);
}
+ @Override
+ protected boolean isRouteToLeader() {
+ return true;
+ }
+
private void increaseAsyncOperations() {
synchronized (lock) {
if (runningAsyncOperations == 0) {
@@ -255,7 +260,7 @@ ApiFuture ensureTxnAsync() {
private void createTxnAsync(final SettableApiFuture res) {
span.addAnnotation("Creating Transaction");
- final ApiFuture fut = session.beginTransactionAsync(options);
+ final ApiFuture fut = session.beginTransactionAsync(options, isRouteToLeader());
fut.addListener(
() -> {
try {
@@ -717,7 +722,7 @@ private ResultSet internalExecuteUpdate(
/* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
- rpc.executeQuery(builder.build(), session.getOptions());
+ rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
@@ -747,7 +752,7 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... o
// Register the update as an async operation that must finish before the transaction may
// commit.
increaseAsyncOperations();
- resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions());
+ resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader());
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
index eb8633d0449..4d70a26866c 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
@@ -252,6 +252,7 @@ public class GapicSpannerRpc implements SpannerRpc {
private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 1.0D;
private static final ConcurrentMap ADMINISTRATIVE_REQUESTS_RATE_LIMITERS =
new ConcurrentHashMap<>();
+ private final boolean leaderAwareRoutingEnabled;
public static GapicSpannerRpc create(SpannerOptions options) {
return new GapicSpannerRpc(options);
@@ -302,6 +303,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
internalHeaderProviderBuilder.getResourceHeaderKey());
this.callCredentialsProvider = options.getCallCredentialsProvider();
this.compressorName = options.getCompressorName();
+ this.leaderAwareRoutingEnabled = options.isLeaderAwareRoutingEnabled();
if (initializeStubs) {
// First check if SpannerOptions provides a TransportChannelProvider. Create one
@@ -1527,7 +1529,8 @@ public List batchCreateSessions(
requestBuilder.setSessionTemplate(sessionBuilder);
BatchCreateSessionsRequest request = requestBuilder.build();
GrpcCallContext context =
- newCallContext(options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod());
+ newCallContext(
+ options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod(), true);
return get(spannerStub.batchCreateSessionsCallable().futureCall(request, context))
.getSessionList();
}
@@ -1551,7 +1554,7 @@ public Session createSession(
requestBuilder.setSession(sessionBuilder);
CreateSessionRequest request = requestBuilder.build();
GrpcCallContext context =
- newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod());
+ newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod(), true);
return get(spannerStub.createSessionCallable().futureCall(request, context));
}
@@ -1571,9 +1574,13 @@ public ApiFuture asyncDeleteSession(String sessionName, @Nullable Map options) {
+ ReadRequest request,
+ ResultStreamConsumer consumer,
+ @Nullable Map