Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closing reactor executor when IO pipe is closed. #22192

Merged
merged 7 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@
## 2.1.0-beta.1 (Unreleased)

### New Features

- Exposing CbsAuthorizationType.
- Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker.
- AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable.
- Delivery outcomes and delivery states are added.

### Bug Fixes

- Fixed a bug where connection and sessions would not be disposed when their endpoint closed.
- Fixed a bug where ReactorExecutor did not dispose of its scheduler when "IO Sink was interrupted".

### Dependency Updates

- Upgraded `azure-core` from `1.15.0` to `1.16.0`.
conniey marked this conversation as resolved.
Show resolved Hide resolved
- Upgraded `proton-j` from `0.33.4` to `0.33.8`.
- Upgraded `qpid-proton-j-extensions` from `1.2.3` to `1.2.4`.

## 2.0.6 (2021-05-24)
### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,17 +464,12 @@ private synchronized void closeConnectionWork() {
}

connection.close();
handler.close();

final ArrayList<Mono<Void>> closingSessions = new ArrayList<>();
sessionMap.values().forEach(link -> closingSessions.add(link.isClosed()));

final Mono<Void> closedExecutor;
if (executor != null) {
closedExecutor = executor.isClosed();
executor.close();
} else {
closedExecutor = Mono.empty();
}
final Mono<Void> closedExecutor = executor != null ? executor.closeAsync() : Mono.empty();

// Close all the children.
final Mono<Void> closeSessionsMono = Mono.when(closingSessions)
Expand All @@ -491,7 +486,6 @@ private synchronized void closeConnectionWork() {
return false;
});

handler.close();
subscriptions.dispose();
}));

Expand Down Expand Up @@ -521,10 +515,6 @@ private synchronized Connection getOrCreateConnection() throws IOException {

final ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler();

reactorProvider.getReactorDispatcher().getShutdownSignal()
.subscribe(signal -> reactorExceptionHandler.onConnectionShutdown(signal),
error -> reactorExceptionHandler.onConnectionError(error));

// Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe.
// Using Schedulers.single() will use the same thread for all connections in this process which
// limits the scalability of the no. of concurrent connections a single process can have.
Expand All @@ -539,6 +529,22 @@ private synchronized Connection getOrCreateConnection() throws IOException {
reactorExceptionHandler, pendingTasksDuration,
connectionOptions.getFullyQualifiedNamespace());

// To avoid inconsistent synchronization of executor, we set this field with the closeAsync method.
// It will not be kicked off until subscribed to.
final Mono<Void> executorCloseMono = executor.closeAsync();
reactorProvider.getReactorDispatcher().getShutdownSignal()
.flatMap(signal -> {
logger.info("Shutdown signal received from reactor provider.");
reactorExceptionHandler.onConnectionShutdown(signal);
return executorCloseMono;
})
.onErrorResume(error -> {
logger.info("Error received from reactor provider.", error);
reactorExceptionHandler.onConnectionError(error);
return executorCloseMono;
})
.subscribe();

executor.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.HandlerException;
Expand All @@ -14,7 +15,6 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

import java.io.Closeable;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.Locale;
Expand All @@ -23,7 +23,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ReactorExecutor implements Closeable {
/**
* Schedules the proton-j reactor to continuously run work.
*/
class ReactorExecutor implements AsyncCloseable {
private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";

private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
Expand Down Expand Up @@ -51,7 +54,7 @@ class ReactorExecutor implements Closeable {

/**
* Starts the reactor and will begin processing any reactor events until there are no longer any left or {@link
* #close()} is called.
* #closeAsync()} is called.
*/
void start() {
if (hasStarted.getAndSet(true)) {
Expand Down Expand Up @@ -142,10 +145,6 @@ private void run() {
}
}

Mono<Void> isClosed() {
return isClosedMono.asMono();
}

/**
* Schedules the release of the current reactor after operation timeout has elapsed.
*/
Expand Down Expand Up @@ -175,26 +174,27 @@ private void scheduleCompletePendingTasks() {
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public void close() {
if (isDisposed.getAndSet(true)) {
return;
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}
}

private void close(String reason) {
logger.verbose("Completing close and disposing scheduler. {}", reason);

scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason));
scheduler.dispose();
}

@Override
public Mono<Void> closeAsync() {
if (isDisposed.getAndSet(true)) {
return isClosedMono.asMono();
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}

return isClosedMono.asMono();
}
}