Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix subscription bugs in ReactorSession and ReactorConnection #22085

Merged
merged 5 commits into from
Jun 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## 2.1.0-beta.1 (Unreleased)

### New Features
- Exposing CbsAuthorizationType.

### Bug Fixes
conniey marked this conversation as resolved.
Show resolved Hide resolved
- Fixed a bug where connection and sessions would not be disposed when their endpoint closed.

## 2.0.6 (2021-05-24)
### Bug Fixes
- Fixed a bug that caused amqp connection not to retry when network error happened.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
if (isDisposed.getAndSet(true)) {
logger.verbose("connectionId[{}] was already disposed. {}", connectionId, message);
} else {
dispose(new AmqpShutdownSignal(false, false, message));
closeAsync(new AmqpShutdownSignal(false, false, message)).subscribe();
}
});

Expand All @@ -133,7 +133,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
.onErrorResume(error -> {
if (!isDisposed.getAndSet(true)) {
logger.verbose("connectionId[{}]: Disposing of active sessions due to error.", connectionId);
return dispose(new AmqpShutdownSignal(false, false,
return closeAsync(new AmqpShutdownSignal(false, false,
error.getMessage())).then(Mono.error(error));
} else {
return Mono.error(error);
Expand All @@ -144,7 +144,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
logger.verbose("connectionId[{}]: Disposing of active sessions due to connection close.",
connectionId);

dispose(new AmqpShutdownSignal(false, false,
closeAsync(new AmqpShutdownSignal(false, false,
"Connection handler closed.")).subscribe();
}
})
Expand Down Expand Up @@ -310,7 +310,7 @@ public void dispose() {
// Because the reactor executor schedules the pending close after the timeout, we want to give sufficient time
// for the rest of the tasks to run.
final Duration timeout = operationTimeout.plus(operationTimeout);
dispose(new AmqpShutdownSignal(false, true, "Disposed by client."))
closeAsync(new AmqpShutdownSignal(false, true, "Disposed by client."))
.publishOn(Schedulers.boundedElastic())
.block(timeout);
}
Expand Down Expand Up @@ -356,7 +356,7 @@ protected AmqpChannelProcessor<RequestResponseChannel> createRequestResponseChan
new ClientLogger(RequestResponseChannel.class + ":" + entityPath)));
}

Mono<Void> dispose(AmqpShutdownSignal shutdownSignal) {
Mono<Void> closeAsync(AmqpShutdownSignal shutdownSignal) {
logger.info("connectionId[{}] signal[{}]: Disposing of ReactorConnection.", connectionId, shutdownSignal);

if (cbsChannelProcessor != null) {
Expand Down Expand Up @@ -494,7 +494,7 @@ public void onConnectionError(Throwable exception) {
if (!isDisposed.getAndSet(true)) {
logger.verbose("onReactorError connectionId[{}], hostName[{}]: Disposing.", connectionId,
getFullyQualifiedNamespace());
dispose(new AmqpShutdownSignal(false, false,
closeAsync(new AmqpShutdownSignal(false, false,
"onReactorError: " + exception.toString()))
.subscribe();
}
Expand All @@ -508,7 +508,7 @@ void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) {

if (!isDisposed.getAndSet(true)) {
logger.verbose("onConnectionShutdown connectionId[{}], hostName[{}]: disposing.");
dispose(shutdownSignal).subscribe();
closeAsync(shutdownSignal).subscribe();
}
}
}
Expand All @@ -533,7 +533,7 @@ private void dispose() {
}

if (session instanceof ReactorSession) {
((ReactorSession) session).dispose("Closing session.", null, true)
((ReactorSession) session).closeAsync("Closing session.", null, true)
.subscribe();
} else {
session.dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public ReactorSession(AmqpConnection amqpConnection, Session session, SessionHan
connectionSubscriptions = Disposables.composite(
this.endpointStates.subscribe(),

shutdownSignals.flatMap(signal -> dispose("Shutdown signal received", null, false)).subscribe());
shutdownSignals.flatMap(signal -> closeAsync("Shutdown signal received", null, false)).subscribe());
conniey marked this conversation as resolved.
Show resolved Hide resolved

session.open();
}
Expand All @@ -152,7 +152,7 @@ public boolean isDisposed() {
*/
@Override
public void dispose() {
dispose("Dispose called.", null, true)
closeAsync("Dispose called.", null, true)
.block(retryOptions.getTryTimeout());
}

Expand Down Expand Up @@ -240,7 +240,7 @@ Mono<Void> isClosed() {
return isClosedMono.asMono();
}

Mono<Void> dispose(String message, ErrorCondition errorCondition, boolean disposeLinks) {
Mono<Void> closeAsync(String message, ErrorCondition errorCondition, boolean disposeLinks) {
if (isDisposed.getAndSet(true)) {
return isClosedMono.asMono();
}
Expand Down Expand Up @@ -596,7 +596,7 @@ private void handleClose() {
"connectionId[{}] sessionName[{}] Disposing of active send and receive links due to session close.",
sessionHandler.getConnectionId(), sessionName);

dispose("", null, true);
closeAsync("", null, true).subscribe();
}

private void handleError(Throwable error) {
Expand All @@ -610,12 +610,12 @@ private void handleError(Throwable error) {

condition = new ErrorCondition(Symbol.getSymbol(errorCondition), exception.getMessage());

dispose(exception.getMessage(), condition, true);
closeAsync(exception.getMessage(), condition, true).subscribe();
} else {
condition = null;
}

dispose(error.getMessage(), condition, true);
closeAsync(error.getMessage(), condition, true).subscribe();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,12 +680,12 @@ void disposeAsync() throws IOException {
connection2.getReactorConnection().subscribe();

// Act and Assert
StepVerifier.create(connection2.dispose(signal))
StepVerifier.create(connection2.closeAsync(signal))
.verifyComplete();

assertTrue(connection2.isDisposed());

StepVerifier.create(connection2.dispose(signal))
StepVerifier.create(connection2.closeAsync(signal))
.verifyComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class ManagementChannel implements EventHubManagementNode {

//@formatter:off
this.subscription = responseChannelMono
.flatMapMany(e -> e.getEndpointStates().distinct())
.flatMapMany(e -> e.getEndpointStates().distinctUntilChanged())
.subscribe(e -> {
logger.info("Management endpoint state: {}", e);
endpointStateSink.next(e);
Expand Down Expand Up @@ -159,17 +159,17 @@ private <T> Mono<T> getProperties(Map<String, Object> properties, Class<T> respo
request.setApplicationProperties(applicationProperties);

return channelMono.flatMap(channel -> channel.sendWithAck(request)
.map(message -> {
.handle((message, sink) -> {
if (RequestResponseUtils.isSuccessful(message)) {
return messageSerializer.deserialize(message, responseType);
sink.next(messageSerializer.deserialize(message, responseType));
}

final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
final String statusDescription = RequestResponseUtils.getStatusDescription(message);
final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(),
statusDescription, channel.getErrorContext());

throw logger.logExceptionAsWarning(Exceptions.propagate(error));
sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error)));
}));
});
}
Expand Down