Skip to content

Commit

Permalink
Revert "Fix ListenableFuture Resolving Listeners under Mutex (#71943) (
Browse files Browse the repository at this point in the history
…#71963)"

This reverts commit 2fdcc9d.
  • Loading branch information
jtibshirani committed Apr 21, 2021
1 parent 3c5f461 commit dc55aff
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.action;

import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;

Expand Down Expand Up @@ -104,7 +105,7 @@ public Response result() {
* Registers the given listener to be notified with the result of this step.
*/
public void addListener(ActionListener<Response> listener) {
delegate.addListener(listener);
delegate.addListener(listener, EsExecutors.newDirectExecutorService());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho
}
}
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}, listener::onFailure), threadPool.generic(), null);
}, listener::onFailure), threadPool.generic());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -1457,7 +1458,7 @@ public void onFailure(Exception e) {
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
publishListener.onFailure(exception);
}
}, null, transportService.getThreadPool().getThreadContext());
}, EsExecutors.newDirectExecutorService(), transportService.getThreadPool().getThreadContext());
}

private void cancelTimeoutHandlers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;

import java.util.ArrayList;
Expand All @@ -30,33 +30,31 @@
public final class ListenableFuture<V> extends BaseFuture<V> implements ActionListener<V> {

private volatile boolean done = false;
private List<Tuple<ActionListener<V>, ExecutorService>> listeners;
private final List<Tuple<ActionListener<V>, ExecutorService>> listeners = new ArrayList<>();


/**
* Adds a listener to this future. If the future has not yet completed, the listener will be
* notified of a response or exception on the thread completing this future.
* notified of a response or exception in a runnable submitted to the ExecutorService provided.
* If the future has completed, the listener will be notified immediately without forking to
* a different thread.
*/
public void addListener(ActionListener<V> listener) {
addListener(listener, null, null);
public void addListener(ActionListener<V> listener, ExecutorService executor) {
addListener(listener, executor, null);
}

/**
* Adds a listener to this future. If the future has not yet completed, the listener will be
* notified of a response or exception in a runnable submitted to the ExecutorService provided
* if one is provided. If a null executor is provided the listener will be executed directly
* on the thread completing the future.
* notified of a response or exception in a runnable submitted to the ExecutorService provided.
* If the future has completed, the listener will be notified immediately without forking to
* a different thread.
*
* It will apply the provided ThreadContext (if not null) when executing the listening.
*/
public void addListener(ActionListener<V> listener, @Nullable ExecutorService executor, ThreadContext threadContext) {
assert executor != EsExecutors.newDirectExecutorService() : "using direct executor here instead of null is needless overhead";
public void addListener(ActionListener<V> listener, ExecutorService executor, ThreadContext threadContext) {
if (done) {
// run the callback directly, we don't hold the lock and don't need to fork!
notifyListenerDirectly(listener);
notifyListener(listener, EsExecutors.newDirectExecutorService());
} else {
final boolean run;
// check done under lock since it could have been modified and protect modifications
Expand All @@ -71,60 +69,36 @@ public void addListener(ActionListener<V> listener, @Nullable ExecutorService ex
} else {
wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
}
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(new Tuple<>(wrappedListener, executor));
run = false;
}
}

if (run) {
// run the callback directly, we don't hold the lock and don't need to fork!
notifyListenerDirectly(listener);
notifyListener(listener, EsExecutors.newDirectExecutorService());
}
}
}

@Override
protected void done(boolean ignored) {
final List<Tuple<ActionListener<V>, ExecutorService>> existingListeners;
synchronized (this) {
done = true;
existingListeners = listeners;
if (existingListeners == null) {
return;
}
listeners = null;
}
for (Tuple<ActionListener<V>, ExecutorService> t : existingListeners) {
final ExecutorService executorService = t.v2();
final ActionListener<V> listener = t.v1();
if (executorService == null) {
notifyListenerDirectly(listener);
} else {
notifyListener(listener, executorService);
}
}
}

private void notifyListenerDirectly(ActionListener<V> listener) {
try {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS);
listener.onResponse(value);
} catch (Exception e) {
listener.onFailure(e);
}
protected synchronized void done(boolean ignored) {
done = true;
listeners.forEach(t -> notifyListener(t.v1(), t.v2()));
// release references to any listeners as we no longer need them and will live
// much longer than the listeners in most cases
listeners.clear();
}

private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
try {
executorService.execute(new Runnable() {
executorService.execute(new ActionRunnable<V>(listener) {
@Override
public void run() {
notifyListenerDirectly(listener);
protected void doRun() {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS);
listener.onResponse(value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;

Expand Down Expand Up @@ -40,7 +41,7 @@ public synchronized ActionListener<Void> markReceivedAndCreateListener(long requ
if (checkpointTracker.hasProcessed(requestSeqNo)) {
final ListenableFuture<Void> existingFuture = ongoingRequests.get(requestSeqNo);
if (existingFuture != null) {
existingFuture.addListener(listener);
existingFuture.addListener(listener, EsExecutors.newDirectExecutorService());
} else {
listener.onResponse(null);
}
Expand All @@ -52,7 +53,7 @@ public synchronized ActionListener<Void> markReceivedAndCreateListener(long requ
future.addListener(listener.delegateFailure((l, v) -> {
ongoingRequests.remove(requestSeqNo);
l.onResponse(v);
}));
}), EsExecutors.newDirectExecutorService());
return future;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -129,7 +130,7 @@ public StartRecoveryRequest getRequest() {
}

public void addListener(ActionListener<RecoveryResponse> listener) {
future.addListener(listener);
future.addListener(listener, EsExecutors.newDirectExecutorService());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -114,14 +115,14 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
if (existingListener != null) {
try {
// wait on previous entry to complete connection attempt
existingListener.addListener(listener);
existingListener.addListener(listener, EsExecutors.newDirectExecutorService());
} finally {
connectingRefCounter.decRef();
}
return;
}

currentListener.addListener(listener);
currentListener.addListener(listener, EsExecutors.newDirectExecutorService());

final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testListenableFutureNotifiesListeners() {
AtomicInteger notifications = new AtomicInteger(0);
final int numberOfListeners = scaledRandomIntBetween(1, 12);
for (int i = 0; i < numberOfListeners; i++) {
future.addListener(ActionListener.wrap(notifications::incrementAndGet), null, threadContext);
future.addListener(ActionListener.wrap(notifications::incrementAndGet), EsExecutors.newDirectExecutorService(), threadContext);
}

future.onResponse("");
Expand All @@ -56,7 +56,7 @@ public void testListenableFutureNotifiesListenersOnException() {
future.addListener(ActionListener.wrap(s -> fail("this should never be called"), e -> {
assertEquals(exception, e);
notifications.incrementAndGet();
}), null, threadContext);
}), EsExecutors.newDirectExecutorService(), threadContext);
}

future.onFailure(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.watcher.FileChangesListener;
Expand Down Expand Up @@ -808,7 +809,7 @@ void triggerReload(ActionListener<Void> toNotify) {
future = reloadFutureRef.get();
}
}
future.addListener(toNotify);
future.addListener(toNotify, EsExecutors.newDirectExecutorService(), null);
}

void reloadAsync(final ListenableFuture<Void> future) {
Expand Down

0 comments on commit dc55aff

Please sign in to comment.