From 6bd7dab0b30f2556f7e13a92c64137fc75b2b040 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 11 May 2021 17:08:29 -0700 Subject: [PATCH 01/12] Fix race on reopening CBSChannel --- .../eventhubs/impl/RequestResponseOpener.java | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index 17625b3a2295d..496f288eec824 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -8,8 +8,11 @@ import org.slf4j.LoggerFactory; import java.util.Locale; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +import com.microsoft.azure.eventhubs.impl.IOObject.IOObjectState; + public class RequestResponseOpener implements Operation { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(RequestResponseOpener.class); @@ -21,7 +24,9 @@ public class RequestResponseOpener implements Operation private final AmqpConnection eventDispatcher; private final ScheduledExecutorService executor; + private RequestResponseChannel previousChannel; private boolean isOpened; + private CompletableFuture closeWaiter; public RequestResponseOpener(final SessionProvider sessionProvider, final String clientId, final String sessionName, final String linkName, final String endpointAddress, final AmqpConnection eventDispatcher, final ScheduledExecutorService executor) { @@ -37,6 +42,34 @@ public RequestResponseOpener(final SessionProvider sessionProvider, final String @Override public synchronized void run(OperationResult operationCallback) { if (this.isOpened) { + // this.previousChannel cannot be null if this.isOpened is true + if ((this.previousChannel.getState() == IOObjectState.OPENED) || (this.previousChannel.getState() == IOObjectState.OPENING)) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info("skipping run because inner channel currently open"); + } + return; + } + + // The inner channel is closing but hasn't called the callback below which does some cleanup before setting this.isOpened + // back to false. We want to wait for that cleanup to happen, then relaunch the run operation to open a new inner channel. + if (this.closeWaiter != null) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info("close pending, open already queued"); + } + return; + } + + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info("close pending, will do open when it finishes"); + } + CompletableFuture tempWaiter = new CompletableFuture(); + tempWaiter.thenRunAsync(() -> { + if (RequestResponseOpener.TRACE_LOGGER.isInfoEnabled()) { + RequestResponseOpener.TRACE_LOGGER.info("pending close finished, starting queued open"); + } + RequestResponseOpener.this.run(operationCallback); + }, this.executor); + this.closeWaiter = tempWaiter; // set closeWaiter only after the continuation is set up return; } @@ -59,7 +92,7 @@ public synchronized void run(OperationResult this.endpointAddress, session, this.executor); - + this.previousChannel = requestResponseChannel; requestResponseChannel.open( new OperationResult() { @Override @@ -94,6 +127,9 @@ public void onComplete(Void result) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); isOpened = false; + if (closeWaiter != null) { + closeWaiter.complete(null); + } if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s]", @@ -107,6 +143,9 @@ public void onError(Exception error) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); isOpened = false; + if (closeWaiter != null) { + closeWaiter.complete(null); + } if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", From f393c42b65889125ecd09b92bd1573802293d455 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 11 May 2021 17:43:41 -0700 Subject: [PATCH 02/12] Avoid more races --- .../eventhubs/impl/RequestResponseOpener.java | 49 +++++++++++++------ 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index 496f288eec824..ace8cf6756fcb 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -25,7 +25,8 @@ public class RequestResponseOpener implements Operation private final ScheduledExecutorService executor; private RequestResponseChannel previousChannel; - private boolean isOpened; + private final Object isOpenedSynchronizer = new Object(); + private volatile boolean isOpened; private CompletableFuture closeWaiter; public RequestResponseOpener(final SessionProvider sessionProvider, final String clientId, final String sessionName, final String linkName, @@ -41,8 +42,13 @@ public RequestResponseOpener(final SessionProvider sessionProvider, final String @Override public synchronized void run(OperationResult operationCallback) { - if (this.isOpened) { - // this.previousChannel cannot be null if this.isOpened is true + boolean capturedIsOpened; + synchronized (this.isOpenedSynchronizer) { + // Capture so that we don't have to synchronize a bunch of code + capturedIsOpened = this.isOpened; + } + if (capturedIsOpened) { + // this.previousChannel cannot be null if this.isOpened was ever true if ((this.previousChannel.getState() == IOObjectState.OPENED) || (this.previousChannel.getState() == IOObjectState.OPENING)) { if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info("skipping run because inner channel currently open"); @@ -59,9 +65,6 @@ public synchronized void run(OperationResult return; } - if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("close pending, will do open when it finishes"); - } CompletableFuture tempWaiter = new CompletableFuture(); tempWaiter.thenRunAsync(() -> { if (RequestResponseOpener.TRACE_LOGGER.isInfoEnabled()) { @@ -69,8 +72,18 @@ public synchronized void run(OperationResult } RequestResponseOpener.this.run(operationCallback); }, this.executor); - this.closeWaiter = tempWaiter; // set closeWaiter only after the continuation is set up - return; + + synchronized (this.isOpenedSynchronizer) { + if (this.isOpened) { + // The close callback was not called while we were setting up + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info("close pending, will do open when it finishes"); + } + this.closeWaiter = tempWaiter; // set closeWaiter only after the continuation is set up + return; + } + // else the close callback has been called and the open can just proceed normally! + } } final Session session = this.sessionProvider.getSession( @@ -102,7 +115,9 @@ public void onComplete(Void result) { operationCallback.onComplete(requestResponseChannel); - isOpened = true; + synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { + isOpened = true; + } if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s]", @@ -126,9 +141,11 @@ public void onComplete(Void result) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); - isOpened = false; - if (closeWaiter != null) { - closeWaiter.complete(null); + synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { + isOpened = false; + if (closeWaiter != null) { + closeWaiter.complete(null); + } } if (TRACE_LOGGER.isInfoEnabled()) { @@ -142,9 +159,11 @@ public void onError(Exception error) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); - isOpened = false; - if (closeWaiter != null) { - closeWaiter.complete(null); + synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { + isOpened = false; + if (closeWaiter != null) { + closeWaiter.complete(null); + } } if (TRACE_LOGGER.isWarnEnabled()) { From 145bc9781251988f0746255af35a461699246ed2 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 1 Jun 2021 10:55:41 -0700 Subject: [PATCH 03/12] Fix maximumSilentTime validation issue hitting Spark --- .../com/microsoft/azure/eventhubs/EventHubClientOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java index f47e7381bdcd5..7790454640a48 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java @@ -107,7 +107,7 @@ public ProxyConfiguration getProxyConfiguration() { * @return The updated options object. */ public EventHubClientOptions setMaximumSilentTime(Duration maximumSilentTime) { - if (maximumSilentTime.compareTo(EventHubClientOptions.SILENT_MINIMUM) < 0) { + if (!maximumSilentTime.equals(EventHubClientOptions.SILENT_OFF) && (maximumSilentTime.compareTo(EventHubClientOptions.SILENT_MINIMUM) < 0)) { throw new IllegalArgumentException("Maximum silent time must be at least " + EventHubClientOptions.SILENT_MINIMUM.toMillis() + " milliseconds"); } this.maximumSilentTime = maximumSilentTime; From f23c5fd107f0bd72f4e1851a583e142eaddf0426 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 1 Jun 2021 10:58:01 -0700 Subject: [PATCH 04/12] Roll track1 vers to 3.3.0-beta.1 --- sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml | 4 ++-- sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml | 2 +- sdk/eventhubs/microsoft-azure-eventhubs/pom.xml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml index 4ce512bfdf2ce..72183c66ddbf8 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs-eph - 3.2.3 + 3.3.0-beta.1 Microsoft Azure SDK for Event Hubs Event Processor Host(EPH) Please note, a newer package azure-messaging-eventhubs for Azure Event Hubs is available at https://search.maven.org/artifact/com.azure/azure-messaging-eventhubs as of February 2020. While this package will continue to receive critical bug fixes, we strongly encourage you to upgrade. Read the migration guide at https://aka.ms/azsdk/java/migrate/eh for more details. @@ -36,7 +36,7 @@ com.microsoft.azure azure-eventhubs - 3.2.3 + 3.3.0-beta.1 com.microsoft.azure diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml index fb02078eeb782..d174fee0023a8 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml @@ -40,7 +40,7 @@ com.microsoft.azure azure-eventhubs - 3.2.3 + 3.3.0-betal.1 org.apache.logging.log4j diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml index c4c05a29629b7..dccd319ddf125 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs - 3.2.3 + 3.3.0-beta.1 Microsoft Azure SDK for Event Hubs Please note, a newer package azure-messaging-eventhubs for Azure Event Hubs is available at https://search.maven.org/artifact/com.azure/azure-messaging-eventhubs as of February 2020. While this package will continue to receive critical bug fixes, we strongly encourage you to upgrade. Read the migration guide at https://aka.ms/azsdk/java/migrate/eh for more details. From 72f218a20dcb60dcb017193fb3b14a03402ca521 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 1 Jun 2021 11:49:54 -0700 Subject: [PATCH 05/12] Pom typo --- sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml index d174fee0023a8..e2c5af213773c 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml @@ -40,7 +40,7 @@ com.microsoft.azure azure-eventhubs - 3.3.0-betal.1 + 3.3.0-beta.1 org.apache.logging.log4j From 743ef658482aa6df90dff961f7c2eb8a33903e33 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Wed, 2 Jun 2021 11:03:37 -0700 Subject: [PATCH 06/12] Revert version changes to avoid polluting fix PR --- sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml | 4 ++-- sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml | 2 +- sdk/eventhubs/microsoft-azure-eventhubs/pom.xml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml index 72183c66ddbf8..4ce512bfdf2ce 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs-eph - 3.3.0-beta.1 + 3.2.3 Microsoft Azure SDK for Event Hubs Event Processor Host(EPH) Please note, a newer package azure-messaging-eventhubs for Azure Event Hubs is available at https://search.maven.org/artifact/com.azure/azure-messaging-eventhubs as of February 2020. While this package will continue to receive critical bug fixes, we strongly encourage you to upgrade. Read the migration guide at https://aka.ms/azsdk/java/migrate/eh for more details. @@ -36,7 +36,7 @@ com.microsoft.azure azure-eventhubs - 3.3.0-beta.1 + 3.2.3 com.microsoft.azure diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml index e2c5af213773c..fb02078eeb782 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml @@ -40,7 +40,7 @@ com.microsoft.azure azure-eventhubs - 3.3.0-beta.1 + 3.2.3 org.apache.logging.log4j diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml index dccd319ddf125..c4c05a29629b7 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs - 3.3.0-beta.1 + 3.2.3 Microsoft Azure SDK for Event Hubs Please note, a newer package azure-messaging-eventhubs for Azure Event Hubs is available at https://search.maven.org/artifact/com.azure/azure-messaging-eventhubs as of February 2020. While this package will continue to receive critical bug fixes, we strongly encourage you to upgrade. Read the migration guide at https://aka.ms/azsdk/java/migrate/eh for more details. From 08c91fd331a4a2f12b033e54857f807342fdee55 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 8 Jun 2021 16:04:34 -0700 Subject: [PATCH 07/12] Improve and simplify race condition fix --- .../eventhubs/impl/RequestResponseOpener.java | 79 ++++++------------- 1 file changed, 26 insertions(+), 53 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index ace8cf6756fcb..9a9353c52cf1c 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -8,7 +8,6 @@ import org.slf4j.LoggerFactory; import java.util.Locale; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import com.microsoft.azure.eventhubs.impl.IOObject.IOObjectState; @@ -24,10 +23,9 @@ public class RequestResponseOpener implements Operation private final AmqpConnection eventDispatcher; private final ScheduledExecutorService executor; - private RequestResponseChannel previousChannel; + private RequestResponseChannel currentChannel = null; private final Object isOpenedSynchronizer = new Object(); - private volatile boolean isOpened; - private CompletableFuture closeWaiter; + private volatile boolean isOpening = false; public RequestResponseOpener(final SessionProvider sessionProvider, final String clientId, final String sessionName, final String linkName, final String endpointAddress, final AmqpConnection eventDispatcher, final ScheduledExecutorService executor) { @@ -42,48 +40,27 @@ public RequestResponseOpener(final SessionProvider sessionProvider, final String @Override public synchronized void run(OperationResult operationCallback) { - boolean capturedIsOpened; synchronized (this.isOpenedSynchronizer) { - // Capture so that we don't have to synchronize a bunch of code - capturedIsOpened = this.isOpened; - } - if (capturedIsOpened) { - // this.previousChannel cannot be null if this.isOpened was ever true - if ((this.previousChannel.getState() == IOObjectState.OPENED) || (this.previousChannel.getState() == IOObjectState.OPENING)) { - if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("skipping run because inner channel currently open"); + if (this.currentChannel != null) { + if ((this.currentChannel.getState() == IOObjectState.OPENED) || (this.currentChannel.getState() == IOObjectState.OPENING)) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info("inner channel currently open, no need to recreate"); + } + return; } - return; } - // The inner channel is closing but hasn't called the callback below which does some cleanup before setting this.isOpened - // back to false. We want to wait for that cleanup to happen, then relaunch the run operation to open a new inner channel. - if (this.closeWaiter != null) { + // Inner channel doesn't exist or it is closing/closed. Do we need to start creation of a new one, + // or is that already in progress? + if (this.isOpening) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("close pending, open already queued"); + TRACE_LOGGER.info("inner channel creation already in progress"); } return; } - CompletableFuture tempWaiter = new CompletableFuture(); - tempWaiter.thenRunAsync(() -> { - if (RequestResponseOpener.TRACE_LOGGER.isInfoEnabled()) { - RequestResponseOpener.TRACE_LOGGER.info("pending close finished, starting queued open"); - } - RequestResponseOpener.this.run(operationCallback); - }, this.executor); - - synchronized (this.isOpenedSynchronizer) { - if (this.isOpened) { - // The close callback was not called while we were setting up - if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("close pending, will do open when it finishes"); - } - this.closeWaiter = tempWaiter; // set closeWaiter only after the continuation is set up - return; - } - // else the close callback has been called and the open can just proceed normally! - } + // Need to start creating an inner channel. + this.isOpening = true; } final Session session = this.sessionProvider.getSession( @@ -98,6 +75,9 @@ public synchronized void run(OperationResult }); if (session == null) { + if (TRACE_LOGGER.isErrorEnabled()) { + TRACE_LOGGER.error("got a null session, inner channel recreation cannot continue"); + } return; } final RequestResponseChannel requestResponseChannel = new RequestResponseChannel( @@ -105,7 +85,7 @@ public synchronized void run(OperationResult this.endpointAddress, session, this.executor); - this.previousChannel = requestResponseChannel; + this.currentChannel = requestResponseChannel; requestResponseChannel.open( new OperationResult() { @Override @@ -116,7 +96,8 @@ public void onComplete(Void result) { operationCallback.onComplete(requestResponseChannel); synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { - isOpened = true; + // Inner channel creation complete. + isOpening = false; } if (TRACE_LOGGER.isInfoEnabled()) { @@ -129,6 +110,12 @@ public void onComplete(Void result) { public void onError(Exception error) { operationCallback.onError(error); + synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { + // Inner channel creation failed. + // The next time run() is called should try again. + isOpening = false; + } + if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onOpen error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", clientId, sessionName, linkName, endpointAddress, error)); @@ -141,13 +128,6 @@ public void onComplete(Void result) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); - synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { - isOpened = false; - if (closeWaiter != null) { - closeWaiter.complete(null); - } - } - if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s]", clientId, sessionName, linkName, endpointAddress)); @@ -159,13 +139,6 @@ public void onError(Exception error) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); - synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { - isOpened = false; - if (closeWaiter != null) { - closeWaiter.complete(null); - } - } - if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", clientId, sessionName, linkName, endpointAddress, error)); From ede10253909de91431d60a8b15c5da3241ab43ff Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Mon, 14 Jun 2021 10:52:56 -0700 Subject: [PATCH 08/12] More logging --- .../microsoft/azure/eventhubs/impl/RequestResponseOpener.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index 9a9353c52cf1c..2fc28895bafc3 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -61,6 +61,9 @@ public synchronized void run(OperationResult // Need to start creating an inner channel. this.isOpening = true; + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info("opening inner channel client draft2"); + } } final Session session = this.sessionProvider.getSession( From f392207aac0fe913eb19193069a8eab9bea88278 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Wed, 16 Jun 2021 16:46:43 -0700 Subject: [PATCH 09/12] Improve tracing to track down race issue --- .../eventhubs/impl/FaultTolerantObject.java | 7 ++-- .../azure/eventhubs/impl/IOObject.java | 2 ++ .../impl/RequestResponseChannel.java | 14 ++++++++ .../eventhubs/impl/RequestResponseOpener.java | 33 ++++++++++++------- 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java index 78c14b75ae615..286d48a4d10e1 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java @@ -71,7 +71,7 @@ public void onEvent() { "FaultTolerantObject[%s] client[%s] session[%s] decided to create cNIO[%s] innerObject[%s] iOstate[%s]", FaultTolerantObject.this.instanceName, FaultTolerantObject.this.clientId, FaultTolerantObject.this.sessionName, FaultTolerantObject.this.creatingNewInnerObject ? "T" : "F", - FaultTolerantObject.this.innerObject != null ? "not null" : "null", + FaultTolerantObject.this.innerObject != null ? FaultTolerantObject.this.innerObject.getId() : "null", FaultTolerantObject.this.innerObject != null ? FaultTolerantObject.this.innerObject.getState().toString() : "--")); } shouldCreateNewInnerObject = true; @@ -95,8 +95,9 @@ public void onComplete(T result) { } if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "FaultTolerantObject[%s] client[%s] session[%s] inner object creation complete", - FaultTolerantObject.this.instanceName, FaultTolerantObject.this.clientId, FaultTolerantObject.this.sessionName)); + TRACE_LOGGER.info(String.format(Locale.US, "FaultTolerantObject[%s] client[%s] session[%s] inner object rrc[%s] creation complete", + FaultTolerantObject.this.instanceName, FaultTolerantObject.this.clientId, FaultTolerantObject.this.sessionName, + result.getId())); } } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java index 9126fab989533..6d7f884138f2e 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java @@ -8,6 +8,8 @@ public interface IOObject { // should be run on reactor thread IOObjectState getState(); + String getId(); + enum IOObjectState { OPENING, OPENED, diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java index 6621f82d662c7..98171818d694e 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java @@ -20,6 +20,7 @@ import org.apache.qpid.proton.message.Message; import java.util.HashMap; +import java.util.Locale; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -37,6 +38,8 @@ public class RequestResponseChannel implements IOObject { private final AtomicInteger openRefCount; private final AtomicInteger closeRefCount; + private final String instanceName = StringUtil.getRandomString("RRC"); + private OperationResult onOpen; private OperationResult onClose; // handles closeLink due to failures private OperationResult onGraceFullClose; // handles intentional close @@ -194,6 +197,17 @@ public IOObjectState getState() { return IOObjectState.CLOSING; // only left cases are if some are active and some are closed } + public String getStateDebug() { + return String.format(Locale.US, "sendLink local[%s] remote[%s] receiveLink local[%s] remote[%s]", + sendLink.getLocalState().toString(), sendLink.getRemoteState().toString(), + receiveLink.getLocalState().toString(), receiveLink.getRemoteState().toString()); + } + + @Override + public String getId() { + return this.instanceName; + } + private class RequestHandler implements AmqpSender { @Override diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index 2fc28895bafc3..cc0a454016482 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -23,6 +23,8 @@ public class RequestResponseOpener implements Operation private final AmqpConnection eventDispatcher; private final ScheduledExecutorService executor; + private final String instanceName = StringUtil.getRandomString("RRO"); + private RequestResponseChannel currentChannel = null; private final Object isOpenedSynchronizer = new Object(); private volatile boolean isOpening = false; @@ -44,7 +46,8 @@ public synchronized void run(OperationResult if (this.currentChannel != null) { if ((this.currentChannel.getState() == IOObjectState.OPENED) || (this.currentChannel.getState() == IOObjectState.OPENING)) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("inner channel currently open, no need to recreate"); + TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] inner channel rrc[%s] currently [%s], no need to recreate debug [%s]", + this.clientId, this.instanceName, this.currentChannel.getId(), this.currentChannel.getState().toString(), this.currentChannel.getStateDebug())); } return; } @@ -54,7 +57,8 @@ public synchronized void run(OperationResult // or is that already in progress? if (this.isOpening) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("inner channel creation already in progress"); + TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] inner channel creation already in progress", + this.clientId, this.instanceName)); } return; } @@ -62,7 +66,8 @@ public synchronized void run(OperationResult // Need to start creating an inner channel. this.isOpening = true; if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("opening inner channel client draft2"); + TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] opening inner channel", + this.clientId, this.instanceName)); } } @@ -79,7 +84,13 @@ public synchronized void run(OperationResult if (session == null) { if (TRACE_LOGGER.isErrorEnabled()) { - TRACE_LOGGER.error("got a null session, inner channel recreation cannot continue"); + TRACE_LOGGER.error(String.format(Locale.US, "clientId[%s] rro[%s] got a null session, inner channel recreation cannot continue", + this.clientId, this.instanceName)); + } + synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { + // Inner channel creation failed. + // The next time run() is called should try again. + isOpening = false; } return; } @@ -104,8 +115,8 @@ public void onComplete(Void result) { } if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s]", - clientId, sessionName, linkName, endpointAddress)); + TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]", + clientId, sessionName, linkName, endpointAddress, requestResponseChannel.getId())); } } @@ -121,7 +132,7 @@ public void onError(Exception error) { if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onOpen error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", - clientId, sessionName, linkName, endpointAddress, error)); + clientId, sessionName, linkName, endpointAddress, error.toString())); } } }, @@ -132,8 +143,8 @@ public void onComplete(Void result) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s]", - clientId, sessionName, linkName, endpointAddress)); + TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]", + clientId, sessionName, linkName, endpointAddress, requestResponseChannel.getId())); } } @@ -143,8 +154,8 @@ public void onError(Exception error) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", - clientId, sessionName, linkName, endpointAddress, error)); + TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s], error %s", + clientId, sessionName, linkName, endpointAddress, requestResponseChannel.getId(), error.toString())); } } }); From 03a950af91db7ecad55343630d610145dafc53f7 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Thu, 17 Jun 2021 13:55:16 -0700 Subject: [PATCH 10/12] Throw away RequestResponseChannel on open error --- .../eventhubs/impl/RequestResponseOpener.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index cc0a454016482..8b1d8e2d2d9f6 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -66,7 +66,7 @@ public synchronized void run(OperationResult // Need to start creating an inner channel. this.isOpening = true; if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] opening inner channel", + TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] opening inner channel MARKER2", this.clientId, this.instanceName)); } } @@ -111,12 +111,13 @@ public void onComplete(Void result) { synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { // Inner channel creation complete. - isOpening = false; + RequestResponseOpener.this.isOpening = false; } if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]", - clientId, sessionName, linkName, endpointAddress, requestResponseChannel.getId())); + RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName, + RequestResponseOpener.this.endpointAddress, requestResponseChannel.getId())); } } @@ -125,14 +126,18 @@ public void onError(Exception error) { operationCallback.onError(error); synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { - // Inner channel creation failed. - // The next time run() is called should try again. - isOpening = false; + // Inner channel creation failed. The next time run() is called should try again. + // Sometimes this.currentChannel ends up in a weird state that shows as OPENING (because + // remote states are UNINITIALIZED) instead of CLOSED/CLOSING, which will still cause the + // next attempt to short-circuit, so null out currentChannel to prevent that. + RequestResponseOpener.this.currentChannel = null; + RequestResponseOpener.this.isOpening = false; } if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onOpen error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", - clientId, sessionName, linkName, endpointAddress, error.toString())); + RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName, + RequestResponseOpener.this.endpointAddress, error.getMessage())); } } }, @@ -144,7 +149,8 @@ public void onComplete(Void result) { if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]", - clientId, sessionName, linkName, endpointAddress, requestResponseChannel.getId())); + RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName, + RequestResponseOpener.this.endpointAddress, requestResponseChannel.getId())); } } @@ -155,7 +161,8 @@ public void onError(Exception error) { if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s], error %s", - clientId, sessionName, linkName, endpointAddress, requestResponseChannel.getId(), error.toString())); + RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName, + RequestResponseOpener.this.endpointAddress, requestResponseChannel.getId(), error.getMessage())); } } }); From 69d535a25054455403f751102cc3581c7f218fb2 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 22 Jun 2021 10:50:32 -0700 Subject: [PATCH 11/12] Remove extra debugging logs --- .../azure/eventhubs/impl/RequestResponseChannel.java | 6 ------ .../azure/eventhubs/impl/RequestResponseOpener.java | 6 +++--- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java index 98171818d694e..b4313b41a8841 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java @@ -197,12 +197,6 @@ public IOObjectState getState() { return IOObjectState.CLOSING; // only left cases are if some are active and some are closed } - public String getStateDebug() { - return String.format(Locale.US, "sendLink local[%s] remote[%s] receiveLink local[%s] remote[%s]", - sendLink.getLocalState().toString(), sendLink.getRemoteState().toString(), - receiveLink.getLocalState().toString(), receiveLink.getRemoteState().toString()); - } - @Override public String getId() { return this.instanceName; diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index 8b1d8e2d2d9f6..86c47ce332509 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -46,8 +46,8 @@ public synchronized void run(OperationResult if (this.currentChannel != null) { if ((this.currentChannel.getState() == IOObjectState.OPENED) || (this.currentChannel.getState() == IOObjectState.OPENING)) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] inner channel rrc[%s] currently [%s], no need to recreate debug [%s]", - this.clientId, this.instanceName, this.currentChannel.getId(), this.currentChannel.getState().toString(), this.currentChannel.getStateDebug())); + TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] inner channel rrc[%s] currently [%s], no need to recreate", + this.clientId, this.instanceName, this.currentChannel.getId(), this.currentChannel.getState().toString())); } return; } @@ -66,7 +66,7 @@ public synchronized void run(OperationResult // Need to start creating an inner channel. this.isOpening = true; if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] opening inner channel MARKER2", + TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] opening inner channel", this.clientId, this.instanceName)); } } From bbe31f53b25654a64ef07dc47f0be793e83e6ed2 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 22 Jun 2021 10:58:40 -0700 Subject: [PATCH 12/12] Remove unneeded import --- .../microsoft/azure/eventhubs/impl/RequestResponseChannel.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java index b4313b41a8841..61efc9eac6361 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java @@ -20,7 +20,6 @@ import org.apache.qpid.proton.message.Message; import java.util.HashMap; -import java.util.Locale; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger;