Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for EH receiver timeout while opening #21324

Merged
merged 19 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
6bd7dab
Fix race on reopening CBSChannel
JamesBirdsall May 12, 2021
f393c42
Avoid more races
JamesBirdsall May 12, 2021
7fc1efa
Merge branch 'master' of github.com:azure/azure-sdk-for-java into tim…
JamesBirdsall May 28, 2021
982cf9d
Merge branch 'master' of github.com:azure/azure-sdk-for-java into tim…
JamesBirdsall Jun 1, 2021
145bc97
Fix maximumSilentTime validation issue hitting Spark
JamesBirdsall Jun 1, 2021
f23c5fd
Roll track1 vers to 3.3.0-beta.1
JamesBirdsall Jun 1, 2021
72f218a
Pom typo
JamesBirdsall Jun 1, 2021
743ef65
Revert version changes to avoid polluting fix PR
JamesBirdsall Jun 2, 2021
9275a17
Merge branch 'master' of github.com:azure/azure-sdk-for-java into tim…
JamesBirdsall Jun 2, 2021
5214c2e
Merge branch 'master' of github.com:azure/azure-sdk-for-java into tim…
JamesBirdsall Jun 8, 2021
08c91fd
Improve and simplify race condition fix
JamesBirdsall Jun 8, 2021
ede1025
More logging
JamesBirdsall Jun 14, 2021
adc0bc4
Merge branch 'master' of github.com:azure/azure-sdk-for-java into tim…
JamesBirdsall Jun 14, 2021
ba41489
Merge branch 'master' of github.com:azure/azure-sdk-for-java into tim…
JamesBirdsall Jun 16, 2021
f392207
Improve tracing to track down race issue
JamesBirdsall Jun 16, 2021
03a950a
Throw away RequestResponseChannel on open error
JamesBirdsall Jun 17, 2021
e2293e7
Merge branch 'main' of github.com:azure/azure-sdk-for-java into timeo…
JamesBirdsall Jun 22, 2021
69d535a
Remove extra debugging logs
JamesBirdsall Jun 22, 2021
bbe31f5
Remove unneeded import
JamesBirdsall Jun 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void onEvent() {
"FaultTolerantObject[%s] client[%s] session[%s] decided to create cNIO[%s] innerObject[%s] iOstate[%s]",
FaultTolerantObject.this.instanceName, FaultTolerantObject.this.clientId, FaultTolerantObject.this.sessionName,
FaultTolerantObject.this.creatingNewInnerObject ? "T" : "F",
FaultTolerantObject.this.innerObject != null ? "not null" : "null",
FaultTolerantObject.this.innerObject != null ? FaultTolerantObject.this.innerObject.getId() : "null",
FaultTolerantObject.this.innerObject != null ? FaultTolerantObject.this.innerObject.getState().toString() : "--"));
}
shouldCreateNewInnerObject = true;
Expand All @@ -95,8 +95,9 @@ public void onComplete(T result) {
}

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "FaultTolerantObject[%s] client[%s] session[%s] inner object creation complete",
FaultTolerantObject.this.instanceName, FaultTolerantObject.this.clientId, FaultTolerantObject.this.sessionName));
TRACE_LOGGER.info(String.format(Locale.US, "FaultTolerantObject[%s] client[%s] session[%s] inner object rrc[%s] creation complete",
FaultTolerantObject.this.instanceName, FaultTolerantObject.this.clientId, FaultTolerantObject.this.sessionName,
result.getId()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public interface IOObject {
// should be run on reactor thread
IOObjectState getState();

String getId();

enum IOObjectState {
OPENING,
OPENED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class RequestResponseChannel implements IOObject {
private final AtomicInteger openRefCount;
private final AtomicInteger closeRefCount;

private final String instanceName = StringUtil.getRandomString("RRC");

private OperationResult<Void, Exception> onOpen;
private OperationResult<Void, Exception> onClose; // handles closeLink due to failures
private OperationResult<Void, Exception> onGraceFullClose; // handles intentional close
Expand Down Expand Up @@ -194,6 +196,11 @@ public IOObjectState getState() {
return IOObjectState.CLOSING; // only left cases are if some are active and some are closed
}

@Override
public String getId() {
return this.instanceName;
}

private class RequestHandler implements AmqpSender {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.Locale;
import java.util.concurrent.ScheduledExecutorService;

import com.microsoft.azure.eventhubs.impl.IOObject.IOObjectState;

public class RequestResponseOpener implements Operation<RequestResponseChannel> {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(RequestResponseOpener.class);

Expand All @@ -21,7 +23,11 @@ public class RequestResponseOpener implements Operation<RequestResponseChannel>
private final AmqpConnection eventDispatcher;
private final ScheduledExecutorService executor;

private boolean isOpened;
private final String instanceName = StringUtil.getRandomString("RRO");

private RequestResponseChannel currentChannel = null;
private final Object isOpenedSynchronizer = new Object();
private volatile boolean isOpening = false;

public RequestResponseOpener(final SessionProvider sessionProvider, final String clientId, final String sessionName, final String linkName,
final String endpointAddress, final AmqpConnection eventDispatcher, final ScheduledExecutorService executor) {
Expand All @@ -36,8 +42,33 @@ public RequestResponseOpener(final SessionProvider sessionProvider, final String

@Override
public synchronized void run(OperationResult<RequestResponseChannel, Exception> operationCallback) {
if (this.isOpened) {
return;
synchronized (this.isOpenedSynchronizer) {
if (this.currentChannel != null) {
if ((this.currentChannel.getState() == IOObjectState.OPENED) || (this.currentChannel.getState() == IOObjectState.OPENING)) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] inner channel rrc[%s] currently [%s], no need to recreate",
this.clientId, this.instanceName, this.currentChannel.getId(), this.currentChannel.getState().toString()));
}
return;
}
}

// Inner channel doesn't exist or it is closing/closed. Do we need to start creation of a new one,
// or is that already in progress?
if (this.isOpening) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] inner channel creation already in progress",
this.clientId, this.instanceName));
}
return;
}

// Need to start creating an inner channel.
this.isOpening = true;
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s] rro[%s] opening inner channel",
this.clientId, this.instanceName));
}
}

