Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closing reactor executor when IO pipe is closed. #22192

Merged
merged 7 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -464,17 +464,12 @@ private synchronized void closeConnectionWork() {
}

connection.close();
handler.close();

final ArrayList<Mono<Void>> closingSessions = new ArrayList<>();
sessionMap.values().forEach(link -> closingSessions.add(link.isClosed()));

final Mono<Void> closedExecutor;
if (executor != null) {
closedExecutor = executor.isClosed();
executor.close();
} else {
closedExecutor = Mono.empty();
}
final Mono<Void> closedExecutor = executor != null ? executor.closeAsync() : Mono.empty();

// Close all the children.
final Mono<Void> closeSessionsMono = Mono.when(closingSessions)
Expand All @@ -491,7 +486,6 @@ private synchronized void closeConnectionWork() {
return false;
});

handler.close();
subscriptions.dispose();
}));

Expand Down Expand Up @@ -521,10 +515,6 @@ private synchronized Connection getOrCreateConnection() throws IOException {

final ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler();

reactorProvider.getReactorDispatcher().getShutdownSignal()
.subscribe(signal -> reactorExceptionHandler.onConnectionShutdown(signal),
error -> reactorExceptionHandler.onConnectionError(error));

// Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe.
// Using Schedulers.single() will use the same thread for all connections in this process which
// limits the scalability of the no. of concurrent connections a single process can have.
Expand All @@ -539,6 +529,19 @@ private synchronized Connection getOrCreateConnection() throws IOException {
reactorExceptionHandler, pendingTasksDuration,
connectionOptions.getFullyQualifiedNamespace());

reactorProvider.getReactorDispatcher().getShutdownSignal()
.flatMap(signal -> {
logger.info("Shutdown signal received from reactor provider.");
reactorExceptionHandler.onConnectionShutdown(signal);
return executor.closeAsync();
})
.onErrorResume(error -> {
logger.info("Error received from reactor provider.", error);
reactorExceptionHandler.onConnectionError(error);
return executor.closeAsync();
})
.subscribe();

executor.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.HandlerException;
Expand All @@ -23,7 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ReactorExecutor implements Closeable {
class ReactorExecutor implements AsyncCloseable {
private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";

private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
Expand Down Expand Up @@ -51,7 +52,7 @@ class ReactorExecutor implements Closeable {

/**
* Starts the reactor and will begin processing any reactor events until there are no longer any left or {@link
* #close()} is called.
* #closeAsync()} is called.
*/
void start() {
if (hasStarted.getAndSet(true)) {
Expand Down Expand Up @@ -142,10 +143,6 @@ private void run() {
}
}

Mono<Void> isClosed() {
return isClosedMono.asMono();
}

/**
* Schedules the release of the current reactor after operation timeout has elapsed.
*/
Expand Down Expand Up @@ -175,26 +172,27 @@ private void scheduleCompletePendingTasks() {
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public void close() {
if (isDisposed.getAndSet(true)) {
return;
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}
}

private void close(String reason) {
logger.verbose("Completing close and disposing scheduler. {}", reason);

scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason));
scheduler.dispose();
}

@Override
public Mono<Void> closeAsync() {
if (isDisposed.getAndSet(true)) {
return isClosedMono.asMono();
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}

return isClosedMono.asMono();
}
}