Skip to content

Commit

Permalink
chore(spanner): add R/W multiplexed session support (#3381)
Browse files Browse the repository at this point in the history
This PR introduces support for using multiplexed sessions in R/W transactions for the following methods:
1. write
2. writeWithOptions
3. readWriteTransaction
4. transactionManager
5. runAsync
6. transactionManagerAsync
  • Loading branch information
harshachinta authored Oct 14, 2024
1 parent 4f73bdb commit 1f53d38
Show file tree
Hide file tree
Showing 17 changed files with 768 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,6 @@ public String getDatabaseRole() {
throw new UnsupportedOperationException();
}

@Override
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
Expand All @@ -63,26 +52,6 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
throw new UnsupportedOperationException();
}

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
throw new UnsupportedOperationException();
}

@Override
public TransactionManager transactionManager(TransactionOption... options) {
throw new UnsupportedOperationException();
}

@Override
public AsyncRunner runAsync(TransactionOption... options) {
throw new UnsupportedOperationException();
}

@Override
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
throw new UnsupportedOperationException();
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public ApiFuture<Void> closeAsync() {
if (txn != null) {
txn.close();
}
if (session != null) {
session.onTransactionDone();
}
return MoreObjects.firstNonNull(res, ApiFutures.immediateFuture(null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@VisibleForTesting final boolean useMultiplexedSessionForRW;

final boolean useMultiplexedSessionBlindWrite;

Expand All @@ -46,7 +47,8 @@ class DatabaseClientImpl implements DatabaseClient {
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer);
tracer,
/* useMultiplexedSessionForRW = */ false);
}

@VisibleForTesting
Expand All @@ -56,20 +58,23 @@ class DatabaseClientImpl implements DatabaseClient {
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer);
tracer,
false);
}

DatabaseClientImpl(
String clientId,
SessionPool pool,
boolean useMultiplexedSessionBlindWrite,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
TraceWrapper tracer) {
TraceWrapper tracer,
boolean useMultiplexedSessionForRW) {
this.clientId = clientId;
this.pool = pool;
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
}

@VisibleForTesting
Expand All @@ -85,6 +90,14 @@ DatabaseClient getMultiplexedSession() {
return pool.getMultiplexedSessionWithFallback();
}

@VisibleForTesting
DatabaseClient getMultiplexedSessionForRW() {
if (this.useMultiplexedSessionForRW) {
return getMultiplexedSession();
}
return getSession();
}

private MultiplexedSessionDatabaseClient getMultiplexedSessionDatabaseClient() {
return canUseMultiplexedSessions() ? this.multiplexedSessionDatabaseClient : null;
}
Expand Down Expand Up @@ -116,6 +129,9 @@ public CommitResponse writeWithOptions(
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
if (this.useMultiplexedSessionForRW && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
}
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
} catch (RuntimeException e) {
span.setStatus(e);
Expand Down Expand Up @@ -241,7 +257,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
public TransactionRunner readWriteTransaction(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
return getSession().readWriteTransaction(options);
return getMultiplexedSessionForRW().readWriteTransaction(options);
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
Expand All @@ -253,7 +269,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {
public TransactionManager transactionManager(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
return getSession().transactionManager(options);
return getMultiplexedSessionForRW().transactionManager(options);
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
Expand All @@ -265,7 +281,7 @@ public TransactionManager transactionManager(TransactionOption... options) {
public AsyncRunner runAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
return getSession().runAsync(options);
return getMultiplexedSessionForRW().runAsync(options);
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
Expand All @@ -277,7 +293,7 @@ public AsyncRunner runAsync(TransactionOption... options) {
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
return getSession().transactionManagerAsync(options);
return getMultiplexedSessionForRW().transactionManagerAsync(options);
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

/**
* Represents a {@link AsyncRunner} using a multiplexed session that is not yet ready. The execution
* will be delayed until the multiplexed session has been created and is ready. This class is only
* used during the startup of the client and the multiplexed session has not yet been created.
*/
public class DelayedAsyncRunner implements AsyncRunner {

private final ApiFuture<AsyncRunner> asyncRunnerFuture;

public DelayedAsyncRunner(ApiFuture<AsyncRunner> asyncRunnerFuture) {
this.asyncRunnerFuture = asyncRunnerFuture;
}

ApiFuture<AsyncRunner> getAsyncRunner() {
return ApiFutures.catchingAsync(
asyncRunnerFuture,
Exception.class,
exception -> {
if (exception instanceof InterruptedException) {
throw SpannerExceptionFactory.propagateInterrupt((InterruptedException) exception);
}
if (exception instanceof ExecutionException) {
throw SpannerExceptionFactory.causeAsRunTimeException((ExecutionException) exception);
}
throw exception;
},
MoreExecutors.directExecutor());
}

@Override
public <R> ApiFuture<R> runAsync(AsyncWork<R> work, Executor executor) {
return ApiFutures.transformAsync(
getAsyncRunner(),
asyncRunner -> asyncRunner.runAsync(work, executor),
MoreExecutors.directExecutor());
}

@Override
public ApiFuture<Timestamp> getCommitTimestamp() {
return ApiFutures.transformAsync(
getAsyncRunner(), AsyncRunner::getCommitTimestamp, MoreExecutors.directExecutor());
}

@Override
public ApiFuture<CommitResponse> getCommitResponse() {
return ApiFutures.transformAsync(
getAsyncRunner(), AsyncRunner::getCommitResponse, MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import java.util.concurrent.ExecutionException;

/**
* Represents a {@link AsyncTransactionManager} using a multiplexed session that is not yet ready.
* The execution will be delayed until the multiplexed session has been created and is ready. This
* class is only used during the startup of the client and the multiplexed session has not yet been
* created.
*/
public class DelayedAsyncTransactionManager implements AsyncTransactionManager {

private final ApiFuture<AsyncTransactionManager> asyncTransactionManagerApiFuture;

DelayedAsyncTransactionManager(
ApiFuture<AsyncTransactionManager> asyncTransactionManagerApiFuture) {
this.asyncTransactionManagerApiFuture = asyncTransactionManagerApiFuture;
}

AsyncTransactionManager getAsyncTransactionManager() {
try {
return this.asyncTransactionManagerApiFuture.get();
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.causeAsRunTimeException(executionException);
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}

@Override
public TransactionContextFuture beginAsync() {
return getAsyncTransactionManager().beginAsync();
}

@Override
public ApiFuture<Void> rollbackAsync() {
return getAsyncTransactionManager().rollbackAsync();
}

@Override
public TransactionContextFuture resetForRetryAsync() {
return getAsyncTransactionManager().resetForRetryAsync();
}

@Override
public TransactionState getState() {
return getAsyncTransactionManager().getState();
}

@Override
public ApiFuture<CommitResponse> getCommitResponse() {
return getAsyncTransactionManager().getCommitResponse();
}

@Override
public void close() {
getAsyncTransactionManager().close();
}

@Override
public ApiFuture<Void> closeAsync() {
return getAsyncTransactionManager().closeAsync();
}
}
Loading

0 comments on commit 1f53d38

Please sign in to comment.