diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 2ebe77e1ba2..92b1baacc03 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -69,6 +69,10 @@ public class SessionPoolOptions { /** Property for allowing mocking of session maintenance clock. */ private final Clock poolMaintainerClock; + private final Duration waitForMultiplexedSession; + private final boolean useMultiplexedSession; + private final Duration multiplexedSessionMaintenanceDuration; + private SessionPoolOptions(Builder builder) { // minSessions > maxSessions is only possible if the user has only set a value for maxSessions. // We allow that to prevent code that only sets a value for maxSessions to break if the @@ -93,6 +97,9 @@ private SessionPoolOptions(Builder builder) { this.randomizePositionQPSThreshold = builder.randomizePositionQPSThreshold; this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions; this.poolMaintainerClock = builder.poolMaintainerClock; + this.useMultiplexedSession = builder.useMultiplexedSession; + this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration; + this.waitForMultiplexedSession = builder.waitForMultiplexedSession; } @Override @@ -123,7 +130,11 @@ public boolean equals(Object o) { && Objects.equals(this.randomizePositionQPSThreshold, other.randomizePositionQPSThreshold) && Objects.equals( this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions) - && Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock); + && Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock) + && Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession) + && Objects.equals( + this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration) + && Objects.equals(this.waitForMultiplexedSession, other.waitForMultiplexedSession); } @Override @@ -148,7 +159,10 @@ public int hashCode() { this.releaseToPosition, this.randomizePositionQPSThreshold, this.inactiveTransactionRemovalOptions, - this.poolMaintainerClock); + this.poolMaintainerClock, + this.useMultiplexedSession, + this.multiplexedSessionMaintenanceDuration, + this.waitForMultiplexedSession); } public Builder toBuilder() { @@ -271,6 +285,18 @@ long getRandomizePositionQPSThreshold() { return randomizePositionQPSThreshold; } + boolean getUseMultiplexedSession() { + return useMultiplexedSession; + } + + Duration getMultiplexedSessionMaintenanceDuration() { + return multiplexedSessionMaintenanceDuration; + } + + Duration getWaitForMultiplexedSession() { + return waitForMultiplexedSession; + } + public static Builder newBuilder() { return new Builder(); } @@ -467,6 +493,9 @@ public static class Builder { */ private long randomizePositionQPSThreshold = 0L; + private boolean useMultiplexedSession = false; + private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7); + private Duration waitForMultiplexedSession = Duration.ofSeconds(10); private Clock poolMaintainerClock; private static Position getReleaseToPositionFromSystemProperty() { @@ -669,6 +698,47 @@ Builder setPoolMaintainerClock(Clock poolMaintainerClock) { return this; } + /** + * Sets whether the client should use multiplexed session or not. If set to true, the client + * optimises and runs multiple applicable requests concurrently on a single session. A single + * multiplexed session is sufficient to handle all concurrent traffic. + * + *

When set to false, the client uses the regular session cached in the session pool for + * running 1 concurrent transaction per session. We require to provision sufficient sessions by + * making use of {@link SessionPoolOptions#minSessions} and {@link + * SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in + * higher latencies. + */ + Builder setUseMultiplexedSession(boolean useMultiplexedSession) { + this.useMultiplexedSession = useMultiplexedSession; + return this; + } + + @VisibleForTesting + Builder setMultiplexedSessionMaintenanceDuration( + Duration multiplexedSessionMaintenanceDuration) { + this.multiplexedSessionMaintenanceDuration = multiplexedSessionMaintenanceDuration; + return this; + } + + /** + * This option is only used when {@link SessionPoolOptions#useMultiplexedSession} is set to + * true. If greater than zero, calls to {@link Spanner#getDatabaseClient(DatabaseId)} will block + * for up to the given duration while waiting for the multiplexed session to be created. The + * default value for this is 10 seconds. + * + *

If this is set to null or zero, the client does not wait for the session to be created, + * which means that the first read requests could see more latency, as they will need to wait + * until the multiplexed session has been created. + * + *

Note that we would need to use the option {@link SessionPoolOptions#waitForMinSessions} if + * we want a similar blocking behavior for the other sessions within the session pool. + */ + Builder setWaitForMultiplexedSession(Duration waitForMultiplexedSession) { + this.waitForMultiplexedSession = waitForMultiplexedSession; + return this; + } + /** * Sets whether the client should automatically execute a background query to detect the dialect * that is used by the database or not. Set this option to true if you do not know what the diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java index 22d10d92a87..9e029e931d6 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java @@ -246,4 +246,62 @@ public void testRandomizePositionQPSThreshold() { IllegalArgumentException.class, () -> SessionPoolOptions.newBuilder().setRandomizePositionQPSThreshold(-1L)); } + + @Test + public void testUseMultiplexedSession() { + assertEquals(false, SessionPoolOptions.newBuilder().build().getUseMultiplexedSession()); + assertEquals( + true, + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .build() + .getUseMultiplexedSession()); + assertEquals( + false, + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSession(false) + .build() + .getUseMultiplexedSession()); + } + + @Test + public void testMultiplexedSessionMaintenanceDuration() { + assertEquals( + Duration.ofDays(7), + SessionPoolOptions.newBuilder().build().getMultiplexedSessionMaintenanceDuration()); + assertEquals( + Duration.ofDays(2), + SessionPoolOptions.newBuilder() + .setMultiplexedSessionMaintenanceDuration(Duration.ofDays(2)) + .build() + .getMultiplexedSessionMaintenanceDuration()); + assertEquals( + Duration.ofDays(10), + SessionPoolOptions.newBuilder() + .setMultiplexedSessionMaintenanceDuration(Duration.ofDays(2)) + .setMultiplexedSessionMaintenanceDuration(Duration.ofDays(10)) + .build() + .getMultiplexedSessionMaintenanceDuration()); + } + + @Test + public void testWaitForMultiplexedSession() { + assertEquals( + Duration.ofSeconds(10), + SessionPoolOptions.newBuilder().build().getWaitForMultiplexedSession()); + assertEquals( + Duration.ofSeconds(20), + SessionPoolOptions.newBuilder() + .setWaitForMultiplexedSession(Duration.ofSeconds(20)) + .build() + .getWaitForMultiplexedSession()); + assertEquals( + Duration.ofSeconds(10), + SessionPoolOptions.newBuilder() + .setWaitForMultiplexedSession(Duration.ofSeconds(2)) + .setWaitForMultiplexedSession(Duration.ofSeconds(10)) + .build() + .getWaitForMultiplexedSession()); + } }