From e33e1a03da31c88e4fa7bbaa074fa33ecd4c68ab Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Wed, 27 Nov 2024 16:14:57 -0500 Subject: [PATCH] ESQL: async search responses have CCS metadata while searches are running (#117265) ES|QL async search responses now include CCS metadata while the query is still running. The CCS metadata will be present only if a remote cluster is queried and the user requested it with the `include_ccs_metadata: true` setting on the original request to `POST /_query/async`. The setting cannot be modified in the query to `GET /_query/async/:id`. The core change is that the EsqlExecutionInfo object is set on the EsqlQueryTask, which is used for async ES|QL queries, so that calls to `GET /_query/async/:id` have access to the same EsqlExecutionInfo object that is being updated as the planning and query progress. Secondly, the overall `took` time is now always present on ES|QL responses, even for async-searches while the query is still running. The took time shows a "took-so-far" value and will change upon refresh until the query has finished. This is present regardless of the `include_ccs_metadata` setting. Example response showing in progress state of the query: ``` GET _query/async/FlhaeTBxUU0yU2xhVzM2TlRLY3F1eXcceWlSWWZlRDhUVTJEUGFfZUROaDdtUTo0MDQwNA ``` ```json { "id": "FlhaeTBxUU0yU2xhVzM2TlRLY3F1eXcceWlSWWZlRDhUVTJEUGFfZUROaDdtUTo0MDQwNA==", "is_running": true, "took": 2032, "columns": [], "values": [], "_clusters": { "total": 3, "successful": 1, "running": 2, "skipped": 0, "partial": 0, "failed": 0, "details": { "(local)": { "status": "running", "indices": "web_traffic", "_shards": { "total": 2, "skipped": 0 } }, "remote1": { "status": "running", "indices": "web_traffic" }, "remote2": { "status": "successful", "indices": "web_traffic", "took": 180, "_shards": { "total": 2, "successful": 2, "skipped": 0, "failed": 0 } } } } } ``` --- docs/changelog/117265.yaml | 5 + .../esql/action/CrossClusterAsyncQueryIT.java | 522 ++++++++++++++++++ .../esql/action/CrossClustersQueryIT.java | 9 +- .../xpack/esql/action/EsqlExecutionInfo.java | 13 +- .../xpack/esql/action/EsqlQueryResponse.java | 7 +- .../xpack/esql/action/EsqlQueryTask.java | 13 +- .../xpack/esql/plugin/ComputeListener.java | 29 +- .../xpack/esql/plugin/ComputeService.java | 26 +- .../esql/plugin/TransportEsqlQueryAction.java | 23 +- .../xpack/esql/session/EsqlSession.java | 1 + .../esql/action/EsqlQueryResponseTests.java | 3 +- .../esql/plugin/ComputeListenerTests.java | 16 +- 12 files changed, 634 insertions(+), 33 deletions(-) create mode 100644 docs/changelog/117265.yaml create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java diff --git a/docs/changelog/117265.yaml b/docs/changelog/117265.yaml new file mode 100644 index 0000000000000..ec6605155538d --- /dev/null +++ b/docs/changelog/117265.yaml @@ -0,0 +1,5 @@ +pr: 117265 +summary: Async search responses have CCS metadata while searches are running +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java new file mode 100644 index 0000000000000..440582dcfbb45 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java @@ -0,0 +1,522 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.mapper.OnScriptError; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.script.LongFieldScript; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.script.ScriptEngine; +import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.XContentTestUtils; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; +import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.core.TimeValue.timeValueMillis; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; + +public class CrossClusterAsyncQueryIT extends AbstractMultiClustersTestCase { + + private static final String REMOTE_CLUSTER_1 = "cluster-a"; + private static final String REMOTE_CLUSTER_2 = "remote-b"; + private static String LOCAL_INDEX = "logs-1"; + private static String REMOTE_INDEX = "logs-2"; + private static final String INDEX_WITH_RUNTIME_MAPPING = "blocking"; + + @Override + protected Collection remoteClusterAlias() { + return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of(REMOTE_CLUSTER_1, randomBoolean(), REMOTE_CLUSTER_2, randomBoolean()); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(EsqlPlugin.class); + plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action + plugins.add(InternalExchangePlugin.class); + plugins.add(PauseFieldPlugin.class); + return plugins; + } + + public static class InternalExchangePlugin extends Plugin { + @Override + public List> getSettings() { + return List.of( + Setting.timeSetting( + ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, + TimeValue.timeValueSeconds(30), + Setting.Property.NodeScope + ) + ); + } + } + + @Before + public void resetPlugin() { + PauseFieldPlugin.allowEmitting = new CountDownLatch(1); + PauseFieldPlugin.startEmitting = new CountDownLatch(1); + } + + public static class PauseFieldPlugin extends Plugin implements ScriptPlugin { + public static CountDownLatch startEmitting = new CountDownLatch(1); + public static CountDownLatch allowEmitting = new CountDownLatch(1); + + @Override + public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { + return new ScriptEngine() { + @Override + + public String getType() { + return "pause"; + } + + @Override + @SuppressWarnings("unchecked") + public FactoryType compile( + String name, + String code, + ScriptContext context, + Map params + ) { + if (context == LongFieldScript.CONTEXT) { + return (FactoryType) new LongFieldScript.Factory() { + @Override + public LongFieldScript.LeafFactory newFactory( + String fieldName, + Map params, + SearchLookup searchLookup, + OnScriptError onScriptError + ) { + return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) { + @Override + public void execute() { + startEmitting.countDown(); + try { + assertTrue(allowEmitting.await(30, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + emit(1); + } + }; + } + }; + } + throw new IllegalStateException("unsupported type " + context); + } + + @Override + public Set> getSupportedContexts() { + return Set.of(LongFieldScript.CONTEXT); + } + }; + } + } + + /** + * Includes testing for CCS metadata in the GET /_query/async/:id response while the search is still running + */ + public void testSuccessfulPathways() throws Exception { + Map testClusterInfo = setupClusters(3); + int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); + int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards"); + int remote2NumShards = (Integer) testClusterInfo.get("remote2.blocking_index.num_shards"); + + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + AtomicReference asyncExecutionId = new AtomicReference<>(); + + String q = "FROM logs-*,cluster-a:logs-*,remote-b:blocking | STATS total=sum(const) | LIMIT 10"; + try (EsqlQueryResponse resp = runAsyncQuery(q, requestIncludeMeta, null, TimeValue.timeValueMillis(100))) { + assertTrue(resp.isRunning()); + assertNotNull("async execution id is null", resp.asyncExecutionId()); + asyncExecutionId.set(resp.asyncExecutionId().get()); + // executionInfo may or may not be set on the initial response when there is a relatively low wait_for_completion_timeout + // so we do not check for it here + } + + // wait until we know that the query against 'remote-b:blocking' has started + PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); + + // wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it) + assertBusy(() -> { + try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) { + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + EsqlExecutionInfo.Cluster clusterA = executionInfo.getCluster("cluster-a"); + assertThat(clusterA.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING))); + } + }); + + /* at this point: + * the query against cluster-a should be finished + * the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown) + * the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b + */ + try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) { + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertThat(asyncResponse.isRunning(), is(true)); + assertThat( + executionInfo.clusterAliases(), + equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) + ); + assertThat(executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING), equalTo(2)); + assertThat(executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL), equalTo(1)); + + EsqlExecutionInfo.Cluster clusterA = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(clusterA.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(clusterA.getTotalShards(), greaterThanOrEqualTo(1)); + assertThat(clusterA.getSuccessfulShards(), equalTo(clusterA.getTotalShards())); + assertThat(clusterA.getSkippedShards(), equalTo(0)); + assertThat(clusterA.getFailedShards(), equalTo(0)); + assertThat(clusterA.getFailures().size(), equalTo(0)); + assertThat(clusterA.getTook().millis(), greaterThanOrEqualTo(0L)); + + EsqlExecutionInfo.Cluster local = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + // should still be RUNNING since the local cluster has to do a STATS on the coordinator, waiting on remoteB + assertThat(local.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + assertThat(clusterA.getTotalShards(), greaterThanOrEqualTo(1)); + + EsqlExecutionInfo.Cluster remoteB = executionInfo.getCluster(REMOTE_CLUSTER_2); + // should still be RUNNING since we haven't released the countdown lock to proceed + assertThat(remoteB.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + assertNull(remoteB.getSuccessfulShards()); // should not be filled in until query is finished + + assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); + } + + // allow remoteB query to proceed + PauseFieldPlugin.allowEmitting.countDown(); + + // wait until both remoteB and local queries have finished + assertBusy(() -> { + try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) { + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + EsqlExecutionInfo.Cluster remoteB = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remoteB.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING))); + EsqlExecutionInfo.Cluster local = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(local.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING))); + assertThat(asyncResponse.isRunning(), is(false)); + } + }); + + try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) { + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(1L)); + + EsqlExecutionInfo.Cluster clusterA = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(clusterA.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(clusterA.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(clusterA.getTotalShards(), equalTo(remote1NumShards)); + assertThat(clusterA.getSuccessfulShards(), equalTo(remote1NumShards)); + assertThat(clusterA.getSkippedShards(), equalTo(0)); + assertThat(clusterA.getFailedShards(), equalTo(0)); + assertThat(clusterA.getFailures().size(), equalTo(0)); + + EsqlExecutionInfo.Cluster remoteB = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remoteB.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteB.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remoteB.getTotalShards(), equalTo(remote2NumShards)); + assertThat(remoteB.getSuccessfulShards(), equalTo(remote2NumShards)); + assertThat(remoteB.getSkippedShards(), equalTo(0)); + assertThat(remoteB.getFailedShards(), equalTo(0)); + assertThat(remoteB.getFailures().size(), equalTo(0)); + + EsqlExecutionInfo.Cluster local = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(local.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(local.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(local.getTotalShards(), equalTo(localNumShards)); + assertThat(local.getSuccessfulShards(), equalTo(localNumShards)); + assertThat(local.getSkippedShards(), equalTo(0)); + assertThat(local.getFailedShards(), equalTo(0)); + assertThat(local.getFailures().size(), equalTo(0)); + } finally { + AcknowledgedResponse acknowledgedResponse = deleteAsyncId(asyncExecutionId.get()); + assertThat(acknowledgedResponse.isAcknowledged(), is(true)); + } + } + + public void testAsyncQueriesWithLimit0() throws IOException { + setupClusters(3); + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + final TimeValue waitForCompletion = TimeValue.timeValueNanos(randomFrom(1L, Long.MAX_VALUE)); + String asyncExecutionId = null; + try (EsqlQueryResponse resp = runAsyncQuery("FROM logs*,*:logs* | LIMIT 0", requestIncludeMeta, null, waitForCompletion)) { + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + if (resp.isRunning()) { + asyncExecutionId = resp.asyncExecutionId().get(); + assertThat(resp.columns().size(), equalTo(0)); + assertThat(resp.values().hasNext(), is(false)); // values should be empty list + + } else { + assertThat(resp.columns().size(), equalTo(4)); + assertThat(resp.columns().contains(new ColumnInfoImpl("const", "long")), is(true)); + assertThat(resp.columns().contains(new ColumnInfoImpl("id", "keyword")), is(true)); + assertThat(resp.columns().contains(new ColumnInfoImpl("tag", "keyword")), is(true)); + assertThat(resp.columns().contains(new ColumnInfoImpl("v", "long")), is(true)); + assertThat(resp.values().hasNext(), is(false)); // values should be empty list + + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getIndexExpression(), equalTo("logs*")); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remoteCluster.getTotalShards(), equalTo(0)); + assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); + assertThat(remoteCluster.getSkippedShards(), equalTo(0)); + assertThat(remoteCluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remote2Cluster.getIndexExpression(), equalTo("logs*")); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote2Cluster.getTotalShards(), equalTo(0)); + assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote2Cluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote2Cluster.getTotalShards(), equalTo(0)); + assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote2Cluster.getFailedShards(), equalTo(0)); + + assertClusterMetadataInResponse(resp, responseExpectMeta, 3); + } + } finally { + if (asyncExecutionId != null) { + AcknowledgedResponse acknowledgedResponse = deleteAsyncId(asyncExecutionId); + assertThat(acknowledgedResponse.isAcknowledged(), is(true)); + } + } + } + + protected EsqlQueryResponse runAsyncQuery(String query, Boolean ccsMetadata, QueryBuilder filter, TimeValue waitCompletionTime) { + EsqlQueryRequest request = EsqlQueryRequest.asyncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.profile(randomInt(5) == 2); + request.columnar(randomBoolean()); + if (ccsMetadata != null) { + request.includeCCSMetadata(ccsMetadata); + } + request.waitForCompletionTimeout(waitCompletionTime); + request.keepOnCompletion(false); + if (filter != null) { + request.filter(filter); + } + return runAsyncQuery(request); + } + + protected EsqlQueryResponse runAsyncQuery(EsqlQueryRequest request) { + try { + return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); + } catch (ElasticsearchTimeoutException e) { + throw new AssertionError("timeout waiting for query response", e); + } + } + + AcknowledgedResponse deleteAsyncId(String id) { + try { + DeleteAsyncResultRequest request = new DeleteAsyncResultRequest(id); + return client().execute(TransportDeleteAsyncResultAction.TYPE, request).actionGet(30, TimeUnit.SECONDS); + } catch (ElasticsearchTimeoutException e) { + throw new AssertionError("timeout waiting for DELETE response", e); + } + } + + EsqlQueryResponse getAsyncResponse(String id) { + try { + var getResultsRequest = new GetAsyncResultRequest(id).setWaitForCompletionTimeout(timeValueMillis(1)); + return client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).actionGet(30, TimeUnit.SECONDS); + } catch (ElasticsearchTimeoutException e) { + throw new AssertionError("timeout waiting for GET async result", e); + } + } + + private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta, int numClusters) { + try { + final Map esqlResponseAsMap = XContentTestUtils.convertToMap(resp); + final Object clusters = esqlResponseAsMap.get("_clusters"); + if (responseExpectMeta) { + assertNotNull(clusters); + // test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response) + @SuppressWarnings("unchecked") + Map inner = (Map) clusters; + assertTrue(inner.containsKey("total")); + assertThat((int) inner.get("total"), equalTo(numClusters)); + assertTrue(inner.containsKey("details")); + } else { + assertNull(clusters); + } + } catch (IOException e) { + fail("Could not convert ESQLQueryResponse to Map: " + e); + } + } + + /** + * v1: value to send to runQuery (can be null; null means use default value) + * v2: whether to expect CCS Metadata in the response (cannot be null) + * @return + */ + public static Tuple randomIncludeCCSMetadata() { + return switch (randomIntBetween(1, 3)) { + case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE); + case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE); + case 3 -> new Tuple<>(null, Boolean.FALSE); + default -> throw new AssertionError("should not get here"); + }; + } + + Map setupClusters(int numClusters) throws IOException { + assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters; + int numShardsLocal = randomIntBetween(1, 5); + populateLocalIndices(LOCAL_INDEX, numShardsLocal); + + int numShardsRemote = randomIntBetween(1, 5); + populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote); + + Map clusterInfo = new HashMap<>(); + clusterInfo.put("local.num_shards", numShardsLocal); + clusterInfo.put("local.index", LOCAL_INDEX); + clusterInfo.put("remote1.num_shards", numShardsRemote); + clusterInfo.put("remote1.index", REMOTE_INDEX); + + if (numClusters == 3) { + int numShardsRemote2 = randomIntBetween(1, 5); + populateRemoteIndices(REMOTE_CLUSTER_2, REMOTE_INDEX, numShardsRemote2); + populateRemoteIndicesWithRuntimeMapping(REMOTE_CLUSTER_2); + clusterInfo.put("remote2.index", REMOTE_INDEX); + clusterInfo.put("remote2.num_shards", numShardsRemote2); + clusterInfo.put("remote2.blocking_index", INDEX_WITH_RUNTIME_MAPPING); + clusterInfo.put("remote2.blocking_index.num_shards", 1); + } + + String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER_1); + Setting skipUnavailableSetting = cluster(REMOTE_CLUSTER_1).clusterService().getClusterSettings().get(skipUnavailableKey); + boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService() + .getClusterSettings() + .get(skipUnavailableSetting); + clusterInfo.put("remote.skip_unavailable", skipUnavailable); + + return clusterInfo; + } + + void populateLocalIndices(String indexName, int numShards) { + Client localClient = client(LOCAL_CLUSTER); + assertAcked( + localClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long") + ); + for (int i = 0; i < 10; i++) { + localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get(); + } + localClient.admin().indices().prepareRefresh(indexName).get(); + } + + void populateRemoteIndicesWithRuntimeMapping(String clusterAlias) throws IOException { + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("const"); + { + mapping.field("type", "long"); + mapping.startObject("script").field("source", "").field("lang", "pause").endObject(); + } + mapping.endObject(); + } + mapping.endObject(); + mapping.endObject(); + client(clusterAlias).admin().indices().prepareCreate(INDEX_WITH_RUNTIME_MAPPING).setMapping(mapping).get(); + BulkRequestBuilder bulk = client(clusterAlias).prepareBulk(INDEX_WITH_RUNTIME_MAPPING) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < 10; i++) { + bulk.add(new IndexRequest().source("foo", i)); + } + bulk.get(); + } + + void populateRemoteIndices(String clusterAlias, String indexName, int numShards) throws IOException { + Client remoteClient = client(clusterAlias); + assertAcked( + remoteClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") + ); + for (int i = 0; i < 10; i++) { + remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get(); + } + remoteClient.admin().indices().prepareRefresh(indexName).get(); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 6801e1f4eb404..596c70e57ccd6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -61,6 +61,10 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase { private static final String REMOTE_CLUSTER_1 = "cluster-a"; private static final String REMOTE_CLUSTER_2 = "remote-b"; + private static String LOCAL_INDEX = "logs-1"; + private static String IDX_ALIAS = "alias1"; + private static String FILTERED_IDX_ALIAS = "alias-filtered-1"; + private static String REMOTE_INDEX = "logs-2"; @Override protected Collection remoteClusterAlias() { @@ -1278,11 +1282,6 @@ Map setupTwoClusters() { return setupClusters(2); } - private static String LOCAL_INDEX = "logs-1"; - private static String IDX_ALIAS = "alias1"; - private static String FILTERED_IDX_ALIAS = "alias-filtered-1"; - private static String REMOTE_INDEX = "logs-2"; - Map setupClusters(int numClusters) { assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters; int numShardsLocal = randomIntBetween(1, 5); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 80bb2afe57122..ba7a7e8266845 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -169,6 +169,17 @@ public TimeValue overallTook() { return overallTook; } + /** + * How much time the query took since starting. + */ + public TimeValue tookSoFar() { + if (relativeStartNanos == null) { + return new TimeValue(0); + } else { + return new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); + } + } + public Set clusterAliases() { return clusterInfo.keySet(); } @@ -478,7 +489,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws { builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString()); builder.field(INDICES_FIELD.getPreferredName(), indexExpression); - if (took != null) { + if (took != null && status != Status.RUNNING) { builder.field(TOOK.getPreferredName(), took.millis()); } if (totalShards != null) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 4e59d5419fe6f..77aed298baea5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -196,8 +196,11 @@ public Iterator toXContentChunked(ToXContent.Params params } b.field("is_running", isRunning); } - if (executionInfo != null && executionInfo.overallTook() != null) { - b.field("took", executionInfo.overallTook().millis()); + if (executionInfo != null) { + long tookInMillis = executionInfo.overallTook() == null + ? executionInfo.tookSoFar().millis() + : executionInfo.overallTook().millis(); + b.field("took", tookInMillis); } if (dropNullColumns) { b.append(ResponseXContentUtils.allColumns(columns, "all_columns")) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java index b12cf4eb354bf..f896a25317102 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java @@ -17,6 +17,8 @@ public class EsqlQueryTask extends StoredAsyncTask { + private EsqlExecutionInfo executionInfo; + public EsqlQueryTask( long id, String type, @@ -29,10 +31,19 @@ public EsqlQueryTask( TimeValue keepAlive ) { super(id, type, action, description, parentTaskId, headers, originHeaders, asyncExecutionId, keepAlive); + this.executionInfo = null; + } + + public void setExecutionInfo(EsqlExecutionInfo executionInfo) { + this.executionInfo = executionInfo; + } + + public EsqlExecutionInfo executionInfo() { + return executionInfo; } @Override public EsqlQueryResponse getCurrentResult() { - return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, null); + return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, executionInfo); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 49af4a593e6e5..8d041ffbdf0e4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -112,6 +112,7 @@ private ComputeListener( if (runningOnRemoteCluster()) { // for remote executions - this ComputeResponse is created on the remote cluster/node and will be serialized and // received by the acquireCompute method callback on the coordinating cluster + setFinalStatusAndShardCounts(clusterAlias, executionInfo); EsqlExecutionInfo.Cluster cluster = esqlExecutionInfo.getCluster(clusterAlias); result = new ComputeResponse( collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList(), @@ -126,19 +127,33 @@ private ComputeListener( if (coordinatingClusterIsSearchedInCCS()) { // if not already marked as SKIPPED, mark the local cluster as finished once the coordinator and all // data nodes have finished processing - executionInfo.swapCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> { - if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { - return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build(); - } else { - return v; - } - }); + setFinalStatusAndShardCounts(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, executionInfo); } } delegate.onResponse(result); }, e -> delegate.onFailure(failureCollector.getFailure()))); } + private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) { + executionInfo.swapCluster(clusterAlias, (k, v) -> { + // TODO: once PARTIAL status is supported (partial results work to come), modify this code as needed + if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + assert v.getTotalShards() != null && v.getSkippedShards() != null : "Null total or skipped shard count: " + v; + return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL) + /* + * Total and skipped shard counts are set early in execution (after can-match). + * Until ES|QL supports shard-level partial results, we just set all non-skipped shards + * as successful and none are failed. + */ + .setSuccessfulShards(v.getTotalShards()) + .setFailedShards(0) + .build(); + } else { + return v; + } + }); + } + /** * @return true if the "local" querying/coordinator cluster is being searched in a cross-cluster search */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 6a0d1bf9bb035..73266551f169c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -178,6 +178,7 @@ public void execute( null ); String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + updateShardCountForCoordinatorOnlyQuery(execInfo); try (var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> { updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo); return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo); @@ -260,6 +261,22 @@ public void execute( } } + // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries) + private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) { + if (execInfo.isCrossClusterSearch()) { + for (String clusterAlias : execInfo.clusterAliases()) { + execInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .build() + ); + } + } + } + // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries) private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) { execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements @@ -267,11 +284,7 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null"; for (String clusterAlias : execInfo.clusterAliases()) { execInfo.swapCluster(clusterAlias, (k, v) -> { - var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook()) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0); + var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook()); if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) { builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); } @@ -324,9 +337,8 @@ private void startComputeOnDataNodes( executionInfo.swapCluster( clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(dataNodeResult.totalShards()) - .setSuccessfulShards(dataNodeResult.totalShards()) + // do not set successful or failed shard count here - do it when search is done .setSkippedShards(dataNodeResult.skippedShards()) - .setFailedShards(0) .build() ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index fdc6e06a11032..76bfb95d07926 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -151,6 +151,8 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener @Override public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener listener) { + // set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running + task.setExecutionInfo(createEsqlExecutionInfo(request)); ActionListener.run(listener, l -> innerExecute(task, request, l)); } @@ -170,10 +172,9 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener remoteClusterService.isSkipUnavailable(clusterAlias), - request.includeCCSMetadata() - ); + // async-query uses EsqlQueryTask, so pull the EsqlExecutionInfo out of the task + // sync query uses CancellableTask which does not have EsqlExecutionInfo, so create one + EsqlExecutionInfo executionInfo = getOrCreateExecutionInfo(task, request); PlanRunner planRunner = (plan, resultListener) -> computeService.execute( sessionId, (CancellableTask) task, @@ -194,6 +195,18 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener remoteClusterService.isSkipUnavailable(clusterAlias), request.includeCCSMetadata()); + } + private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) { List columns = result.schema().stream().map(c -> new ColumnInfoImpl(c.name(), c.dataType().outputType())).toList(); EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null; @@ -269,7 +282,7 @@ public EsqlQueryResponse initialResponse(EsqlQueryTask task) { asyncExecutionId, true, // is_running true, // isAsync - null + task.executionInfo() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 8f65914d1c30d..021596c31f65d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -147,6 +147,7 @@ public String sessionId() { * Execute an ESQL request. */ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener listener) { + assert executionInfo != null : "Null EsqlExecutionInfo"; LOGGER.debug("ESQL query:\n{}", request.query()); analyzedPlan( parse(request.query(), request.params()), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 35364089127cc..f7b402b909732 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -519,14 +519,15 @@ static EsqlQueryResponse fromXContent(XContentParser parser) { } public void testChunkResponseSizeColumnar() { - int sizeClusterDetails = 14; try (EsqlQueryResponse resp = randomResponse(true, null)) { + int sizeClusterDetails = 14; int columnCount = resp.pages().get(0).getBlockCount(); int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2; assertChunkCount(resp, r -> 5 + sizeClusterDetails + bodySize); } try (EsqlQueryResponse resp = randomResponseAsync(true, null, true)) { + int sizeClusterDetails = resp.isRunning() ? 13 : 14; // overall took time not present when is_running=true int columnCount = resp.pages().get(0).getBlockCount(); int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2; assertChunkCount(resp, r -> 7 + sizeClusterDetails + bodySize); // is_running diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index 625cb5628d039..b606f99df437c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -353,10 +353,7 @@ public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() { assertThat(response.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(executionInfo.getCluster(remoteAlias).getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(executionInfo.getCluster(remoteAlias).getTook(), equalTo(response.getTook())); - - // the status in the (remote) executionInfo will still be RUNNING, since the SUCCESSFUL status gets set on the querying - // cluster executionInfo in the acquireCompute CCS listener, NOT present in this test - see testCollectComputeResultsInCCSListener - assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); Mockito.verifyNoInteractions(transportService.getTaskManager()); } @@ -376,6 +373,17 @@ public void testAcquireComputeRunningOnQueryingClusterFillsInTookTime() { // fully filled in for cross-cluster searches executionInfo.swapCluster(localCluster, (k, v) -> new EsqlExecutionInfo.Cluster(localCluster, "logs*", false)); executionInfo.swapCluster("my_remote", (k, v) -> new EsqlExecutionInfo.Cluster("my_remote", "my_remote:logs*", false)); + + // before acquire-compute, can-match (SearchShards) runs filling in total shards and skipped shards, so simulate that here + executionInfo.swapCluster( + localCluster, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(10).setSkippedShards(1).build() + ); + executionInfo.swapCluster( + "my_remote", + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(10).setSkippedShards(1).build() + ); + try ( ComputeListener computeListener = ComputeListener.create( // whereRunning=localCluster simulates running on the querying cluster