Skip to content

Commit

Permalink
Synchronize WriteReplicaResult callbacks (#36770)
Browse files Browse the repository at this point in the history
TransportWriteAction.WriteReplicaResult is not properly synchronized, which can lead to a data race
between the thread that calls respond and the AsyncAfterWriteAction that calls either onSuccess or
onFailure. This data race results in the response listener not being called, which ultimately results in
a stuck replication task on the replica.
  • Loading branch information
ywelsch authored Dec 18, 2018
1 parent 1fa1056 commit e35de2e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public synchronized void respond(ActionListener<Response> listener) {
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
*/
protected void respondIfPossible(Exception ex) {
assert Thread.holdsLock(this);
if (finishedAsyncActions && listener != null) {
if (ex == null) {
super.respond(listener);
Expand Down Expand Up @@ -206,7 +207,7 @@ public WriteReplicaResult(ReplicaRequest request, @Nullable Location location,
}

@Override
public void respond(ActionListener<TransportResponse.Empty> listener) {
public synchronized void respond(ActionListener<TransportResponse.Empty> listener) {
this.listener = listener;
respondIfPossible(null);
}
Expand All @@ -215,6 +216,7 @@ public void respond(ActionListener<TransportResponse.Empty> listener) {
* Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
*/
protected void respondIfPossible(Exception ex) {
assert Thread.holdsLock(this);
if (finishedAsyncActions && listener != null) {
if (ex == null) {
super.respond(listener);
Expand All @@ -225,7 +227,7 @@ protected void respondIfPossible(Exception ex) {
}

@Override
public void onFailure(Exception ex) {
public synchronized void onFailure(Exception ex) {
finishedAsyncActions = true;
respondIfPossible(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -347,6 +350,51 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
}
}

public void testConcurrentWriteReplicaResultCompletion() throws InterruptedException {
IndexShard replica = mock(IndexShard.class);
when(replica.getTranslogDurability()).thenReturn(Translog.Durability.ASYNC);
TestRequest request = new TestRequest();
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
TransportWriteAction.WriteReplicaResult<TestRequest> replicaResult = new TransportWriteAction.WriteReplicaResult<>(
request, new Translog.Location(0, 0, 0), null, replica, logger);
CyclicBarrier barrier = new CyclicBarrier(2);
Runnable waitForBarrier = () -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
};
CountDownLatch completionLatch = new CountDownLatch(1);
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.respond(new ActionListener<TransportResponse.Empty>() {
@Override
public void onResponse(TransportResponse.Empty empty) {
completionLatch.countDown();
}

@Override
public void onFailure(Exception e) {
completionLatch.countDown();
}
});
});
if (randomBoolean()) {
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.onFailure(null);
});
} else {
threadPool.generic().execute(() -> {
waitForBarrier.run();
replicaResult.onSuccess(false);
});
}

assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
}

private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {

private final boolean withDocumentFailureOnPrimary;
Expand Down

0 comments on commit e35de2e

Please sign in to comment.