From 2cd2fbdaace80118257ac8b10006483aa50bf57b Mon Sep 17 00:00:00 2001 From: Yifan Zhou Date: Thu, 29 Dec 2022 14:54:40 -0800 Subject: [PATCH 1/8] feat: Add `x-goog-spanner-route-to-leader` header to Spanner RPC contexts for RW/PDML transactions. The header is added to support leader-aware-routing feature, which aims at reducing cross-regional latency for RW/PDML transactions in a multi-region instance. --- .../cloud/spanner/AbstractReadContext.java | 11 ++- .../spanner/PartitionedDmlTransaction.java | 2 +- .../com/google/cloud/spanner/SessionImpl.java | 10 +-- .../cloud/spanner/TransactionRunnerImpl.java | 7 +- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 79 ++++++++++++++----- .../spi/v1/SpannerMetadataProvider.java | 8 +- .../cloud/spanner/spi/v1/SpannerRpc.java | 20 +++-- .../cloud/spanner/BatchClientImplTest.java | 3 +- .../PartitionedDmlTransactionTest.java | 21 ++--- .../google/cloud/spanner/SessionImplTest.java | 18 +++-- .../google/cloud/spanner/SessionPoolTest.java | 11 ++- .../spanner/TransactionManagerImplTest.java | 17 ++-- .../spanner/TransactionRunnerImplTest.java | 16 ++-- .../spanner/spi/v1/GapicSpannerRpcTest.java | 37 +++++++++ .../spi/v1/SpannerMetadataProviderTest.java | 11 +++ 15 files changed, 198 insertions(+), 73 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 60324f8e83e..af937634506 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -161,6 +161,7 @@ static Builder newBuilder() { private SingleReadContext(Builder builder) { super(builder); this.bound = builder.bound; + this.routeToLeader = false; } @GuardedBy("lock") @@ -291,6 +292,7 @@ static Builder newBuilder() { this.timestamp = builder.timestamp; this.transactionId = builder.transactionId; } + this.routeToLeader = false; } @Override @@ -347,7 +349,8 @@ void initTransaction() { .setSession(session.getName()) .setOptions(options) .build(); - Transaction transaction = rpc.beginTransaction(request, session.getOptions()); + Transaction transaction = + rpc.beginTransaction(request, session.getOptions(), routeToLeader); if (!transaction.hasReadTimestamp()) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field"); @@ -380,6 +383,7 @@ void initTransaction() { Span span; private final int defaultPrefetchChunks; private final QueryOptions defaultQueryOptions; + protected boolean routeToLeader = false; @GuardedBy("lock") private boolean isValid = true; @@ -664,7 +668,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken request.setTransaction(selector); } SpannerRpc.StreamingCall call = - rpc.executeQuery(request.build(), stream.consumer(), session.getOptions()); + rpc.executeQuery( + request.build(), stream.consumer(), session.getOptions(), routeToLeader); call.request(prefetchChunks); stream.setCall(call, request.getTransaction().hasBegin()); return stream; @@ -792,7 +797,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken } builder.setRequestOptions(buildRequestOptions(readOptions)); SpannerRpc.StreamingCall call = - rpc.read(builder.build(), stream.consumer(), session.getOptions()); + rpc.read(builder.build(), stream.consumer(), session.getOptions(), routeToLeader); call.request(prefetchChunks); stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin()); return stream; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index 976f6136db7..36991b18c3d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -202,7 +202,7 @@ private ByteString initTransaction() { TransactionOptions.newBuilder() .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) .build(); - Transaction tx = rpc.beginTransaction(request, session.getOptions()); + Transaction tx = rpc.beginTransaction(request, session.getOptions(), true); if (tx.getId().isEmpty()) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index e981f96eb33..5592d0dbfe7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -267,7 +267,7 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... @Override public void prepareReadWriteTransaction() { setActive(null); - readyTransactionId = beginTransaction(); + readyTransactionId = beginTransaction(true); } @Override @@ -288,9 +288,9 @@ public void close() { } } - ByteString beginTransaction() { + ByteString beginTransaction(boolean routeToLeader) { try { - return beginTransactionAsync().get(); + return beginTransactionAsync(routeToLeader).get(); } catch (ExecutionException e) { throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()); } catch (InterruptedException e) { @@ -298,7 +298,7 @@ ByteString beginTransaction() { } } - ApiFuture beginTransactionAsync() { + ApiFuture beginTransactionAsync(boolean routeToLeader) { final SettableApiFuture res = SettableApiFuture.create(); final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan(); final BeginTransactionRequest request = @@ -309,7 +309,7 @@ ApiFuture beginTransactionAsync() { .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) .build(); final ApiFuture requestFuture = - spanner.getRpc().beginTransactionAsync(request, options); + spanner.getRpc().beginTransactionAsync(request, options, routeToLeader); requestFuture.addListener( tracer.withSpan( span, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 4557064b5e1..52227783450 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -196,6 +196,7 @@ private TransactionContextImpl(Builder builder) { this.trackTransactionStarter = builder.trackTransactionStarter; this.options = builder.options; this.finishedAsyncOperations.set(null); + this.routeToLeader = true; } private void increaseAsyncOperations() { @@ -255,7 +256,7 @@ ApiFuture ensureTxnAsync() { private void createTxnAsync(final SettableApiFuture res) { span.addAnnotation("Creating Transaction"); - final ApiFuture fut = session.beginTransactionAsync(); + final ApiFuture fut = session.beginTransactionAsync(routeToLeader); fut.addListener( () -> { try { @@ -719,7 +720,7 @@ private ResultSet internalExecuteUpdate( /* withTransactionSelector = */ true); try { com.google.spanner.v1.ResultSet resultSet = - rpc.executeQuery(builder.build(), session.getOptions()); + rpc.executeQuery(builder.build(), session.getOptions(), routeToLeader); if (resultSet.getMetadata().hasTransaction()) { onTransactionMetadata( resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin()); @@ -749,7 +750,7 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... o // Register the update as an async operation that must finish before the transaction may // commit. increaseAsyncOperations(); - resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions()); + resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), routeToLeader); } catch (Throwable t) { decreaseAsyncOperations(); throw t; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 7f325665542..eca8b75bca8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -1592,7 +1592,8 @@ public List batchCreateSessions( requestBuilder.setSessionTemplate(sessionBuilder); BatchCreateSessionsRequest request = requestBuilder.build(); GrpcCallContext context = - newCallContext(options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod()); + newCallContext( + options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod(), true); return get(spannerStub.batchCreateSessionsCallable().futureCall(request, context)) .getSessionList(); } @@ -1616,7 +1617,7 @@ public Session createSession( requestBuilder.setSession(sessionBuilder); CreateSessionRequest request = requestBuilder.build(); GrpcCallContext context = - newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod()); + newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod(), true); return get(spannerStub.createSessionCallable().futureCall(request, context)); } @@ -1630,15 +1631,19 @@ public void deleteSession(String sessionName, @Nullable Map options) public ApiFuture asyncDeleteSession(String sessionName, @Nullable Map options) { DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build(); GrpcCallContext context = - newCallContext(options, sessionName, request, SpannerGrpc.getDeleteSessionMethod()); + newCallContext(options, sessionName, request, SpannerGrpc.getDeleteSessionMethod(), false); return spannerStub.deleteSessionCallable().futureCall(request, context); } @Override public StreamingCall read( - ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options) { + ReadRequest request, + ResultStreamConsumer consumer, + @Nullable Map options, + boolean routeToLeader) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getReadMethod()); + newCallContext( + options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader); SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); spannerStub.streamingReadCallable().call(request, responseObserver, context); final StreamController controller = responseObserver.getController(); @@ -1658,13 +1663,14 @@ public void cancel(String message) { } @Override - public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options) { - return get(executeQueryAsync(request, options)); + public ResultSet executeQuery( + ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader) { + return get(executeQueryAsync(request, options, routeToLeader)); } @Override public ApiFuture executeQueryAsync( - ExecuteSqlRequest request, @Nullable Map options) { + ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader) { GrpcCallContext context = newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod()); return spannerStub.executeSqlCallable().futureCall(request, context); @@ -1674,7 +1680,8 @@ public ApiFuture executeQueryAsync( public ResultSet executePartitionedDml( ExecuteSqlRequest request, @Nullable Map options) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod()); + newCallContext( + options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod(), true); return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context)); } @@ -1688,7 +1695,11 @@ public ServerStream executeStreamingPartitionedDml( ExecuteSqlRequest request, Map options, Duration timeout) { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod()); + options, + request.getSession(), + request, + SpannerGrpc.getExecuteStreamingSqlMethod(), + true); // Override any timeout settings that might have been set on the call context. context = context.withTimeout(timeout).withStreamWaitTimeout(timeout); return partitionedDmlStub.executeStreamingSqlCallable().call(request, context); @@ -1696,10 +1707,17 @@ public ServerStream executeStreamingPartitionedDml( @Override public StreamingCall executeQuery( - ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { + ExecuteSqlRequest request, + ResultStreamConsumer consumer, + @Nullable Map options, + boolean routeToLeader) { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod()); + options, + request.getSession(), + request, + SpannerGrpc.getExecuteStreamingSqlMethod(), + routeToLeader); SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context); final StreamController controller = responseObserver.getController(); @@ -1729,30 +1747,35 @@ public ApiFuture executeBatchDmlAsync( ExecuteBatchDmlRequest request, @Nullable Map options) { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getExecuteBatchDmlMethod()); + options, request.getSession(), request, SpannerGrpc.getExecuteBatchDmlMethod(), true); return spannerStub.executeBatchDmlCallable().futureCall(request, context); } @Override public ApiFuture beginTransactionAsync( - BeginTransactionRequest request, @Nullable Map options) { + BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader) { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getBeginTransactionMethod()); + options, + request.getSession(), + request, + SpannerGrpc.getBeginTransactionMethod(), + routeToLeader); return spannerStub.beginTransactionCallable().futureCall(request, context); } @Override public Transaction beginTransaction( - BeginTransactionRequest request, @Nullable Map options) throws SpannerException { - return get(beginTransactionAsync(request, options)); + BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader) + throws SpannerException { + return get(beginTransactionAsync(request, options, routeToLeader)); } @Override public ApiFuture commitAsync( CommitRequest request, @Nullable Map options) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod()); + newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod(), true); return spannerStub.commitCallable().futureCall(request, context); } @@ -1765,7 +1788,8 @@ public CommitResponse commit(CommitRequest commitRequest, @Nullable Map rollbackAsync(RollbackRequest request, @Nullable Map options) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getRollbackMethod()); + newCallContext( + options, request.getSession(), request, SpannerGrpc.getRollbackMethod(), true); return spannerStub.rollbackCallable().futureCall(request, context); } @@ -1780,7 +1804,7 @@ public PartitionResponse partitionQuery( PartitionQueryRequest request, @Nullable Map options) throws SpannerException { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getPartitionQueryMethod()); + options, request.getSession(), request, SpannerGrpc.getPartitionQueryMethod(), true); return get(spannerStub.partitionQueryCallable().futureCall(request, context)); } @@ -1789,7 +1813,7 @@ public PartitionResponse partitionRead( PartitionReadRequest request, @Nullable Map options) throws SpannerException { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getPartitionReadMethod()); + options, request.getSession(), request, SpannerGrpc.getPartitionReadMethod(), true); return get(spannerStub.partitionReadCallable().futureCall(request, context)); } @@ -1898,6 +1922,16 @@ GrpcCallContext newCallContext( String resource, ReqT request, MethodDescriptor method) { + return newCallContext(options, resource, request, method, false); + } + + @VisibleForTesting + GrpcCallContext newCallContext( + @Nullable Map options, + String resource, + ReqT request, + MethodDescriptor method, + boolean routeToLeader) { GrpcCallContext context = GrpcCallContext.createDefault(); if (options != null) { context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); @@ -1907,6 +1941,9 @@ GrpcCallContext newCallContext( context = context.withCallOptions(context.getCallOptions().withCompression(compressorName)); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); + if (routeToLeader) { + context = context.withExtraHeaders(metadataProvider.newRouteToLeaderHeader()); + } if (callCredentialsProvider != null) { CallCredentials callCredentials = callCredentialsProvider.getCallCredentials(); if (callCredentials != null) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java index 7f9a32765e0..ae52134f625 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java @@ -28,7 +28,7 @@ class SpannerMetadataProvider { private final Map, String> headers; private final String resourceHeaderKey; - + private static final String routeToLeaderHeaderKey = "x-goog-spanner-route-to-leader"; private static final Pattern[] RESOURCE_TOKEN_PATTERNS = { Pattern.compile("^(?projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"), Pattern.compile("^(?projects/[^/]*/instances/[^/]*)(.*)?") @@ -66,6 +66,12 @@ Map> newExtraHeaders( .build(); } + Map> newRouteToLeaderHeader() { + return ImmutableMap.>builder() + .put(routeToLeaderHeaderKey, Collections.singletonList("true")) + .build(); + } + private Map, String> constructHeadersAsMetadata( Map headers) { ImmutableMap.Builder, String> headersAsMetadataBuilder = diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 2f68b9c1df2..c61a80a9d6f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -320,12 +320,16 @@ ApiFuture asyncDeleteSession(String sessionName, @Nullable Map throws SpannerException; StreamingCall read( - ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options); + ReadRequest request, + ResultStreamConsumer consumer, + @Nullable Map options, + boolean routeToLeader); - ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options); + ResultSet executeQuery( + ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader); ApiFuture executeQueryAsync( - ExecuteSqlRequest request, @Nullable Map options); + ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader); ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map options); @@ -335,18 +339,22 @@ ServerStream executeStreamingPartitionedDml( ExecuteSqlRequest request, @Nullable Map options, Duration timeout); StreamingCall executeQuery( - ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options); + ExecuteSqlRequest request, + ResultStreamConsumer consumer, + @Nullable Map options, + boolean routeToLeader); ExecuteBatchDmlResponse executeBatchDml(ExecuteBatchDmlRequest build, Map options); ApiFuture executeBatchDmlAsync( ExecuteBatchDmlRequest build, Map options); - Transaction beginTransaction(BeginTransactionRequest request, @Nullable Map options) + Transaction beginTransaction( + BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader) throws SpannerException; ApiFuture beginTransactionAsync( - BeginTransactionRequest request, @Nullable Map options); + BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader); CommitResponse commit(CommitRequest commitRequest, @Nullable Map options) throws SpannerException; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index 2c29b87f86b..18ae8a07b35 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -91,7 +91,8 @@ public void testBatchReadOnlyTxnWithBound() throws Exception { com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP); Transaction txnMetadata = Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build(); - when(gapicRpc.beginTransaction(Mockito.any(), optionsCaptor.capture())).thenReturn(txnMetadata); + when(gapicRpc.beginTransaction(Mockito.any(), optionsCaptor.capture(), eq(false))) + .thenReturn(txnMetadata); BatchReadOnlyTransaction batchTxn = client.batchReadOnlyTransaction(TimestampBound.strong()); assertThat(batchTxn.getBatchTransactionId().getSessionId()).isEqualTo(SESSION_NAME); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index 61dcef2902a..3fbcd1a738f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -97,7 +98,7 @@ public void setup() { MockitoAnnotations.initMocks(this); when(session.getName()).thenReturn(sessionId); when(session.getOptions()).thenReturn(Collections.EMPTY_MAP); - when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap())) + when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true))) .thenReturn(Transaction.newBuilder().setId(txId).build()); tx = new PartitionedDmlTransaction(session, rpc, ticker); @@ -117,7 +118,7 @@ public void testExecuteStreamingPartitionedUpdate() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -139,7 +140,7 @@ public void testExecuteStreamingPartitionedUpdateWithUpdateOptions() { Statement.of(sql), Duration.ofMinutes(10), Options.tag(tag)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithRequestOptions), anyMap(), any(Duration.class)); @@ -168,7 +169,7 @@ public void testExecuteStreamingPartitionedUpdateAborted() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc, times(2)).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc, times(2)).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc, times(2)) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -200,7 +201,7 @@ public void testExecuteStreamingPartitionedUpdateUnavailable() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -231,7 +232,7 @@ public void testExecuteStreamingPartitionedUpdateUnavailableAndThenDeadlineExcee SpannerException.class, () -> tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10))); assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -259,7 +260,7 @@ public void testExecuteStreamingPartitionedUpdateAbortedAndThenDeadlineExceeded( SpannerException.class, () -> tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10))); assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); - verify(rpc, times(2)).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc, times(2)).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -297,7 +298,7 @@ public Long answer(InvocationOnMock invocation) { () -> tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10))); assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); // It should start a transaction exactly 10 times (10 ticks == 10 minutes). - verify(rpc, times(10)).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc, times(10)).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); // The last transaction should timeout before it starts the actual statement execution, which // means that the execute method is only executed 9 times. verify(rpc, times(9)) @@ -335,7 +336,7 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -366,7 +367,7 @@ public void testExecuteStreamingPartitionedUpdateGenericInternalException() { SpannerException.class, () -> tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10))); assertEquals(ErrorCode.INTERNAL, e.getErrorCode()); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 1174cbf4ebd..90e9a684d95 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -100,7 +101,9 @@ public void setUp() { Transaction txn = Transaction.newBuilder().setId(ByteString.copyFromUtf8("TEST")).build(); Mockito.when( rpc.beginTransactionAsync( - Mockito.any(BeginTransactionRequest.class), Mockito.any(Map.class))) + Mockito.any(BeginTransactionRequest.class), + Mockito.any(Map.class), + Mockito.anyBoolean())) .thenReturn(ApiFutures.immediateFuture(txn)); CommitResponse commitResponse = CommitResponse.newBuilder() @@ -350,7 +353,7 @@ public void singleUseContextClosesTransaction() { public void prepareClosesOldSingleUseContext() { ReadContext ctx = session.singleUse(TimestampBound.strong()); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options))) + Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) .thenReturn(Transaction.newBuilder().setId(ByteString.copyFromUtf8("t1")).build()); session.prepareReadWriteTransaction(); IllegalStateException e = @@ -414,7 +417,7 @@ public void request(int numMessages) {} private void mockRead(final PartialResultSet myResultSet) { final ArgumentCaptor consumer = ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); - Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options))) + Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options), eq(false))) .then( invocation -> { consumer.getValue().onPartialResultSet(myResultSet); @@ -430,7 +433,8 @@ public void multiUseReadOnlyTransactionReturnsEmptyTransactionMetadata() { PartialResultSet.newBuilder() .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) .build(); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options))).thenReturn(txnMetadata); + Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) + .thenReturn(txnMetadata); mockRead(resultSet); ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); @@ -448,7 +452,8 @@ public void multiUseReadOnlyTransactionReturnsMissingTimestamp() { PartialResultSet.newBuilder() .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) .build(); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options))).thenReturn(txnMetadata); + Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) + .thenReturn(txnMetadata); mockRead(resultSet); ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); @@ -467,7 +472,8 @@ public void multiUseReadOnlyTransactionReturnsMissingTransactionId() throws Pars PartialResultSet.newBuilder() .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) .build(); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options))).thenReturn(txnMetadata); + Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) + .thenReturn(txnMetadata); mockRead(resultSet); ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 8da8683cf0c..34f33bf5bea 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -762,9 +762,12 @@ public void testSessionNotFoundReadWriteTransaction() { when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap())) .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(rpc.executeQuery( - any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class))) + any(ExecuteSqlRequest.class), + any(ResultStreamConsumer.class), + any(Map.class), + eq(true))) .thenReturn(closedStreamingCall); - when(rpc.executeQuery(any(ExecuteSqlRequest.class), any(Map.class))) + when(rpc.executeQuery(any(ExecuteSqlRequest.class), any(Map.class), eq(true))) .thenThrow(sessionNotFound); when(rpc.executeBatchDml(any(ExecuteBatchDmlRequest.class), any(Map.class))) .thenThrow(sessionNotFound); @@ -785,7 +788,7 @@ public void testSessionNotFoundReadWriteTransaction() { .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(closedSession.newTransaction(Options.fromTransactionOptions())) .thenReturn(closedTransactionContext); - when(closedSession.beginTransactionAsync()).thenThrow(sessionNotFound); + when(closedSession.beginTransactionAsync(true)).thenThrow(sessionNotFound); TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession); closedTransactionRunner.setSpan(mock(Span.class)); when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner); @@ -798,7 +801,7 @@ public void testSessionNotFoundReadWriteTransaction() { final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class); when(openSession.newTransaction(Options.fromTransactionOptions())) .thenReturn(openTransactionContext); - when(openSession.beginTransactionAsync()) + when(openSession.beginTransactionAsync(true)) .thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn"))); TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession); openTransactionRunner.setSpan(mock(Span.class)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 55df44a96df..f8462693fe6 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -217,7 +218,8 @@ public void usesPreparedTransaction() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); - when(rpc.beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap())) + when(rpc.beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> ApiFutures.immediateFuture( @@ -241,7 +243,8 @@ public void usesPreparedTransaction() { mgr.commit(); } verify(rpc, times(1)) - .beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); } } @@ -278,7 +281,8 @@ public void inlineBegin() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); - when(rpc.beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap())) + when(rpc.beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> ApiFutures.immediateFuture( @@ -286,7 +290,7 @@ public void inlineBegin() { .setId(ByteString.copyFromUtf8(UUID.randomUUID().toString())) .build())); final AtomicInteger transactionsStarted = new AtomicInteger(); - when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap())) + when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> { ResultSet.Builder builder = @@ -332,9 +336,10 @@ public void inlineBegin() { } // BeginTransaction should not be called, as we are inlining it with the ExecuteSql request. verify(rpc, Mockito.never()) - .beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); // We should have 2 ExecuteSql requests. - verify(rpc, times(2)).executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap()); + verify(rpc, times(2)) + .executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true)); // But only 1 with a BeginTransaction. assertThat(transactionsStarted.get()).isEqualTo(1); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 04ac46d887b..df8245e6acb 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -100,7 +101,7 @@ public void setUp() { MockitoAnnotations.initMocks(this); firstRun = true; when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); - when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap())) + when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> { ResultSet.Builder builder = @@ -160,7 +161,8 @@ public void usesPreparedTransaction() { .setCreateTime( Timestamp.newBuilder().setSeconds(System.currentTimeMillis() * 1000)) .build())); - when(rpc.beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap())) + when(rpc.beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> ApiFutures.immediateFuture( @@ -180,7 +182,8 @@ public void usesPreparedTransaction() { DatabaseClient client = spanner.getDatabaseClient(db); client.readWriteTransaction().run(transaction -> null); verify(rpc, times(1)) - .beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); } } @@ -294,9 +297,10 @@ public void prepareReadWriteTransaction() { return null; }); verify(rpc, Mockito.never()) - .beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); verify(rpc, Mockito.never()) - .beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); assertThat(usedInlinedBegin).isTrue(); } @@ -311,7 +315,7 @@ private long[] batchDmlException(int status) { .setRpc(rpc) .build(); when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(transaction); - when(session.beginTransactionAsync()) + when(session.beginTransactionAsync(true)) .thenReturn( ApiFutures.immediateFuture(ByteString.copyFromUtf8(UUID.randomUUID().toString()))); when(session.getName()).thenReturn(SessionId.of("p", "i", "d", "test").getName()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 23ec9c682c8..d05439f27b5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -24,6 +24,7 @@ import static org.junit.Assume.assumeTrue; import com.google.api.gax.core.GaxProperties; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.HeaderProvider; import com.google.auth.oauth2.AccessToken; @@ -44,6 +45,7 @@ import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.spi.v1.GapicSpannerRpc.AdminRequestsLimitExceededRetryAlgorithm; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ListValue; import com.google.rpc.ErrorInfo; import com.google.spanner.v1.ExecuteSqlRequest; @@ -378,6 +380,41 @@ public void testNewCallContextWithNullRequestAndNullMethod() { rpc.shutdown(); } + @Test + public void testNewCallContextWithRouteToLeaderHeader() { + SpannerOptions options = SpannerOptions.newBuilder().setProjectId("some-project").build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + GrpcCallContext callContext = + rpc.newCallContext( + optionsMap, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod(), + true); + assertThat(callContext).isNotNull(); + assertEquals( + callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader"), + ImmutableList.of("true")); + ; + rpc.shutdown(); + } + + @Test + public void testNewCallContextWithoutRouteToLeaderHeader() { + SpannerOptions options = SpannerOptions.newBuilder().setProjectId("some-project").build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + GrpcCallContext callContext = + rpc.newCallContext( + optionsMap, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod(), + false); + assertThat(callContext).isNotNull(); + assertThat(callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")).isNull(); + rpc.shutdown(); + } + @Test public void testAdminRequestsLimitExceededRetryAlgorithm() { AdminRequestsLimitExceededRetryAlgorithm alg = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java index 07e13626c18..7b9501580ac 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java @@ -82,6 +82,17 @@ public void testNewExtraHeaders() { ImmutableMap.>of("header1", ImmutableList.of("value1"))); } + @Test + public void testNewRouteToLeaderHeader() { + SpannerMetadataProvider metadataProvider = + SpannerMetadataProvider.create(ImmutableMap.of(), "header1"); + Map> extraHeaders = metadataProvider.newRouteToLeaderHeader(); + assertThat(extraHeaders) + .containsExactlyEntriesIn( + ImmutableMap.>of( + "x-goog-spanner-route-to-leader", ImmutableList.of("true"))); + } + private String getResourceHeaderValue( SpannerMetadataProvider headerProvider, String resourceTokenTemplate) { Metadata metadata = headerProvider.newMetadata(resourceTokenTemplate, "projects/p"); From b57466ab20e09f7259ebf324be7ece0b9593b526 Mon Sep 17 00:00:00 2001 From: Yifan Zhou Date: Tue, 24 Jan 2023 12:43:25 -0800 Subject: [PATCH 2/8] feat: Add knob in SpannerOptions to allow users to opt out leader aware routing feature --- .../clirr-ignored-differences.xml | 99 +++++++++++++++++++ .../google/cloud/spanner/SpannerOptions.java | 16 +++ .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 11 ++- .../cloud/spanner/SpannerOptionsTest.java | 20 ++++ .../spanner/spi/v1/GapicSpannerRpcTest.java | 20 ++++ 5 files changed, 164 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 637ca06280a..ca01e16f87e 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -222,4 +222,103 @@ com/google/cloud/spanner/connection/Connection com.google.cloud.spanner.ResultSet analyzeUpdateStatement(com.google.cloud.spanner.Statement, com.google.cloud.spanner.ReadContext$QueryAnalyzeMode, com.google.cloud.spanner.Options$UpdateOption[]) + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map, boolean) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + + + 7005 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean) + + + 7006 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.spanner.v1.ResultSet + + + 7006 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + + + 7005 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean) + + + 7006 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.spanner.v1.ResultSet + + + 7006 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall + + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index d6243f59592..737da425064 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -132,6 +132,7 @@ public class SpannerOptions extends ServiceOptions { private final CallCredentialsProvider callCredentialsProvider; private final CloseableExecutorProvider asyncExecutorProvider; private final String compressorName; + private final boolean leaderAwareRoutingEnabled; /** * Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to @@ -600,6 +601,7 @@ private SpannerOptions(Builder builder) { callCredentialsProvider = builder.callCredentialsProvider; asyncExecutorProvider = builder.asyncExecutorProvider; compressorName = builder.compressorName; + leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled; } /** @@ -700,6 +702,7 @@ public static class Builder private CloseableExecutorProvider asyncExecutorProvider; private String compressorName; private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST"); + private boolean leaderAwareRoutingEnabled = true; private Builder() { // Manually set retry and polling settings that work. @@ -1155,6 +1158,15 @@ public Builder setEmulatorHost(String emulatorHost) { return this; } + /** + * Enable or disable leader aware routing. Leader aware routing would route all requests in + * RW/PDML transactions to the leader region. + */ + public Builder setLeaderAwareRouting(boolean leaderAwareRoutingEnabled) { + this.leaderAwareRoutingEnabled = leaderAwareRoutingEnabled; + return this; + } + @SuppressWarnings("rawtypes") @Override public SpannerOptions build() { @@ -1291,6 +1303,10 @@ public String getCompressorName() { return compressorName; } + public boolean isLeaderAwareRoutingEnabled() { + return leaderAwareRoutingEnabled; + } + /** Returns the default query options to use for the specific database. */ public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) { // Use the specific query options for the database if any have been specified. These have diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index eca8b75bca8..5f4a7ac291d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -304,6 +304,7 @@ private void awaitTermination() throws InterruptedException { private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 1.0D; private static final ConcurrentMap ADMINISTRATIVE_REQUESTS_RATE_LIMITERS = new ConcurrentHashMap<>(); + private final boolean leaderAwareRoutingEnabled; public static GapicSpannerRpc create(SpannerOptions options) { return new GapicSpannerRpc(options); @@ -354,6 +355,7 @@ public GapicSpannerRpc(final SpannerOptions options) { internalHeaderProviderBuilder.getResourceHeaderKey()); this.callCredentialsProvider = options.getCallCredentialsProvider(); this.compressorName = options.getCompressorName(); + this.leaderAwareRoutingEnabled = options.isLeaderAwareRoutingEnabled(); if (initializeStubs) { // Create a managed executor provider. @@ -1672,7 +1674,12 @@ public ResultSet executeQuery( public ApiFuture executeQueryAsync( ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod()); + newCallContext( + options, + request.getSession(), + request, + SpannerGrpc.getExecuteSqlMethod(), + routeToLeader); return spannerStub.executeSqlCallable().futureCall(request, context); } @@ -1941,7 +1948,7 @@ GrpcCallContext newCallContext( context = context.withCallOptions(context.getCallOptions().withCompression(compressorName)); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); - if (routeToLeader) { + if (routeToLeader && leaderAwareRoutingEnabled) { context = context.withExtraHeaders(metadataProvider.newRouteToLeaderHeader()); } if (callCredentialsProvider != null) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 55a15809a48..8005c824176 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -675,6 +675,26 @@ public void testCompressorName() { IllegalArgumentException.class, () -> SpannerOptions.newBuilder().setCompressorName("foo")); } + @Test + public void testLeaderAwareRoutingEnablement() { + assertThat(SpannerOptions.newBuilder().setProjectId("p").build().isLeaderAwareRoutingEnabled()) + .isEqualTo(true); + assertThat( + SpannerOptions.newBuilder() + .setProjectId("p") + .setLeaderAwareRouting(true) + .build() + .isLeaderAwareRoutingEnabled()) + .isEqualTo(true); + assertThat( + SpannerOptions.newBuilder() + .setProjectId("p") + .setLeaderAwareRouting(false) + .build() + .isLeaderAwareRoutingEnabled()) + .isEqualTo(false); + } + @Test public void testSpannerCallContextTimeoutConfigurator_NullValues() { SpannerCallContextTimeoutConfigurator configurator = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index d05439f27b5..fb884f263b3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -415,6 +415,26 @@ public void testNewCallContextWithoutRouteToLeaderHeader() { rpc.shutdown(); } + @Test + public void testNewCallContextWithRouteToLeaderHeaderAndLarDisabled() { + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("some-project") + .setLeaderAwareRouting(false) + .build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + GrpcCallContext callContext = + rpc.newCallContext( + optionsMap, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod(), + true); + assertThat(callContext).isNotNull(); + assertThat(callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")).isNull(); + rpc.shutdown(); + } + @Test public void testAdminRequestsLimitExceededRetryAlgorithm() { AdminRequestsLimitExceededRetryAlgorithm alg = From cde93a377126b4e9bd25a4472bdf0168ee36abf9 Mon Sep 17 00:00:00 2001 From: Yifan Zhou Date: Tue, 24 Jan 2023 13:52:33 -0800 Subject: [PATCH 3/8] fix: fix broken tests due to the merge --- .../google/cloud/spanner/PartitionedDmlTransactionTest.java | 2 +- .../test/java/com/google/cloud/spanner/SessionPoolTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index c678a8eff48..93e0e3eb3d0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -375,7 +375,7 @@ public void testExecuteStreamingPartitionedUpdateRSTstream() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index c807c929ae4..ab98e68f8bf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -788,7 +788,7 @@ public void testSessionNotFoundReadWriteTransaction() { .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(closedSession.newTransaction(Options.fromTransactionOptions())) .thenReturn(closedTransactionContext); - when(closedSession.beginTransactionAsync(any(), true)).thenThrow(sessionNotFound); + when(closedSession.beginTransactionAsync(any(), eq(true))).thenThrow(sessionNotFound); TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession); closedTransactionRunner.setSpan(mock(Span.class)); when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner); @@ -801,7 +801,7 @@ public void testSessionNotFoundReadWriteTransaction() { final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class); when(openSession.newTransaction(Options.fromTransactionOptions())) .thenReturn(openTransactionContext); - when(openSession.beginTransactionAsync(any(), true)) + when(openSession.beginTransactionAsync(any(), eq(true))) .thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn"))); TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession); openTransactionRunner.setSpan(mock(Span.class)); From 606bbdab12893eb21ad96e7d417354d47c5432a2 Mon Sep 17 00:00:00 2001 From: Yifan Zhou Date: Wed, 25 Jan 2023 14:16:06 -0800 Subject: [PATCH 4/8] fix: resolve comments. --- .../google/cloud/spanner/SpannerOptions.java | 17 +- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 2 +- .../cloud/spanner/spi/v1/SpannerRpc.java | 48 +++ .../cloud/spanner/SpannerOptionsTest.java | 327 ++++++++---------- .../spanner/spi/v1/GapicSpannerRpcTest.java | 78 +++-- .../spi/v1/SpannerMetadataProviderTest.java | 17 +- 6 files changed, 261 insertions(+), 228 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 737da425064..e7dd1f57b04 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -1159,11 +1159,20 @@ public Builder setEmulatorHost(String emulatorHost) { } /** - * Enable or disable leader aware routing. Leader aware routing would route all requests in - * RW/PDML transactions to the leader region. + * Enable leader aware routing. Leader aware routing would route all requests in RW/PDML + * transactions to the leader region. */ - public Builder setLeaderAwareRouting(boolean leaderAwareRoutingEnabled) { - this.leaderAwareRoutingEnabled = leaderAwareRoutingEnabled; + public Builder enableLeaderAwareRouting() { + this.leaderAwareRoutingEnabled = true; + return this; + } + + /** + * Enable leader aware routing. Leader aware routing would route all requests in RW/PDML + * transactions to the leader region. + */ + public Builder disableLeaderAwareRouting() { + this.leaderAwareRoutingEnabled = false; return this; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 5f4a7ac291d..45e78837a2e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -1633,7 +1633,7 @@ public void deleteSession(String sessionName, @Nullable Map options) public ApiFuture asyncDeleteSession(String sessionName, @Nullable Map options) { DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build(); GrpcCallContext context = - newCallContext(options, sessionName, request, SpannerGrpc.getDeleteSessionMethod(), false); + newCallContext(options, sessionName, request, SpannerGrpc.getDeleteSessionMethod()); return spannerStub.deleteSessionCallable().futureCall(request, context); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index c61a80a9d6f..552c53e85ee 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -319,15 +319,39 @@ Session createSession( ApiFuture asyncDeleteSession(String sessionName, @Nullable Map options) throws SpannerException; + /** + * Performs a streaming read. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ StreamingCall read( ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options, boolean routeToLeader); + /** + * Executes a query. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ ResultSet executeQuery( ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader); + /** + * Executes a query asynchronously. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ ApiFuture executeQueryAsync( ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader); @@ -338,6 +362,14 @@ ApiFuture executeQueryAsync( ServerStream executeStreamingPartitionedDml( ExecuteSqlRequest request, @Nullable Map options, Duration timeout); + /** + * Executes a query with streaming result. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ StreamingCall executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @@ -349,10 +381,26 @@ StreamingCall executeQuery( ApiFuture executeBatchDmlAsync( ExecuteBatchDmlRequest build, Map options); + /** + * Begins a transaction. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ Transaction beginTransaction( BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader) throws SpannerException; + /** + * Begins a transaction asynchronously. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ ApiFuture beginTransactionAsync( BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 8005c824176..237558d6677 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -20,9 +20,13 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import com.google.api.gax.grpc.GrpcCallContext; @@ -82,7 +86,7 @@ public void defaultBuilder() { assertThat(options.getHost()).isEqualTo("http://" + System.getenv("SPANNER_EMULATOR_HOST")); } assertThat(options.getPrefetchChunks()).isEqualTo(4); - assertThat(options.getSessionLabels()).isNull(); + assertNull(options.getSessionLabels()); } @Test @@ -453,7 +457,7 @@ public void testInvalidTransport() { () -> SpannerOptions.newBuilder() .setTransportOptions(Mockito.mock(TransportOptions.class))); - assertThat(e.getMessage()).isNotNull(); + assertNotNull(e.getMessage()); } @Test @@ -463,7 +467,7 @@ public void testInvalidSessionLabels() { NullPointerException e = assertThrows( NullPointerException.class, () -> SpannerOptions.newBuilder().setSessionLabels(labels)); - assertThat(e.getMessage()).isNotNull(); + assertNotNull(e.getMessage()); } @Test @@ -471,7 +475,7 @@ public void testNullSessionLabels() { NullPointerException e = assertThrows( NullPointerException.class, () -> SpannerOptions.newBuilder().setSessionLabels(null)); - assertThat(e.getMessage()).isNotNull(); + assertNotNull(e.getMessage()); } @Test @@ -664,35 +668,31 @@ public void testCompressorName() { .build() .getCompressorName()) .isEqualTo("identity"); - assertThat( - SpannerOptions.newBuilder() - .setProjectId("p") - .setCompressorName(null) - .build() - .getCompressorName()) - .isNull(); + assertNull( + SpannerOptions.newBuilder() + .setProjectId("p") + .setCompressorName(null) + .build() + .getCompressorName()); assertThrows( IllegalArgumentException.class, () -> SpannerOptions.newBuilder().setCompressorName("foo")); } @Test public void testLeaderAwareRoutingEnablement() { - assertThat(SpannerOptions.newBuilder().setProjectId("p").build().isLeaderAwareRoutingEnabled()) - .isEqualTo(true); - assertThat( - SpannerOptions.newBuilder() - .setProjectId("p") - .setLeaderAwareRouting(true) - .build() - .isLeaderAwareRoutingEnabled()) - .isEqualTo(true); - assertThat( - SpannerOptions.newBuilder() - .setProjectId("p") - .setLeaderAwareRouting(false) - .build() - .isLeaderAwareRoutingEnabled()) - .isEqualTo(false); + assertTrue(SpannerOptions.newBuilder().setProjectId("p").build().isLeaderAwareRoutingEnabled()); + assertTrue( + SpannerOptions.newBuilder() + .setProjectId("p") + .enableLeaderAwareRouting() + .build() + .isLeaderAwareRoutingEnabled()); + assertFalse( + SpannerOptions.newBuilder() + .setProjectId("p") + .disableLeaderAwareRouting() + .build() + .isLeaderAwareRoutingEnabled()); } @Test @@ -701,103 +701,85 @@ public void testSpannerCallContextTimeoutConfigurator_NullValues() { SpannerCallContextTimeoutConfigurator.create(); ApiCallContext inputCallContext = GrpcCallContext.createDefault(); - assertThat( - configurator.configure( - inputCallContext, - BatchCreateSessionsRequest.getDefaultInstance(), - SpannerGrpc.getBatchCreateSessionsMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - CreateSessionRequest.getDefaultInstance(), - SpannerGrpc.getCreateSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - DeleteSessionRequest.getDefaultInstance(), - SpannerGrpc.getDeleteSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - DeleteSessionRequest.getDefaultInstance(), - SpannerGrpc.getDeleteSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ListSessionsRequest.getDefaultInstance(), - SpannerGrpc.getListSessionsMethod())) - .isNull(); - - assertThat( - configurator.configure( - inputCallContext, - BeginTransactionRequest.getDefaultInstance(), - SpannerGrpc.getBeginTransactionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - CommitRequest.getDefaultInstance(), - SpannerGrpc.getCommitMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - RollbackRequest.getDefaultInstance(), - SpannerGrpc.getRollbackMethod())) - .isNull(); - - assertThat( - configurator.configure( - inputCallContext, - ExecuteSqlRequest.getDefaultInstance(), - SpannerGrpc.getExecuteSqlMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ExecuteSqlRequest.getDefaultInstance(), - SpannerGrpc.getExecuteStreamingSqlMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ExecuteBatchDmlRequest.getDefaultInstance(), - SpannerGrpc.getExecuteBatchDmlMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, ReadRequest.getDefaultInstance(), SpannerGrpc.getReadMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ReadRequest.getDefaultInstance(), - SpannerGrpc.getStreamingReadMethod())) - .isNull(); - - assertThat( - configurator.configure( - inputCallContext, - PartitionQueryRequest.getDefaultInstance(), - SpannerGrpc.getPartitionQueryMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - PartitionReadRequest.getDefaultInstance(), - SpannerGrpc.getPartitionReadMethod())) - .isNull(); + assertNull( + configurator.configure( + inputCallContext, + BatchCreateSessionsRequest.getDefaultInstance(), + SpannerGrpc.getBatchCreateSessionsMethod())); + assertNull( + configurator.configure( + inputCallContext, + CreateSessionRequest.getDefaultInstance(), + SpannerGrpc.getCreateSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + DeleteSessionRequest.getDefaultInstance(), + SpannerGrpc.getDeleteSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + DeleteSessionRequest.getDefaultInstance(), + SpannerGrpc.getDeleteSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + ListSessionsRequest.getDefaultInstance(), + SpannerGrpc.getListSessionsMethod())); + + assertNull( + configurator.configure( + inputCallContext, + BeginTransactionRequest.getDefaultInstance(), + SpannerGrpc.getBeginTransactionMethod())); + assertNull( + configurator.configure( + inputCallContext, CommitRequest.getDefaultInstance(), SpannerGrpc.getCommitMethod())); + assertNull( + configurator.configure( + inputCallContext, + RollbackRequest.getDefaultInstance(), + SpannerGrpc.getRollbackMethod())); + + assertNull( + configurator.configure( + inputCallContext, + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod())); + assertNull( + configurator.configure( + inputCallContext, + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteStreamingSqlMethod())); + assertNull( + configurator.configure( + inputCallContext, + ExecuteBatchDmlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteBatchDmlMethod())); + assertNull( + configurator.configure( + inputCallContext, ReadRequest.getDefaultInstance(), SpannerGrpc.getReadMethod())); + assertNull( + configurator.configure( + inputCallContext, + ReadRequest.getDefaultInstance(), + SpannerGrpc.getStreamingReadMethod())); + + assertNull( + configurator.configure( + inputCallContext, + PartitionQueryRequest.getDefaultInstance(), + SpannerGrpc.getPartitionQueryMethod())); + assertNull( + configurator.configure( + inputCallContext, + PartitionReadRequest.getDefaultInstance(), + SpannerGrpc.getPartitionReadMethod())); } @Test @@ -815,49 +797,42 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() { ApiCallContext inputCallContext = GrpcCallContext.createDefault(); - assertThat( - configurator.configure( - inputCallContext, - BatchCreateSessionsRequest.getDefaultInstance(), - SpannerGrpc.getBatchCreateSessionsMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - CreateSessionRequest.getDefaultInstance(), - SpannerGrpc.getCreateSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - DeleteSessionRequest.getDefaultInstance(), - SpannerGrpc.getDeleteSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - DeleteSessionRequest.getDefaultInstance(), - SpannerGrpc.getDeleteSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ListSessionsRequest.getDefaultInstance(), - SpannerGrpc.getListSessionsMethod())) - .isNull(); - - assertThat( - configurator.configure( - inputCallContext, - BeginTransactionRequest.getDefaultInstance(), - SpannerGrpc.getBeginTransactionMethod())) - .isNull(); + assertNull( + configurator.configure( + inputCallContext, + BatchCreateSessionsRequest.getDefaultInstance(), + SpannerGrpc.getBatchCreateSessionsMethod())); + assertNull( + configurator.configure( + inputCallContext, + CreateSessionRequest.getDefaultInstance(), + SpannerGrpc.getCreateSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + DeleteSessionRequest.getDefaultInstance(), + SpannerGrpc.getDeleteSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + DeleteSessionRequest.getDefaultInstance(), + SpannerGrpc.getDeleteSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + ListSessionsRequest.getDefaultInstance(), + SpannerGrpc.getListSessionsMethod())); + + assertNull( + configurator.configure( + inputCallContext, + BeginTransactionRequest.getDefaultInstance(), + SpannerGrpc.getBeginTransactionMethod())); assertThat( configurator .configure( @@ -875,12 +850,11 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() { .getTimeout()) .isEqualTo(Duration.ofSeconds(8L)); - assertThat( - configurator.configure( - inputCallContext, - ExecuteSqlRequest.getDefaultInstance(), - SpannerGrpc.getExecuteSqlMethod())) - .isNull(); + assertNull( + configurator.configure( + inputCallContext, + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod())); assertThat( configurator .configure( @@ -897,10 +871,9 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() { SpannerGrpc.getExecuteBatchDmlMethod()) .getTimeout()) .isEqualTo(Duration.ofSeconds(1L)); - assertThat( - configurator.configure( - inputCallContext, ReadRequest.getDefaultInstance(), SpannerGrpc.getReadMethod())) - .isNull(); + assertNull( + configurator.configure( + inputCallContext, ReadRequest.getDefaultInstance(), SpannerGrpc.getReadMethod())); assertThat( configurator .configure( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index fb884f263b3..d1d416b7f7a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assume.assumeTrue; @@ -26,6 +27,7 @@ import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; @@ -209,15 +211,14 @@ public void testCallCredentialsProviderPreferenceAboveCredentials() { GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); // GoogleAuthLibraryCallCredentials doesn't implement equals, so we can only check for the // existence. - assertThat( - rpc.newCallContext( - optionsMap, - "/some/resource", - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod()) - .getCallOptions() - .getCredentials()) - .isNotNull(); + assertNotNull( + rpc.newCallContext( + optionsMap, + "/some/resource", + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod()) + .getCallOptions() + .getCredentials()); rpc.shutdown(); } @@ -230,15 +231,14 @@ public void testCallCredentialsProviderReturnsNull() { .setCallCredentialsProvider(() -> null) .build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); - assertThat( - rpc.newCallContext( - optionsMap, - "/some/resource", - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod()) - .getCallOptions() - .getCredentials()) - .isNull(); + assertNull( + rpc.newCallContext( + optionsMap, + "/some/resource", + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod()) + .getCallOptions() + .getCredentials()); rpc.shutdown(); } @@ -250,15 +250,14 @@ public void testNoCallCredentials() { .setCredentials(STATIC_CREDENTIALS) .build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); - assertThat( - rpc.newCallContext( - optionsMap, - "/some/resource", - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod()) - .getCallOptions() - .getCredentials()) - .isNull(); + assertNull( + rpc.newCallContext( + optionsMap, + "/some/resource", + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod()) + .getCallOptions() + .getCredentials()); rpc.shutdown(); } @@ -376,7 +375,7 @@ public ApiCallContext configure( public void testNewCallContextWithNullRequestAndNullMethod() { SpannerOptions options = SpannerOptions.newBuilder().setProjectId("some-project").build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); - assertThat(rpc.newCallContext(optionsMap, "/some/resource", null, null)).isNotNull(); + assertNotNull(rpc.newCallContext(optionsMap, "/some/resource", null, null)); rpc.shutdown(); } @@ -391,17 +390,20 @@ public void testNewCallContextWithRouteToLeaderHeader() { ExecuteSqlRequest.getDefaultInstance(), SpannerGrpc.getExecuteSqlMethod(), true); - assertThat(callContext).isNotNull(); + assertNotNull(callContext); + assertEquals( + ImmutableList.of("true"), + callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")); assertEquals( - callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader"), - ImmutableList.of("true")); - ; + ImmutableList.of("projects/some-project"), + callContext.getExtraHeaders().get(ApiClientHeaderProvider.getDefaultResourceHeaderKey())); rpc.shutdown(); } @Test public void testNewCallContextWithoutRouteToLeaderHeader() { - SpannerOptions options = SpannerOptions.newBuilder().setProjectId("some-project").build(); + SpannerOptions options = + SpannerOptions.newBuilder().enableLeaderAwareRouting().setProjectId("some-project").build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); GrpcCallContext callContext = rpc.newCallContext( @@ -410,8 +412,8 @@ public void testNewCallContextWithoutRouteToLeaderHeader() { ExecuteSqlRequest.getDefaultInstance(), SpannerGrpc.getExecuteSqlMethod(), false); - assertThat(callContext).isNotNull(); - assertThat(callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")).isNull(); + assertNotNull(callContext); + assertNull(callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")); rpc.shutdown(); } @@ -420,7 +422,7 @@ public void testNewCallContextWithRouteToLeaderHeaderAndLarDisabled() { SpannerOptions options = SpannerOptions.newBuilder() .setProjectId("some-project") - .setLeaderAwareRouting(false) + .disableLeaderAwareRouting() .build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); GrpcCallContext callContext = @@ -430,8 +432,8 @@ public void testNewCallContextWithRouteToLeaderHeaderAndLarDisabled() { ExecuteSqlRequest.getDefaultInstance(), SpannerGrpc.getExecuteSqlMethod(), true); - assertThat(callContext).isNotNull(); - assertThat(callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")).isNull(); + assertNotNull(callContext); + assertNull(callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")); rpc.shutdown(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java index 7b9501580ac..cc43e2dc334 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java @@ -15,11 +15,12 @@ */ package com.google.cloud.spanner.spi.v1; -import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import io.grpc.Metadata; import io.grpc.Metadata.Key; import java.util.List; @@ -77,9 +78,9 @@ public void testNewExtraHeaders() { SpannerMetadataProvider metadataProvider = SpannerMetadataProvider.create(ImmutableMap.of(), "header1"); Map> extraHeaders = metadataProvider.newExtraHeaders(null, "value1"); - assertThat(extraHeaders) - .containsExactlyEntriesIn( - ImmutableMap.>of("header1", ImmutableList.of("value1"))); + Map> expectedHeaders = + ImmutableMap.>of("header1", ImmutableList.of("value1")); + assertTrue(Maps.difference(extraHeaders, expectedHeaders).areEqual()); } @Test @@ -87,10 +88,10 @@ public void testNewRouteToLeaderHeader() { SpannerMetadataProvider metadataProvider = SpannerMetadataProvider.create(ImmutableMap.of(), "header1"); Map> extraHeaders = metadataProvider.newRouteToLeaderHeader(); - assertThat(extraHeaders) - .containsExactlyEntriesIn( - ImmutableMap.>of( - "x-goog-spanner-route-to-leader", ImmutableList.of("true"))); + Map> expectedHeaders = + ImmutableMap.>of( + "x-goog-spanner-route-to-leader", ImmutableList.of("true")); + assertTrue(Maps.difference(extraHeaders, expectedHeaders).areEqual()); } private String getResourceHeaderValue( From 1bbd24dc53a2612aaf203947b04a260654e23634 Mon Sep 17 00:00:00 2001 From: Yifan Zhou Date: Wed, 1 Feb 2023 15:27:30 -0800 Subject: [PATCH 5/8] fix: resolve comments and add new tests to verify that the route-to-leader header exists for RW transactions and does not exist for RO transactions or when the leader aware routing feature is disabled. --- .../cloud/spanner/AbstractReadContext.java | 24 +++++-- .../cloud/spanner/TransactionRunnerImpl.java | 12 ++-- .../spi/v1/SpannerMetadataProvider.java | 9 +-- .../spanner/spi/v1/GapicSpannerRpcTest.java | 64 +++++++++++++++++++ 4 files changed, 95 insertions(+), 14 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index af937634506..149c6de4c6a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -161,7 +161,11 @@ static Builder newBuilder() { private SingleReadContext(Builder builder) { super(builder); this.bound = builder.bound; - this.routeToLeader = false; + } + + @Override + protected boolean isRouteToLeader() { + return false; } @GuardedBy("lock") @@ -292,7 +296,11 @@ static Builder newBuilder() { this.timestamp = builder.timestamp; this.transactionId = builder.transactionId; } - this.routeToLeader = false; + } + + @Override + protected boolean isRouteToLeader() { + return false; } @Override @@ -350,7 +358,7 @@ void initTransaction() { .setOptions(options) .build(); Transaction transaction = - rpc.beginTransaction(request, session.getOptions(), routeToLeader); + rpc.beginTransaction(request, session.getOptions(), isRouteToLeader()); if (!transaction.hasReadTimestamp()) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field"); @@ -383,7 +391,6 @@ void initTransaction() { Span span; private final int defaultPrefetchChunks; private final QueryOptions defaultQueryOptions; - protected boolean routeToLeader = false; @GuardedBy("lock") private boolean isValid = true; @@ -420,6 +427,10 @@ long getSeqNo() { return seqNo.incrementAndGet(); } + protected boolean isRouteToLeader() { + return false; + } + @Override public final ResultSet read( String table, KeySet keys, Iterable columns, ReadOption... options) { @@ -669,7 +680,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken } SpannerRpc.StreamingCall call = rpc.executeQuery( - request.build(), stream.consumer(), session.getOptions(), routeToLeader); + request.build(), stream.consumer(), session.getOptions(), isRouteToLeader()); call.request(prefetchChunks); stream.setCall(call, request.getTransaction().hasBegin()); return stream; @@ -797,7 +808,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken } builder.setRequestOptions(buildRequestOptions(readOptions)); SpannerRpc.StreamingCall call = - rpc.read(builder.build(), stream.consumer(), session.getOptions(), routeToLeader); + rpc.read( + builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader()); call.request(prefetchChunks); stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin()); return stream; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 53b094412f9..ef937e993bd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -196,7 +196,11 @@ private TransactionContextImpl(Builder builder) { this.trackTransactionStarter = builder.trackTransactionStarter; this.options = builder.options; this.finishedAsyncOperations.set(null); - this.routeToLeader = true; + } + + @Override + protected boolean isRouteToLeader() { + return true; } private void increaseAsyncOperations() { @@ -256,7 +260,7 @@ ApiFuture ensureTxnAsync() { private void createTxnAsync(final SettableApiFuture res) { span.addAnnotation("Creating Transaction"); - final ApiFuture fut = session.beginTransactionAsync(options, routeToLeader); + final ApiFuture fut = session.beginTransactionAsync(options, isRouteToLeader()); fut.addListener( () -> { try { @@ -718,7 +722,7 @@ private ResultSet internalExecuteUpdate( /* withTransactionSelector = */ true); try { com.google.spanner.v1.ResultSet resultSet = - rpc.executeQuery(builder.build(), session.getOptions(), routeToLeader); + rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader()); if (resultSet.getMetadata().hasTransaction()) { onTransactionMetadata( resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin()); @@ -748,7 +752,7 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... o // Register the update as an async operation that must finish before the transaction may // commit. increaseAsyncOperations(); - resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), routeToLeader); + resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader()); } catch (Throwable t) { decreaseAsyncOperations(); throw t; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java index ae52134f625..77406a5399b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java @@ -28,12 +28,15 @@ class SpannerMetadataProvider { private final Map, String> headers; private final String resourceHeaderKey; - private static final String routeToLeaderHeaderKey = "x-goog-spanner-route-to-leader"; + private static final String ROUTE_TO_LEADER_HEADER_KEY = "x-goog-spanner-route-to-leader"; private static final Pattern[] RESOURCE_TOKEN_PATTERNS = { Pattern.compile("^(?projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"), Pattern.compile("^(?projects/[^/]*/instances/[^/]*)(.*)?") }; + private static final Map> ROUTE_TO_LEADER_HEADER_MAP = + ImmutableMap.of(ROUTE_TO_LEADER_HEADER_KEY, Collections.singletonList("true")); + private SpannerMetadataProvider(Map headers, String resourceHeaderKey) { this.resourceHeaderKey = resourceHeaderKey; this.headers = constructHeadersAsMetadata(headers); @@ -67,9 +70,7 @@ Map> newExtraHeaders( } Map> newRouteToLeaderHeader() { - return ImmutableMap.>builder() - .put(routeToLeaderHeaderKey, Collections.singletonList("true")) - .build(); + return ROUTE_TO_LEADER_HEADER_MAP; } private Map, String> constructHeadersAsMetadata( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index d1d416b7f7a..1429091b128 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -19,9 +19,11 @@ import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; import com.google.api.gax.core.GaxProperties; @@ -45,6 +47,7 @@ import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.TransactionRunner; import com.google.cloud.spanner.spi.v1.GapicSpannerRpc.AdminRequestsLimitExceededRetryAlgorithm; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.common.collect.ImmutableList; @@ -140,6 +143,7 @@ public class GapicSpannerRpcTest { private static Metadata lastSeenHeaders; private static String defaultUserAgent; private static Spanner spanner; + private static boolean isRouteToLeader; @Parameter public Dialect dialect; @@ -177,6 +181,17 @@ public ServerCall.Listener interceptCall( String auth = headers.get(Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER)); assertThat(auth).isEqualTo("Bearer " + VARIABLE_OAUTH_TOKEN); + if (call.getMethodDescriptor() + .equals(SpannerGrpc.getExecuteStreamingSqlMethod()) + || call.getMethodDescriptor().equals(SpannerGrpc.getExecuteSqlMethod())) { + String routeToLeaderHeader = + headers.get( + Key.of( + "x-goog-spanner-route-to-leader", + Metadata.ASCII_STRING_MARSHALLER)); + isRouteToLeader = + (routeToLeaderHeader != null && routeToLeaderHeader.equals("true")); + } return Contexts.interceptCall(Context.current(), call, headers, next); } }) @@ -198,6 +213,7 @@ public void reset() throws InterruptedException { server.shutdown(); server.awaitTermination(); } + isRouteToLeader = false; } @Test @@ -508,6 +524,54 @@ public void testCustomUserAgent() { } } + @Test + public void testRouteToLeaderHeaderForReadOnly() { + final SpannerOptions options = createSpannerOptions(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + + try (final ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + + assertFalse(isRouteToLeader); + } + } + + @Test + public void testRouteToLeaderHeaderForReadWrite() { + final SpannerOptions options = createSpannerOptions(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + TransactionRunner runner = databaseClient.readWriteTransaction(); + runner.run( + transaction -> { + transaction.executeUpdate(UPDATE_FOO_STATEMENT); + return null; + }); + } + assertTrue(isRouteToLeader); + } + + @Test + public void testRouteToLeaderHeaderWithLeaderAwareRoutingDisabled() { + final SpannerOptions options = + createSpannerOptions().toBuilder().disableLeaderAwareRouting().build(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + TransactionRunner runner = databaseClient.readWriteTransaction(); + runner.run( + transaction -> { + transaction.executeUpdate(UPDATE_FOO_STATEMENT); + return null; + }); + } + assertFalse(isRouteToLeader); + } + private SpannerOptions createSpannerOptions() { String endpoint = address.getHostString() + ":" + server.getPort(); return SpannerOptions.newBuilder() From f9152cd25c3421038041ba303efc4934b45960a5 Mon Sep 17 00:00:00 2001 From: Yifan Zhou Date: Wed, 1 Feb 2023 17:18:10 -0800 Subject: [PATCH 6/8] fix: Update comments for SpannerOptions.disableLeaderAwareRouting --- .../main/java/com/google/cloud/spanner/SpannerOptions.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index e7dd1f57b04..501c467ca08 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -1168,8 +1168,8 @@ public Builder enableLeaderAwareRouting() { } /** - * Enable leader aware routing. Leader aware routing would route all requests in RW/PDML - * transactions to the leader region. + * Disable leader aware routing. Disabling leader aware routing would route all requests in + * RW/PDML transactions to any region. */ public Builder disableLeaderAwareRouting() { this.leaderAwareRoutingEnabled = false; From 90d455cd8624b016efa5899bd5cd5f551dd7886f Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 3 Feb 2023 03:19:43 +0000 Subject: [PATCH 7/8] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1086d0b69af..d5db9eb8177 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.4.0') +implementation platform('com.google.cloud:libraries-bom:26.6.0') implementation 'com.google.cloud:google-cloud-spanner' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-spanner:6.35.1' +implementation 'com.google.cloud:google-cloud-spanner:6.35.2' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.35.1" +libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.35.2" ``` ## Authentication From 9c212af50b1490201dc8844e33604445e3168f9e Mon Sep 17 00:00:00 2001 From: Yifan Zhou Date: Thu, 20 Apr 2023 23:03:19 -0700 Subject: [PATCH 8/8] feat: Disable leader aware routing by default for public preview --- .../java/com/google/cloud/spanner/SpannerOptions.java | 2 +- .../com/google/cloud/spanner/SpannerOptionsTest.java | 3 ++- .../google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java | 9 ++++++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 501c467ca08..6663d3f8e68 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -702,7 +702,7 @@ public static class Builder private CloseableExecutorProvider asyncExecutorProvider; private String compressorName; private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST"); - private boolean leaderAwareRoutingEnabled = true; + private boolean leaderAwareRoutingEnabled = false; private Builder() { // Manually set retry and polling settings that work. diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 237558d6677..2a4d351315f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -680,7 +680,8 @@ public void testCompressorName() { @Test public void testLeaderAwareRoutingEnablement() { - assertTrue(SpannerOptions.newBuilder().setProjectId("p").build().isLeaderAwareRoutingEnabled()); + assertFalse( + SpannerOptions.newBuilder().setProjectId("p").build().isLeaderAwareRoutingEnabled()); assertTrue( SpannerOptions.newBuilder() .setProjectId("p") diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 1429091b128..d32dc518714 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -397,7 +397,8 @@ public void testNewCallContextWithNullRequestAndNullMethod() { @Test public void testNewCallContextWithRouteToLeaderHeader() { - SpannerOptions options = SpannerOptions.newBuilder().setProjectId("some-project").build(); + SpannerOptions options = + SpannerOptions.newBuilder().setProjectId("some-project").enableLeaderAwareRouting().build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); GrpcCallContext callContext = rpc.newCallContext( @@ -526,7 +527,8 @@ public void testCustomUserAgent() { @Test public void testRouteToLeaderHeaderForReadOnly() { - final SpannerOptions options = createSpannerOptions(); + final SpannerOptions options = + createSpannerOptions().toBuilder().enableLeaderAwareRouting().build(); try (Spanner spanner = options.getService()) { final DatabaseClient databaseClient = spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); @@ -541,7 +543,8 @@ public void testRouteToLeaderHeaderForReadOnly() { @Test public void testRouteToLeaderHeaderForReadWrite() { - final SpannerOptions options = createSpannerOptions(); + final SpannerOptions options = + createSpannerOptions().toBuilder().enableLeaderAwareRouting().build(); try (Spanner spanner = options.getService()) { final DatabaseClient databaseClient = spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));