Skip to content

Commit

Permalink
Treat ack timeout more like a publish timeout (#31303)
Browse files Browse the repository at this point in the history
This commit changes the ack timeout mechanism so that its behavior is closer to the publish
timeout, i.e., it only comes into play after committing a cluster state. This ensures for example that
an index creation request with a low (ack) timeout value does not return before the cluster state
that contains information about the newly created index is even committed.
  • Loading branch information
ywelsch committed Jun 14, 2018
1 parent 8305332 commit 3e76b15
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -365,28 +364,11 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
}

public Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) {
ArrayList<Discovery.AckListener> ackListeners = new ArrayList<>();

//timeout straightaway, otherwise we could wait forever as the timeout thread has not started
nonFailedTasks.stream().filter(task -> task.listener instanceof AckedClusterStateTaskListener).forEach(task -> {
final AckedClusterStateTaskListener ackedListener = (AckedClusterStateTaskListener) task.listener;
if (ackedListener.ackTimeout() == null || ackedListener.ackTimeout().millis() == 0) {
ackedListener.onAckTimeout();
} else {
try {
ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(),
threadPool));
} catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
}
//timeout straightaway, otherwise we could wait forever as the timeout thread has not started
ackedListener.onAckTimeout();
}
}
});

return new DelegatingAckListener(ackListeners);
return new DelegatingAckListener(nonFailedTasks.stream()
.filter(task -> task.listener instanceof AckedClusterStateTaskListener)
.map(task -> new AckCountDownListener((AckedClusterStateTaskListener) task.listener, newClusterState.version(),
newClusterState.nodes(), threadPool))
.collect(Collectors.toList()));
}

public boolean clusterStateUnchanged() {
Expand Down Expand Up @@ -549,6 +531,13 @@ private DelegatingAckListener(List<Discovery.AckListener> listeners) {
this.listeners = listeners;
}

@Override
public void onCommit(TimeValue commitTime) {
for (Discovery.AckListener listener : listeners) {
listener.onCommit(commitTime);
}
}

@Override
public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
for (Discovery.AckListener listener : listeners) {
Expand All @@ -564,14 +553,16 @@ private static class AckCountDownListener implements Discovery.AckListener {
private final AckedClusterStateTaskListener ackedTaskListener;
private final CountDown countDown;
private final DiscoveryNode masterNode;
private final ThreadPool threadPool;
private final long clusterStateVersion;
private final Future<?> ackTimeoutCallback;
private volatile Future<?> ackTimeoutCallback;
private Exception lastFailure;

AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes,
ThreadPool threadPool) {
this.ackedTaskListener = ackedTaskListener;
this.clusterStateVersion = clusterStateVersion;
this.threadPool = threadPool;
this.masterNode = nodes.getMasterNode();
int countDown = 0;
for (DiscoveryNode node : nodes) {
Expand All @@ -581,8 +572,27 @@ private static class AckCountDownListener implements Discovery.AckListener {
}
}
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
this.countDown = new CountDown(countDown);
this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, () -> onTimeout());
this.countDown = new CountDown(countDown + 1); // we also wait for onCommit to be called
}

@Override
public void onCommit(TimeValue commitTime) {
TimeValue ackTimeout = ackedTaskListener.ackTimeout();
if (ackTimeout == null) {
ackTimeout = TimeValue.ZERO;
}
final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos()));
if (timeLeft.nanos() == 0L) {
onTimeout();
} else if (countDown.countDown()) {
finish();
} else {
this.ackTimeoutCallback = threadPool.schedule(timeLeft, ThreadPool.Names.GENERIC, this::onTimeout);
// re-check if onNodeAck has not completed while we were scheduling the timeout
if (countDown.isCountedDown()) {
FutureUtils.cancel(ackTimeoutCallback);
}
}
}

@Override
Expand All @@ -599,12 +609,16 @@ public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
}

if (countDown.countDown()) {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
FutureUtils.cancel(ackTimeoutCallback);
ackedTaskListener.onAllNodesAcked(lastFailure);
finish();
}
}

private void finish() {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
FutureUtils.cancel(ackTimeoutCallback);
ackedTaskListener.onAllNodesAcked(lastFailure);
}