final Session session = this.sessionProvider.getSession(
Expand All @@ -52,14 +83,23 @@ public synchronized void run(OperationResult<RequestResponseChannel, Exception>
});

if (session == null) {
if (TRACE_LOGGER.isErrorEnabled()) {
TRACE_LOGGER.error(String.format(Locale.US, "clientId[%s] rro[%s] got a null session, inner channel recreation cannot continue",
this.clientId, this.instanceName));
}
synchronized (RequestResponseOpener.this.isOpenedSynchronizer) {
// Inner channel creation failed.
// The next time run() is called should try again.
isOpening = false;
}
return;
}
final RequestResponseChannel requestResponseChannel = new RequestResponseChannel(
this.linkName,
this.endpointAddress,
session,
this.executor);

this.currentChannel = requestResponseChannel;
requestResponseChannel.open(
new OperationResult<Void, Exception>() {
@Override
Expand All @@ -69,21 +109,35 @@ public void onComplete(Void result) {

operationCallback.onComplete(requestResponseChannel);

isOpened = true;
synchronized (RequestResponseOpener.this.isOpenedSynchronizer) {
// Inner channel creation complete.
RequestResponseOpener.this.isOpening = false;
}

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s]",
clientId, sessionName, linkName, endpointAddress));
TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]",
RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName,
RequestResponseOpener.this.endpointAddress, requestResponseChannel.getId()));
}
}

@Override
public void onError(Exception error) {
operationCallback.onError(error);

synchronized (RequestResponseOpener.this.isOpenedSynchronizer) {
// Inner channel creation failed. The next time run() is called should try again.
// Sometimes this.currentChannel ends up in a weird state that shows as OPENING (because
// remote states are UNINITIALIZED) instead of CLOSED/CLOSING, which will still cause the
// next attempt to short-circuit, so null out currentChannel to prevent that.
RequestResponseOpener.this.currentChannel = null;
RequestResponseOpener.this.isOpening = false;
}

if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onOpen error clientId[%s], session[%s], link[%s], endpoint[%s], error %s",
clientId, sessionName, linkName, endpointAddress, error));
RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName,
RequestResponseOpener.this.endpointAddress, error.getMessage()));
}
}
},
Expand All @@ -93,11 +147,10 @@ public void onComplete(Void result) {
eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink());
eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink());

isOpened = false;

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s]",
clientId, sessionName, linkName, endpointAddress));
TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s]",
RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName,
RequestResponseOpener.this.endpointAddress, requestResponseChannel.getId()));
}
}

Expand All @@ -106,11 +159,10 @@ public void onError(Exception error) {
eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink());
eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink());

isOpened = false;

if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], error %s",
clientId, sessionName, linkName, endpointAddress, error));
TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], rrc[%s], error %s",
RequestResponseOpener.this.clientId, RequestResponseOpener.this.sessionName, RequestResponseOpener.this.linkName,
RequestResponseOpener.this.endpointAddress, requestResponseChannel.getId(), error.getMessage()));
}
}
});
Expand Down