Skip to content

Commit

Permalink
Merge branch 'googleapis:main' into interval_support
Browse files Browse the repository at this point in the history
  • Loading branch information
sagarwaal authored Oct 22, 2024
2 parents df6ebe7 + 16cc6ee commit d366c1c
Show file tree
Hide file tree
Showing 27 changed files with 843 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
Expand Down Expand Up @@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) {
this.session.onReadDone();
}

/**
* For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
* present in the RPC response. In such cases, this method will be a no-op.
*/
@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}

private ResultSet readInternal(
String table,
@Nullable String index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.protobuf.ListValue;
import com.google.protobuf.ProtocolMessageEnum;
import com.google.protobuf.Value.KindCase;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.Transaction;
import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)

/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);

/**
* Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token
* will be included if the read-write transaction is executed on a multiplexed session.
*/
void onPrecommitToken(MultiplexedSessionPrecommitToken token);
}

static final class LazyByteArray implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;

/** Implementation of {@link AsyncTransactionManager}. */
final class AsyncTransactionManagerImpl
Expand Down Expand Up @@ -80,7 +81,19 @@ public TransactionContextFutureImpl beginAsync() {

private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
}

txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (firstAttempt) {
session.setActive(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_HASH_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.LOCATION_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
Expand Down Expand Up @@ -83,6 +84,7 @@ Map<String, String> createClientAttributes(String projectId, String client_name)
clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation());
clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId);
// TODO: Replace this with real value.
clientAttributes.put(DIRECT_PATH_ENABLED_KEY.getKey(), "false");
clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown");
clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name);
String clientUid = getDefaultTaskValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void addAttributes(Map<String, String> attributes) {
for (ApiTracer child : children) {
if (child instanceof MetricsTracer) {
MetricsTracer metricsTracer = (MetricsTracer) child;
attributes.forEach((key, value) -> metricsTracer.addAttributes(key, value));
metricsTracer.addAttributes(attributes);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class DatabaseClientImpl implements DatabaseClient {
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer,
false);
/* useMultiplexedSessionForRW = */ false);
}

DatabaseClientImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR

GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
this.iterator = new GrpcValueIterator(iterator);
this.iterator = new GrpcValueIterator(iterator, listener);
this.listener = listener;
this.decodeMode = decodeMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.Listener;
import com.google.common.collect.AbstractIterator;
import com.google.protobuf.ListValue;
import com.google.protobuf.Value.KindCase;
Expand All @@ -44,9 +45,11 @@ private enum StreamValue {
private PartialResultSet current;
private int pos;
private ResultSetStats statistics;
private final Listener listener;

GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
this.stream = stream;
this.listener = listener;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
}
}
// collect the precommit token from each PartialResultSet
if (current.hasPrecommitToken()) {
listener.onPrecommitToken(current.getPrecommitToken());
}
if (current.hasStats()) {
statistics = current.getStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ static void throwIfTransactionsPending() {
}
}

static TransactionOptions createReadWriteTransactionOptions(Options options) {
static TransactionOptions createReadWriteTransactionOptions(
Options options, ByteString previousTransactionId) {
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptions.setExcludeTxnFromChangeStreams(true);
Expand All @@ -78,6 +79,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
if (options.withOptimisticLock() == Boolean.TRUE) {
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
}
if (previousTransactionId != null
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
}
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}
Expand Down Expand Up @@ -427,13 +432,17 @@ public void close() {
}

ApiFuture<ByteString> beginTransactionAsync(
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
Options transactionOptions,
boolean routeToLeader,
Map<SpannerRpc.Option, ?> channelHint,
ByteString previousTransactionId) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(getName())
.setOptions(createReadWriteTransactionOptions(transactionOptions))
.setOptions(
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
.build();
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
Expand Down Expand Up @@ -469,11 +478,12 @@ ApiFuture<ByteString> beginTransactionAsync(
return res;
}

TransactionContextImpl newTransaction(Options options) {
TransactionContextImpl newTransaction(Options options, ByteString previousTransactionId) {
return TransactionContextImpl.newBuilder()
.setSession(this)
.setOptions(options)
.setTransactionId(null)
.setPreviousTransactionId(previousTransactionId)
.setOptions(options)
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
.setRpc(spanner.getRpc())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1726,14 +1726,13 @@ private ApiTracerFactory getDefaultApiTracerFactory() {
private ApiTracerFactory createMetricsApiTracerFactory() {
OpenTelemetry openTelemetry =
this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry(
getDefaultProjectId(), getCredentials());
this.getProjectId(), getCredentials());

return openTelemetry != null
? new MetricsTracerFactory(
new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME),
builtInOpenTelemetryMetricsProvider.createClientAttributes(
getDefaultProjectId(),
"spanner-java/" + GaxProperties.getLibraryVersion(getClass())))
this.getProjectId(), "spanner-java/" + GaxProperties.getLibraryVersion(getClass())))
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;

/** Implementation of {@link TransactionManager}. */
final class TransactionManagerImpl implements TransactionManager, SessionTransaction {
Expand Down Expand Up @@ -53,7 +54,7 @@ public void setSpan(ISpan span) {
public TransactionContext begin() {
Preconditions.checkState(txn == null, "begin can only be called once");
try (IScope s = tracer.withSpan(span)) {
txn = session.newTransaction(options);
txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY);
session.setActive(this);
txnState = TransactionState.STARTED;
return txn;
Expand Down Expand Up @@ -102,7 +103,18 @@ public TransactionContext resetForRetry() {
}
try (IScope s = tracer.withSpan(span)) {
boolean useInlinedBegin = txn.transactionId != null;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (session.getIsMultiplexed()) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
}
txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (!useInlinedBegin) {
txn.ensureTxn();
}
Expand Down
Loading

0 comments on commit d366c1c

Please sign in to comment.