Skip to content

Commit

Permalink
Ignore cancellation exceptions (elastic#117657) (elastic#118169)
Browse files Browse the repository at this point in the history
Today, when an ES|QL task encounters an exception, we trigger a 
cancellation on the root task, causing child tasks to fail due to
cancellation. We chose not to include cancellation exceptions in the 
output, as they are unhelpful and add noise during problem analysis.
However, these exceptions are still slipping through via
RefCountingListener. This change addresses the issue by introducing
ESQLRefCountingListener, ensuring that no cancellation exceptions are
returned.
  • Loading branch information
dnhatn authored Dec 6, 2024
1 parent e4b0f8a commit 25fd1be
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 42 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117657.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 117657
summary: Ignore cancellation exceptions
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.core.Releasable;

/**
* Similar to {@link org.elasticsearch.action.support.RefCountingListener},
* but prefers non-task-cancelled exceptions over task-cancelled ones as they are more useful for diagnosing issues.
* @see FailureCollector
*/
public final class EsqlRefCountingListener implements Releasable {
private final FailureCollector failureCollector;
private final RefCountingRunnable refs;

public EsqlRefCountingListener(ActionListener<Void> delegate) {
this.failureCollector = new FailureCollector();
this.refs = new RefCountingRunnable(() -> {
Exception error = failureCollector.getFailure();
if (error != null) {
delegate.onFailure(error);
} else {
delegate.onResponse(null);
}
});
}

public ActionListener<Void> acquire() {
return refs.acquireListener().delegateResponse((l, e) -> {
failureCollector.unwrapAndCollect(e);
l.onFailure(e);
});
}

@Override
public void close() {
refs.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.TransportException;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Semaphore;

/**
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
Expand All @@ -26,12 +25,11 @@
*/
public final class FailureCollector {
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
private final AtomicInteger cancelledExceptionsCount = new AtomicInteger();
private final Semaphore cancelledExceptionsPermits;

private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
private final AtomicInteger nonCancelledExceptionsCount = new AtomicInteger();
private final Semaphore nonCancelledExceptionsPermits;

private final int maxExceptions;
private volatile boolean hasFailure = false;
private Exception finalFailure = null;

Expand All @@ -43,7 +41,8 @@ public FailureCollector(int maxExceptions) {
if (maxExceptions <= 0) {
throw new IllegalArgumentException("maxExceptions must be at least one");
}
this.maxExceptions = maxExceptions;
this.cancelledExceptionsPermits = new Semaphore(maxExceptions);
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
}

private static Exception unwrapTransportException(TransportException te) {
Expand All @@ -60,13 +59,12 @@ private static Exception unwrapTransportException(TransportException te) {
public void unwrapAndCollect(Exception e) {
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
if (cancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
cancelledExceptions.add(e);
}
} else {
if (nonCancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
nonCancelledExceptions.add(e);
}
} else if (nonCancelledExceptionsPermits.tryAcquire()) {
nonCancelledExceptions.add(e);
cancelledExceptions.clear();
}
hasFailure = true;
}
Expand Down Expand Up @@ -99,20 +97,22 @@ public Exception getFailure() {
private Exception buildFailure() {
assert hasFailure;
assert Thread.holdsLock(this);
int total = 0;
Exception first = null;
for (var exceptions : List.of(nonCancelledExceptions, cancelledExceptions)) {
for (Exception e : exceptions) {
if (first == null) {
first = e;
total++;
} else if (first != e) {
first.addSuppressed(e);
total++;
}
if (total >= maxExceptions) {
return first;
}
for (Exception e : nonCancelledExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
}
}
if (first != null) {
return first;
}
for (Exception e : cancelledExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
}
}
assert first != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.compute.operator.IsBlockedResult;
Expand Down Expand Up @@ -54,20 +55,20 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
this.outstandingSources = new PendingInstances(() -> buffer.finish(true));
buffer.addCompletionListener(ActionListener.running(() -> {
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener).delegateFailure((l, unused) -> {
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
final Exception e = failure.getFailure();
if (e != null) {
l.onFailure(e);
listener.onFailure(e);
} else {
l.onResponse(null);
listener.onResponse(null);
}
});
try (RefCountingListener refs = new RefCountingListener(listener)) {
})) {
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
// Create an outstanding instance and then finish to complete the completionListener
// if we haven't registered any instances of exchange sinks or exchange sources before.
pending.trackNewInstance();
pending.completion.addListener(refs.acquire());
pending.completion.addListener(refs.acquireListener());
pending.finishInstance();
}
}
Expand Down Expand Up @@ -269,7 +270,7 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
try (RefCountingListener refs = new RefCountingListener(sinkListener)) {
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) {
for (int i = 0; i < instances; i++) {
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire());
fetcher.fetchPage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.operator;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.breaker.CircuitBreaker;
Expand Down Expand Up @@ -86,6 +87,14 @@ public void testCollect() throws Exception {
assertNotNull(failure);
assertThat(failure, Matchers.in(nonCancelledExceptions));
assertThat(failure.getSuppressed().length, lessThan(maxExceptions));
assertTrue(
"cancellation exceptions must be ignored",
ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TaskCancelledException).isEmpty()
);
assertTrue(
"remote transport exception must be unwrapped",
ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TransportException).isEmpty()
);
}

public void testEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
Expand All @@ -30,7 +31,9 @@
import org.elasticsearch.geo.ShapeTestUtils;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
Expand Down Expand Up @@ -129,6 +132,8 @@
import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.PATTERN;
import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.VALUE;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public final class EsqlTestUtils {

Expand Down Expand Up @@ -784,4 +789,17 @@ public static QueryParam paramAsIdentifier(String name, Object value) {
public static QueryParam paramAsPattern(String name, Object value) {
return new QueryParam(name, value, NULL, PATTERN);
}

/**
* Asserts that:
* 1. Cancellation exceptions are ignored when more relevant exceptions exist.
* 2. Transport exceptions are unwrapped, and the actual causes are reported to users.
*/
public static void assertEsqlFailure(Exception e) {
assertNotNull(e);
var cancellationFailure = ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof TaskCancelledException).orElse(null);
assertNull("cancellation exceptions must be ignored", cancellationFailure);
ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof RemoteTransportException)
.ifPresent(transportFailure -> assertNull("remote transport exception must be unwrapped", transportFailure.getCause()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) {
return client.execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
} catch (Exception e) {
logger.info("request failed", e);
EsqlTestUtils.assertEsqlFailure(e);
ensureBlocksReleased();
} finally {
setRequestCircuitBreakerLimit(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.esql.EsqlTestUtils;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -85,6 +86,7 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
} catch (Exception e) {
logger.info("request failed", e);
ensureBlocksReleased();
EsqlTestUtils.assertEsqlFailure(e);
throw e;
} finally {
setRequestCircuitBreakerLimit(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.junit.Before;

Expand Down Expand Up @@ -338,7 +339,15 @@ private void assertCancelled(ActionFuture<EsqlQueryResponse> response) throws Ex
*/
assertThat(
cancelException.getMessage(),
in(List.of("test cancel", "task cancelled", "request cancelled test cancel", "parent task was cancelled [test cancel]"))
in(
List.of(
"test cancel",
"task cancelled",
"request cancelled test cancel",
"parent task was cancelled [test cancel]",
"cancelled on failure"
)
)
);
assertBusy(
() -> assertThat(
Expand Down Expand Up @@ -434,6 +443,7 @@ protected void doRun() throws Exception {
allowedFetching.countDown();
}
Exception failure = expectThrows(Exception.class, () -> future.actionGet().close());
EsqlTestUtils.assertEsqlFailure(failure);
assertThat(failure.getMessage(), containsString("failed to fetch pages"));
// If we proceed without waiting for pages, we might cancel the main request before starting the data-node request.
// As a result, the exchange sinks on data-nodes won't be removed until the inactive_timeout elapses, which is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.xpack.esql.EsqlTestUtils;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -111,6 +112,7 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
ensureBlocksReleased();
logger.info("--> failed to execute esql query with disruption; retrying...", e);
EsqlTestUtils.assertEsqlFailure(e);
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -39,8 +39,7 @@
final class ComputeListener implements Releasable {
private static final Logger LOGGER = LogManager.getLogger(ComputeService.class);

private final RefCountingListener refs;
private final FailureCollector failureCollector = new FailureCollector();
private final EsqlRefCountingListener refs;
private final AtomicBoolean cancelled = new AtomicBoolean();
private final CancellableTask task;
private final TransportService transportService;
Expand Down Expand Up @@ -105,7 +104,7 @@ private ComputeListener(
: "clusterAlias and executionInfo must both be null or both non-null";

// listener that executes after all the sub-listeners refs (created via acquireCompute) have completed
this.refs = new RefCountingListener(1, ActionListener.wrap(ignored -> {
this.refs = new EsqlRefCountingListener(delegate.delegateFailure((l, ignored) -> {
responseHeaders.finish();
ComputeResponse result;

Expand All @@ -131,7 +130,7 @@ private ComputeListener(
}
}
delegate.onResponse(result);
}, e -> delegate.onFailure(failureCollector.getFailure())));
}));
}

private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) {
Expand Down Expand Up @@ -191,7 +190,6 @@ private boolean isCCSListener(String computeClusterAlias) {
*/
ActionListener<Void> acquireAvoid() {
return refs.acquire().delegateResponse((l, e) -> {
failureCollector.unwrapAndCollect(e);
try {
if (cancelled.compareAndSet(false, true)) {
LOGGER.debug("cancelling ESQL task {} on failure", task);
Expand Down
Loading

0 comments on commit 25fd1be

Please sign in to comment.