Skip to content

Commit

Permalink
feat: add support for Directed Read options
Browse files Browse the repository at this point in the history
  • Loading branch information
rajatbhatta committed Apr 10, 2023
1 parent 6927e06 commit f284448
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,10 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
if (options.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(options.dataBoostEnabled());
}
if (options.hasDirectedReadOptions()) {
SpannerUtil.verifyDirectedReadOptions(options.directedReadOptions());
builder.setDirectedReadOptions(options.directedReadOptions());
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
Expand Down Expand Up @@ -667,7 +671,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions(), true);
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
Expand Down Expand Up @@ -779,6 +783,10 @@ ResultSet readInternalWithOptions(
if (readOptions.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(readOptions.dataBoostEnabled());
}
if (readOptions.hasDirectedReadOptions()) {
SpannerUtil.verifyDirectedReadOptions(readOptions.directedReadOptions());
builder.setDirectedReadOptions(readOptions.directedReadOptions());
}
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
Expand All @@ -798,7 +806,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
}
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
rpc.read(builder.build(), stream.consumer(), session.getOptions());
rpc.read(builder.build(), stream.consumer(), session.getOptions(), true);
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.core.BetaApi;
import com.google.common.base.Preconditions;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
import java.util.Objects;
Expand Down Expand Up @@ -328,6 +329,19 @@ void appendToOptions(Options options) {
}
}

