diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java index c6251e64db5..f0c772acd9e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java @@ -57,6 +57,7 @@ import java.util.List; import java.util.Stack; import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -292,30 +293,38 @@ public void close() { } public ApiFuture closeAsync() { - if (!isClosed()) { - List> futures = new ArrayList<>(); - if (isBatchActive()) { - abortBatch(); - } - if (isTransactionStarted()) { + synchronized (this) { + if (!isClosed()) { + List> futures = new ArrayList<>(); + if (isBatchActive()) { + abortBatch(); + } + if (isTransactionStarted()) { + try { + futures.add(rollbackAsync()); + } catch (Exception exception) { + // ignore and continue to close the connection. + } + } + // Try to wait for the current statement to finish (if any) before we actually close the + // connection. + this.closed = true; + // Add a no-op statement to the executor. Once this has been executed, we know that all + // preceding statements have also been executed, as the executor is single-threaded and + // executes all statements in order of submitting. The Executor#submit method can throw a + // RejectedExecutionException if the executor is no longer in state where it accepts new + // tasks. try { - futures.add(rollbackAsync()); - } catch (Exception exception) { + futures.add(statementExecutor.submit(() -> null)); + } catch (RejectedExecutionException ignored) { // ignore and continue to close the connection. } + statementExecutor.shutdown(); + leakedException = null; + spannerPool.removeConnection(options, this); + return ApiFutures.transform( + ApiFutures.allAsList(futures), ignored -> null, MoreExecutors.directExecutor()); } - // Try to wait for the current statement to finish (if any) before we actually close the - // connection. - this.closed = true; - // Add a no-op statement to the executor. Once this has been executed, we know that all - // preceding statements have also been executed, as the executor is single-threaded and - // executes all statements in order of submitting. - futures.add(statementExecutor.submit(() -> null)); - statementExecutor.shutdown(); - leakedException = null; - spannerPool.removeConnection(options, this); - return ApiFutures.transform( - ApiFutures.allAsList(futures), ignored -> null, MoreExecutors.directExecutor()); } return ApiFutures.immediateFuture(null); }