diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index b988d2ab7296c..2649da6959205 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; @@ -63,6 +64,9 @@ public class TransportGetTransformStatsAction extends TransportTasksAction listener + ActionListener listener, + TimeValue timeout ) { - ActionListener checkPointInfoListener = ActionListener.wrap(infoBuilder -> { - if (context.getChangesLastDetectedAt() != null) { - infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); - } - if (context.getLastSearchTime() != null) { - infoBuilder.setLastSearchTime(context.getLastSearchTime()); - } - listener.onResponse(infoBuilder.build()); - }, listener::onFailure); + ActionListener checkPointInfoListener = ListenerTimeouts.wrapWithTimeout( + threadPool, + timeout, + threadPool.generic(), + ActionListener.wrap(infoBuilder -> { + if (context.getChangesLastDetectedAt() != null) { + infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); + } + if (context.getLastSearchTime() != null) { + infoBuilder.setLastSearchTime(context.getLastSearchTime()); + } + listener.onResponse(infoBuilder.build()); + }, listener::onFailure), + (ignore) -> listener.onFailure( + new ElasticsearchTimeoutException(format("Timed out retrieving checkpointing info after [%s]", timeout)) + ) + ); + // TODO: pass `timeout` to the lower layers ClientTransformIndexer transformIndexer = getIndexer(); if (transformIndexer == null) { transformsCheckpointService.getCheckpointingInfo(