static final class DirectedReadOption extends InternalOption implements ReadAndQueryOption {
private final DirectedReadOptions directedReadOptions;

DirectedReadOption(DirectedReadOptions directedReadOptions) {
this.directedReadOptions = directedReadOptions;
}

@Override
void appendToOptions(Options options) {
options.directedReadOptions = directedReadOptions;
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
Expand All @@ -341,6 +355,7 @@ void appendToOptions(Options options) {
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean dataBoostEnabled;
private DirectedReadOptions directedReadOptions;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -441,6 +456,14 @@ Boolean dataBoostEnabled() {
return dataBoostEnabled;
}

boolean hasDirectedReadOptions() {
return directedReadOptions != null;
}

DirectedReadOptions directedReadOptions() {
return directedReadOptions;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand Down Expand Up @@ -480,6 +503,9 @@ public String toString() {
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
if (directedReadOptions != null) {
b.append("directedReadOptions: ").append(directedReadOptions).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -515,7 +541,8 @@ public boolean equals(Object o) {
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled());
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
}

@Override
Expand Down Expand Up @@ -560,6 +587,9 @@ public int hashCode() {
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
if (directedReadOptions != null) {
result = 31 * result + directedReadOptions.hashCode();
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.SpannerGrpc;
Expand Down Expand Up @@ -132,6 +133,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final CallCredentialsProvider callCredentialsProvider;
private final CloseableExecutorProvider asyncExecutorProvider;
private final String compressorName;
private final DirectedReadOptions directedReadOptions;

/**
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
Expand Down Expand Up @@ -600,6 +602,7 @@ private SpannerOptions(Builder builder) {
callCredentialsProvider = builder.callCredentialsProvider;
asyncExecutorProvider = builder.asyncExecutorProvider;
compressorName = builder.compressorName;
directedReadOptions = builder.directedReadOptions;
}

/**
Expand Down Expand Up @@ -700,6 +703,7 @@ public static class Builder
private CloseableExecutorProvider asyncExecutorProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
private DirectedReadOptions directedReadOptions;

private Builder() {
// Manually set retry and polling settings that work.
Expand Down Expand Up @@ -1081,6 +1085,12 @@ public Builder setCompressorName(@Nullable String compressorName) {
return this;
}

public Builder setDirectedReadOptions(DirectedReadOptions directedReadOptions) {
Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null");
this.directedReadOptions = directedReadOptions;
return this;
}

/**
* Sets the {@link ExecutorProvider} to use for high-level async calls that need an executor,
* such as fetching results for an {@link AsyncResultSet}.
Expand Down Expand Up @@ -1174,6 +1184,9 @@ public SpannerOptions build() {
this.numChannels =
this.grpcGcpExtensionEnabled ? GRPC_GCP_ENABLED_DEFAULT_CHANNELS : DEFAULT_CHANNELS;
}
if (directedReadOptions != null) {
SpannerUtil.verifyDirectedReadOptions(directedReadOptions);
}

return new SpannerOptions(this);
}
Expand Down Expand Up @@ -1291,6 +1304,10 @@ public String getCompressorName() {
return compressorName;
}

public DirectedReadOptions getDirectedReadOptions() {
return directedReadOptions;
}

/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ private ResultSet internalExecuteUpdate(
/* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
rpc.executeQuery(builder.build(), session.getOptions(), false);
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
Expand Down Expand Up @@ -747,7 +747,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
// Register the update as an async operation that must finish before the transaction may
// commit.
increaseAsyncOperations();
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions());
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), false);
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand Down Expand Up @@ -304,6 +305,7 @@ private void awaitTermination() throws InterruptedException {
private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 1.0D;
private static final ConcurrentMap<String, RateLimiter> ADMINISTRATIVE_REQUESTS_RATE_LIMITERS =
new ConcurrentHashMap<>();
private final DirectedReadOptions directedReadOptions;

public static GapicSpannerRpc create(SpannerOptions options) {
return new GapicSpannerRpc(options);
Expand Down Expand Up @@ -354,6 +356,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
internalHeaderProviderBuilder.getResourceHeaderKey());
this.callCredentialsProvider = options.getCallCredentialsProvider();
this.compressorName = options.getCompressorName();
this.directedReadOptions = options.getDirectedReadOptions();

if (initializeStubs) {
// Create a managed executor provider.
Expand Down Expand Up @@ -1636,7 +1639,11 @@ public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Opt

@Override
public StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
ReadRequest request,
ResultStreamConsumer consumer,
@Nullable Map<Option, ?> options,
boolean readOnly) {
request = validateReadRequest(request, readOnly);
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getReadMethod());
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
Expand All @@ -1658,13 +1665,15 @@ public void cancel(String message) {
}

@Override
public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
return get(executeQueryAsync(request, options));
public ResultSet executeQuery(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean readOnly) {
return get(executeQueryAsync(request, options, readOnly));
}

@Override
public ApiFuture<ResultSet> executeQueryAsync(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean readOnly) {
request = validateExecuteSqlRequest(request, readOnly);
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod());
return spannerStub.executeSqlCallable().futureCall(request, context);
Expand All @@ -1673,6 +1682,7 @@ public ApiFuture<ResultSet> executeQueryAsync(
@Override
public ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
request = validateExecuteSqlRequest(request, false);
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod());
return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context));
Expand All @@ -1686,6 +1696,7 @@ public RetrySettings getPartitionedDmlRetrySettings() {
@Override
public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, Map<Option, ?> options, Duration timeout) {
request = validateExecuteSqlRequest(request, false);
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod());
Expand All @@ -1696,7 +1707,11 @@ public ServerStream<PartialResultSet> executeStreamingPartitionedDml(

@Override
public StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
ExecuteSqlRequest request,
ResultStreamConsumer consumer,
@Nullable Map<Option, ?> options,
boolean readOnly) {
request = validateExecuteSqlRequest(request, readOnly);
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod());
Expand Down Expand Up @@ -2014,4 +2029,32 @@ private static Duration systemProperty(String name, int defaultValue) {
String stringValue = System.getProperty(name, "");
return Duration.ofSeconds(stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue));
}

private ExecuteSqlRequest validateExecuteSqlRequest(ExecuteSqlRequest request, boolean readOnly) {
if (!readOnly) {
if (request.hasDirectedReadOptions() || (directedReadOptions != null)) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"DirectedReadOptions can't be set for Read-Write or Partitioned DML transactions");
}
}
if (directedReadOptions != null) {
return request.toBuilder().setDirectedReadOptions(directedReadOptions).build();
}
return request;
}

private ReadRequest validateReadRequest(ReadRequest request, boolean readOnly) {
if (!readOnly) {
if (request.hasDirectedReadOptions() || (directedReadOptions != null)) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"DirectedReadOptions can't be set for Read-Write or Partitioned DML transactions");
}
}
if (directedReadOptions != null) {
return request.toBuilder().setDirectedReadOptions(directedReadOptions).build();
}
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,16 @@ ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?>
throws SpannerException;

StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
ReadRequest request,
ResultStreamConsumer consumer,
@Nullable Map<Option, ?> options,
boolean readOnly);

ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);
ResultSet executeQuery(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean readOnly);

ApiFuture<ResultSet> executeQueryAsync(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options);
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean readOnly);

ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

Expand All @@ -335,7 +339,10 @@ ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);

StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
ExecuteSqlRequest request,
ResultStreamConsumer consumer,
@Nullable Map<Option, ?> options,
boolean readOnly);

ExecuteBatchDmlResponse executeBatchDml(ExecuteBatchDmlRequest build, Map<Option, ?> options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public void testCommitReturnsCommitStats() {
new AsyncTransactionManagerImpl(session, mock(Span.class), Options.commitStats())) {
when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats())))
.thenReturn(transaction);
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1);
CommitResponse response = mock(CommitResponse.class);
when(response.getCommitTimestamp()).thenReturn(commitTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ public void request(int numMessages) {}
private void mockRead(final PartialResultSet myResultSet) {
final ArgumentCaptor<SpannerRpc.ResultStreamConsumer> consumer =
ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class);
Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options)))
Mockito.when(
rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options), Mockito.anyBoolean()))
.then(
invocation -> {
consumer.getValue().onPartialResultSet(myResultSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,9 +763,12 @@ public void testSessionNotFoundReadWriteTransaction() {
when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap()))
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(rpc.executeQuery(
any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class)))
any(ExecuteSqlRequest.class),
any(ResultStreamConsumer.class),
any(Map.class),
any(Boolean.class)))
.thenReturn(closedStreamingCall);
when(rpc.executeQuery(any(ExecuteSqlRequest.class), any(Map.class)))
when(rpc.executeQuery(any(ExecuteSqlRequest.class), any(Map.class), any(Boolean.class)))
.thenThrow(sessionNotFound);
when(rpc.executeBatchDml(any(ExecuteBatchDmlRequest.class), any(Map.class)))
.thenThrow(sessionNotFound);
Expand Down
Loading

0 comments on commit f284448

Please sign in to comment.