From da08fd5a8ca007b0c1f730df289ea2d0332f0ae2 Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 4 Jun 2021 11:35:16 -0700 Subject: [PATCH 1/5] Fix error where Mono for dispose was not being subscribed to. --- .../amqp/implementation/ReactorConnection.java | 14 +++++++------- .../amqp/implementation/ReactorConnectionTest.java | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 416286ace2002..2f5ccb8961812 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -120,7 +120,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption if (isDisposed.getAndSet(true)) { logger.verbose("connectionId[{}] was already disposed. {}", connectionId, message); } else { - dispose(new AmqpShutdownSignal(false, false, message)); + closeAsync(new AmqpShutdownSignal(false, false, message)).subscribe(); } }); @@ -133,7 +133,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption .onErrorResume(error -> { if (!isDisposed.getAndSet(true)) { logger.verbose("connectionId[{}]: Disposing of active sessions due to error.", connectionId); - return dispose(new AmqpShutdownSignal(false, false, + return closeAsync(new AmqpShutdownSignal(false, false, error.getMessage())).then(Mono.error(error)); } else { return Mono.error(error); @@ -144,7 +144,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption logger.verbose("connectionId[{}]: Disposing of active sessions due to connection close.", connectionId); - dispose(new AmqpShutdownSignal(false, false, + closeAsync(new AmqpShutdownSignal(false, false, "Connection handler closed.")).subscribe(); } }) @@ -310,7 +310,7 @@ public void dispose() { // Because the reactor executor schedules the pending close after the timeout, we want to give sufficient time // for the rest of the tasks to run. final Duration timeout = operationTimeout.plus(operationTimeout); - dispose(new AmqpShutdownSignal(false, true, "Disposed by client.")) + closeAsync(new AmqpShutdownSignal(false, true, "Disposed by client.")) .publishOn(Schedulers.boundedElastic()) .block(timeout); } @@ -356,7 +356,7 @@ protected AmqpChannelProcessor createRequestResponseChan new ClientLogger(RequestResponseChannel.class + ":" + entityPath))); } - Mono dispose(AmqpShutdownSignal shutdownSignal) { + Mono closeAsync(AmqpShutdownSignal shutdownSignal) { logger.info("connectionId[{}] signal[{}]: Disposing of ReactorConnection.", connectionId, shutdownSignal); if (cbsChannelProcessor != null) { @@ -494,7 +494,7 @@ public void onConnectionError(Throwable exception) { if (!isDisposed.getAndSet(true)) { logger.verbose("onReactorError connectionId[{}], hostName[{}]: Disposing.", connectionId, getFullyQualifiedNamespace()); - dispose(new AmqpShutdownSignal(false, false, + closeAsync(new AmqpShutdownSignal(false, false, "onReactorError: " + exception.toString())) .subscribe(); } @@ -508,7 +508,7 @@ void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) { if (!isDisposed.getAndSet(true)) { logger.verbose("onConnectionShutdown connectionId[{}], hostName[{}]: disposing."); - dispose(shutdownSignal).subscribe(); + closeAsync(shutdownSignal).subscribe(); } } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java index 6f2083faeeec5..e2016cca09980 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java @@ -680,12 +680,12 @@ void disposeAsync() throws IOException { connection2.getReactorConnection().subscribe(); // Act and Assert - StepVerifier.create(connection2.dispose(signal)) + StepVerifier.create(connection2.closeAsync(signal)) .verifyComplete(); assertTrue(connection2.isDisposed()); - StepVerifier.create(connection2.dispose(signal)) + StepVerifier.create(connection2.closeAsync(signal)) .verifyComplete(); } From ea5afea767250b827c9ea8421e2ca1a277d1fafd Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 4 Jun 2021 11:36:10 -0700 Subject: [PATCH 2/5] Fix error where close operations were not being subscribed to. --- .../core/amqp/implementation/ReactorConnection.java | 2 +- .../core/amqp/implementation/ReactorSession.java | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 2f5ccb8961812..0c38ed2b5d3cc 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -533,7 +533,7 @@ private void dispose() { } if (session instanceof ReactorSession) { - ((ReactorSession) session).dispose("Closing session.", null, true) + ((ReactorSession) session).closeAsync("Closing session.", null, true) .subscribe(); } else { session.dispose(); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index 8ec8656e9eee8..5d58e2bd75922 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -128,7 +128,7 @@ public ReactorSession(AmqpConnection amqpConnection, Session session, SessionHan connectionSubscriptions = Disposables.composite( this.endpointStates.subscribe(), - shutdownSignals.flatMap(signal -> dispose("Shutdown signal received", null, false)).subscribe()); + shutdownSignals.flatMap(signal -> closeAsync("Shutdown signal received", null, false)).subscribe()); session.open(); } @@ -152,7 +152,7 @@ public boolean isDisposed() { */ @Override public void dispose() { - dispose("Dispose called.", null, true) + closeAsync("Dispose called.", null, true) .block(retryOptions.getTryTimeout()); } @@ -240,7 +240,7 @@ Mono isClosed() { return isClosedMono.asMono(); } - Mono dispose(String message, ErrorCondition errorCondition, boolean disposeLinks) { + Mono closeAsync(String message, ErrorCondition errorCondition, boolean disposeLinks) { if (isDisposed.getAndSet(true)) { return isClosedMono.asMono(); } @@ -596,7 +596,7 @@ private void handleClose() { "connectionId[{}] sessionName[{}] Disposing of active send and receive links due to session close.", sessionHandler.getConnectionId(), sessionName); - dispose("", null, true); + closeAsync("", null, true).subscribe(); } private void handleError(Throwable error) { @@ -610,12 +610,12 @@ private void handleError(Throwable error) { condition = new ErrorCondition(Symbol.getSymbol(errorCondition), exception.getMessage()); - dispose(exception.getMessage(), condition, true); + closeAsync(exception.getMessage(), condition, true).subscribe(); } else { condition = null; } - dispose(error.getMessage(), condition, true); + closeAsync(error.getMessage(), condition, true).subscribe(); } /** From 5796f482031adee5183e0317c883763cc7fe1b72 Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 4 Jun 2021 11:38:01 -0700 Subject: [PATCH 3/5] Fixing distinct to distinctUntilChanged --- .../eventhubs/implementation/ManagementChannel.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java index e9a3e18538934..9fd59529d17e4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java @@ -95,7 +95,7 @@ public class ManagementChannel implements EventHubManagementNode { //@formatter:off this.subscription = responseChannelMono - .flatMapMany(e -> e.getEndpointStates().distinct()) + .flatMapMany(e -> e.getEndpointStates().distinctUntilChanged()) .subscribe(e -> { logger.info("Management endpoint state: {}", e); endpointStateSink.next(e); @@ -159,9 +159,9 @@ private Mono getProperties(Map properties, Class respo request.setApplicationProperties(applicationProperties); return channelMono.flatMap(channel -> channel.sendWithAck(request) - .map(message -> { + .handle((message, sink) -> { if (RequestResponseUtils.isSuccessful(message)) { - return messageSerializer.deserialize(message, responseType); + sink.next(messageSerializer.deserialize(message, responseType)); } final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message); @@ -169,7 +169,7 @@ private Mono getProperties(Map properties, Class respo final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(), statusDescription, channel.getErrorContext()); - throw logger.logExceptionAsWarning(Exceptions.propagate(error)); + sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error))); })); }); } From ded348856bf6052413fd5ae4788471cc782484aa Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 4 Jun 2021 11:40:00 -0700 Subject: [PATCH 4/5] Add CHANGELOG --- sdk/core/azure-core-amqp/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 95f3244eaa113..c89760e45461e 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -1,6 +1,8 @@ # Release History ## 2.1.0-beta.1 (Unreleased) +### Bug Fixes +- Fixed a bug where connection and sessions would not be disposed when their endpoint closed. ## 2.0.6 (2021-05-24) ### Bug Fixes From a1a7c354d15eda5952b21cd294f732f795cece8c Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 4 Jun 2021 11:40:27 -0700 Subject: [PATCH 5/5] Update CHANGELOG with authorization type. --- sdk/core/azure-core-amqp/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index c89760e45461e..db88e271f4f70 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -1,6 +1,10 @@ # Release History ## 2.1.0-beta.1 (Unreleased) + +### New Features +- Exposing CbsAuthorizationType. + ### Bug Fixes - Fixed a bug where connection and sessions would not be disposed when their endpoint closed.