From 7335011e51b993341a2c6efed5bdaff37cec6a41 Mon Sep 17 00:00:00 2001 From: JamesBirdsall Date: Tue, 22 Jun 2021 16:11:31 -0700 Subject: [PATCH] Fix for EH receiver timeout while opening (#21324) * Fix race on reopening CBSChannel * Throw away RequestResponseChannel on open error to avoid new issue introduced by race fix * Add and improve tracing --- .../eventhubs/impl/FaultTolerantObject.java | 7 +- .../azure/eventhubs/impl/IOObject.java | 2 + .../impl/RequestResponseChannel.java | 7 ++ .../eventhubs/impl/RequestResponseOpener.java | 84 +++++++++++++++---- 4 files changed, 81 insertions(+), 19 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java index 78c14b75ae615..286d48a4d10e1 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java @@ -71,7 +71,7 @@ public void onEvent() { "FaultTolerantObject[%s] client[%s] session[%s] decided to create cNIO[%s] innerObject[%s] iOstate[%s]", FaultTolerantObject.this.instanceName, FaultTolerantObject.this.clientId, FaultTolerantObject.this.sessionName, FaultTolerantObject.this.creatingNewInnerObject ? "T" : "F", - FaultTolerantObject.this.innerObject != null ? "not null" : "null", + FaultTolerantObject.this.innerObject != null ? FaultTolerantObject.this.innerObject.getId() : "null", FaultTolerantObject.this.innerObject != null ? FaultTolerantObject.this.innerObject.getState().toString() : "--")); } shouldCreateNewInnerObject = true; @@ -95,8 +95,9 @@ public void onComplete(T result) { } if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "FaultTolerantObject[%s] client[%s] session[%s] inner object creation complete", - FaultTolerantObject.this.instanceName, FaultTolerantObject.this.clientId, FaultTolerantObject.this.sessionName)); + TRACE_LOGGER.info(String.format(Locale.US, "FaultTolerantObject[%s] client[%s] session[%s] inner object rrc[%s] creation complete", + FaultTolerantObject.this.instanceName, FaultTolerantObject.this.clientId, FaultTolerantObject.this.sessionName, + result.getId())); } } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java index 9126fab989533..6d7f884138f2e 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java @@ -8,6 +8,8 @@ public interface IOObject { // should be run on reactor thread IOObjectState getState(); + String getId(); + enum IOObjectState { OPENING, OPENED, diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java index 6621f82d662c7..61efc9eac6361 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java @@ -37,6 +37,8 @@ public class RequestResponseChannel implements IOObject { private final AtomicInteger openRefCount; private final AtomicInteger closeRefCount; + private final String instanceName = StringUtil.getRandomString("RRC"); + private OperationResult onOpen; private OperationResult onClose; // handles closeLink due to failures private OperationResult onGraceFullClose; // handles intentional close @@ -194,6 +196,11 @@ public IOObjectState getState() { return IOObjectState.CLOSING; // only left cases are if some are active and some are closed } + @Override + public String getId() { + return this.instanceName; + } + private class RequestHandler implements AmqpSender { @Override diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index 17625b3a2295d..86c47ce332509 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -10,6 +10,8 @@ import java.util.Locale; import java.util.concurrent.ScheduledExecutorService; +import com.microsoft.azure.eventhubs.impl.IOObject.IOObjectState; + public class RequestResponseOpener implements Operation { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(RequestResponseOpener.class); @@ -21,7 +23,11 @@ public class RequestResponseOpener implements Operation private final AmqpConnection eventDispatcher; private final ScheduledExecutorService executor; - private boolean isOpened; + private final String instanceName = StringUtil.getRandomString("RRO"); + + private RequestResponseChannel currentChannel = null; + private final Object isOpenedSynchronizer = new Object(); + private volatile boolean isOpening = false; public RequestResponseOpener(final SessionProvider sessionProvider, final String clientId, final String sessionName, final String linkName, final String endpointAddress, final AmqpConnection eventDispatcher, final ScheduledExecutorService executor) { @@ -36,8 +42,33 @@ public RequestResponseOpener(final SessionProvider sessionProvider, final String @Override public synchronized void run(OperationResult operationCallback) { - if (this.isOpened) { - return; + synchronized (this.isOpenedSynchronizer) { + if (this.currentChannel != null) { + if ((this.currentChannel.getState() == IOObjectState.OPENED) || (this.currentChannel.getState() == IOObjectState.OPENING)) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] inner channel rrc[%s] currently [%s], no need to recreate", + this.clientId, this.instanceName, this.currentChannel.getId(), this.currentChannel.getState().toString())); + } + return; + } + } + + // Inner channel doesn't exist or it is closing/closed. Do we need to start creation of a new one, + // or is that already in progress? + if (this.isOpening) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] inner channel creation already in progress", + this.clientId, this.instanceName)); + } + return; + } + + // Need to start creating an inner channel. + this.isOpening = true; + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] opening inner channel", + this.clientId, this.instanceName)); + } } final Session session = this.sessionProvider.getSession( @@ -52,6 +83,15 @@ public synchronized void run(OperationResult }); if (session == null) { + if (TRACE_LOGGER.isErrorEnabled()) { + TRACE_LOGGER.error(String.format(Locale.US, "clientId[%s] rro[%s] got a null session, inner channel recreation cannot continue", + this.clientId, this.instanceName)); + } + synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { + // Inner channel creation failed. + // The next time run() is called should try again. + isOpening = false; + } return; } final RequestResponseChannel requestResponseChannel = new RequestResponseChannel( @@ -59,7 +99,7 @@ public synchronized void run(OperationResult this.endpointAddress, session, this.executor); - + this.currentChannel = requestResponseChannel; requestResponseChannel.open( new OperationResult() { @Override @@ -69,11 +109,15 @@ public void onComplete(Void result) { operationCallback.onComplete(requestResponseChannel); - isOpened = true; + synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { + // Inner channel creation complete. + RequestResponseOpener.this.isOpening = false; + } if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s]", - clientId, sessionName, linkName, endpointAddress)); + TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]", + RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName, + RequestResponseOpener.this.endpointAddress, requestResponseChannel.getId())); } } @@ -81,9 +125,19 @@ public void onComplete(Void result) { public void onError(Exception error) { operationCallback.onError(error); + synchronized (RequestResponseOpener.this.isOpenedSynchronizer) { + // Inner channel creation failed. The next time run() is called should try again. + // Sometimes this.currentChannel ends up in a weird state that shows as OPENING (because + // remote states are UNINITIALIZED) instead of CLOSED/CLOSING, which will still cause the + // next attempt to short-circuit, so null out currentChannel to prevent that. + RequestResponseOpener.this.currentChannel = null; + RequestResponseOpener.this.isOpening = false; + } + if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onOpen error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", - clientId, sessionName, linkName, endpointAddress, error)); + RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName, + RequestResponseOpener.this.endpointAddress, error.getMessage())); } } }, @@ -93,11 +147,10 @@ public void onComplete(Void result) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); - isOpened = false; - if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s]", - clientId, sessionName, linkName, endpointAddress)); + TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]", + RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName, + RequestResponseOpener.this.endpointAddress, requestResponseChannel.getId())); } } @@ -106,11 +159,10 @@ public void onError(Exception error) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); - isOpened = false; - if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", - clientId, sessionName, linkName, endpointAddress, error)); + TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s], error %s", + RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName, + RequestResponseOpener.this.endpointAddress, requestResponseChannel.getId(), error.getMessage())); } } });