public void onTimeout() {
if (countDown.fastForward()) {
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion);
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/discovery/Discovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;

import java.io.IOException;

Expand All @@ -48,6 +49,19 @@ public interface Discovery extends LifecycleComponent {
void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener);

interface AckListener {
/**
* Should be called when the discovery layer has committed the clusters state (i.e. even if this publication fails,
* it is guaranteed to appear in future publications).
* @param commitTime the time it took to commit the cluster state
*/
void onCommit(TimeValue commitTime);

/**
* Should be called whenever the discovery layer receives confirmation from a node that it has successfully applied
* the cluster state. In case of failures, an exception should be provided as parameter.
* @param node the node
* @param e the optional exception
*/
void onNodeAck(DiscoveryNode node, @Nullable Exception e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -61,6 +62,7 @@ public SingleNodeDiscovery(final Settings settings, final TransportService trans
public synchronized void publish(final ClusterChangedEvent event,
final AckListener ackListener) {
clusterState = event.state();
ackListener.onCommit(TimeValue.ZERO);
CountDownLatch latch = new CountDownLatch(1);

ClusterApplyListener listener = new ClusterApplyListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ public void publish(final ClusterChangedEvent clusterChangedEvent, final int min
}

try {
innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, sendFullVersion, serializedStates, serializedDiffs);
innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates,
serializedDiffs);
} catch (Discovery.FailedToCommitClusterStateException t) {
throw t;
} catch (Exception e) {
Expand All @@ -173,8 +174,9 @@ public void publish(final ClusterChangedEvent clusterChangedEvent, final int min
}

private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
final SendingController sendingController, final boolean sendFullVersion,
final Map<Version, BytesReference> serializedStates, final Map<Version, BytesReference> serializedDiffs) {
final SendingController sendingController, final Discovery.AckListener ackListener,
final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,
final Map<Version, BytesReference> serializedDiffs) {

final ClusterState clusterState = clusterChangedEvent.state();
final ClusterState previousState = clusterChangedEvent.previousState();
Expand All @@ -195,8 +197,12 @@ private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final S

sendingController.waitForCommit(discoverySettings.getCommitTimeout());

final long commitTime = System.nanoTime() - publishingStartInNanos;

ackListener.onCommit(TimeValue.timeValueNanos(commitTime));

try {
long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos));
long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - commitTime);
final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
if (sendingController.getPublishingTimedOut()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -39,6 +40,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand All @@ -65,6 +67,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -680,6 +683,132 @@ public void onFailure(String source, Exception e) {
mockAppender.assertAllExpectationsMatched();
}

public void testAcking() throws InterruptedException {
final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
TimedMasterService timedMasterService = new TimedMasterService(Settings.builder().put("cluster.name",
MasterServiceTests.class.getSimpleName()).build(), threadPool);
ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
.nodes(DiscoveryNodes.builder()
.add(node1)
.add(node2)
.add(node3)
.localNodeId(node1.getId())
.masterNodeId(node1.getId()))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
final AtomicReference<BiConsumer<ClusterChangedEvent, Discovery.AckListener>> publisherRef = new AtomicReference<>();
timedMasterService.setClusterStatePublisher((cce, l) -> publisherRef.get().accept(cce, l));
timedMasterService.setClusterStateSupplier(() -> initialClusterState);
timedMasterService.start();


// check that we don't time out before even committing the cluster state
{
final CountDownLatch latch = new CountDownLatch(1);

publisherRef.set((clusterChangedEvent, ackListener) -> {
throw new Discovery.FailedToCommitClusterStateException("mock exception");
});

timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).build();
}

@Override
public TimeValue ackTimeout() {
return TimeValue.ZERO;
}

@Override
public TimeValue timeout() {
return null;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}

@Override
protected Void newResponse(boolean acknowledged) {
fail();
return null;
}

@Override
public void onFailure(String source, Exception e) {
latch.countDown();
}

@Override
public void onAckTimeout() {
fail();
}
});

latch.await();
}

// check that we timeout if commit took too long
{
final CountDownLatch latch = new CountDownLatch(2);

final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100));

publisherRef.set((clusterChangedEvent, ackListener) -> {
ackListener.onCommit(TimeValue.timeValueMillis(ackTimeout.millis() + randomInt(100)));
ackListener.onNodeAck(node1, null);
ackListener.onNodeAck(node2, null);
ackListener.onNodeAck(node3, null);
});

timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).build();
}

@Override
public TimeValue ackTimeout() {
return ackTimeout;
}

@Override
public TimeValue timeout() {
return null;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
protected Void newResponse(boolean acknowledged) {
fail();
return null;
}

@Override
public void onFailure(String source, Exception e) {
fail();
}

@Override
public void onAckTimeout() {
latch.countDown();
}
});

latch.await();
}

timedMasterService.close();
}

static class TimedMasterService extends MasterService {

public volatile Long currentTimeOverride = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.node.Node;
Expand Down Expand Up @@ -815,9 +816,16 @@ public AssertingAckListener publishState(PublishClusterStateAction action, Clust
public static class AssertingAckListener implements Discovery.AckListener {
private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>();
private final CountDownLatch countDown;
private final CountDownLatch commitCountDown;

public AssertingAckListener(int nodeCount) {
countDown = new CountDownLatch(nodeCount);
commitCountDown = new CountDownLatch(1);
}

@Override
public void onCommit(TimeValue commitTime) {
commitCountDown.countDown();
}

@Override
Expand All @@ -830,6 +838,7 @@ public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {

public void await(long timeout, TimeUnit unit) throws InterruptedException {
assertThat(awaitErrors(timeout, unit), emptyIterable());
assertTrue(commitCountDown.await(timeout, unit));
}

public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException {
Expand Down

0 comments on commit 3e76b15

Please sign in to comment.