ESQL: async search responses have CCS metadata while searches are running (elastic#117265)

ES|QL async search responses now include CCS metadata while the query is still running.
The CCS metadata will be present only if a remote cluster is queried and the user requested
it with the `include_ccs_metadata: true` setting on the original request to `POST /_query/async`.
The setting cannot be modified on the subsequent `GET /_query/async/:id` request.
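
For reference, a request roughly like the one below would opt in to the CCS metadata; the index pattern, query, and timeout are illustrative placeholders rather than values from this commit:

```
POST /_query/async
{
  "query": "FROM web_traffic,remote1:web_traffic,remote2:web_traffic | STATS COUNT(*)",
  "include_ccs_metadata": true,
  "wait_for_completion_timeout": "1s"
}
```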

The core change is that the `EsqlExecutionInfo` object is now set on the `EsqlQueryTask` used for async ES|QL queries, so that calls to `GET /_query/async/:id` have access to the same `EsqlExecutionInfo` object that is updated as planning and query execution progress.
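
A condensed sketch of that flow, using the class and method names from the diff further down (this is not the literal patch):

```java
// When the async query starts, the transport action attaches the live execution info to the task.
task.setExecutionInfo(createEsqlExecutionInfo(request));

// While the query runs, GET /_query/async/:id builds its partial response from the same task,
// so it observes the EsqlExecutionInfo that planning and execution keep updating.
EsqlExecutionInfo info = task.executionInfo();
EsqlQueryResponse partial = task.getCurrentResult(); // now carries the execution info instead of null
```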

Secondly, the overall `took` time is now always present on ES|QL responses, even for async searches while the query is still running. In that case the `took` field reports a "took-so-far" value that changes on each poll until the query has finished. This is present regardless of the `include_ccs_metadata` setting.
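
A condensed restatement of how the response picks which value to report, lifted from the `toXContentChunked` change in the diff below:

```java
// overallTook() is only set once the query finishes; until then tookSoFar()
// reports the elapsed time since the query started.
long tookInMillis = executionInfo.overallTook() == null
    ? executionInfo.tookSoFar().millis()
    : executionInfo.overallTook().millis();
builder.field("took", tookInMillis);
```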

Example response showing the in-progress state of the query:

```
GET _query/async/FlhaeTBxUU0yU2xhVzM2TlRLY3F1eXcceWlSWWZlRDhUVTJEUGFfZUROaDdtUTo0MDQwNA
```

```json
{
  "id": "FlhaeTBxUU0yU2xhVzM2TlRLY3F1eXcceWlSWWZlRDhUVTJEUGFfZUROaDdtUTo0MDQwNA==",
  "is_running": true,
  "took": 2032,
  "columns": [],
  "values": [],
  "_clusters": {
    "total": 3,
    "successful": 1,
    "running": 2,
    "skipped": 0,
    "partial": 0,
    "failed": 0,
    "details": {
      "(local)": {
        "status": "running",
        "indices": "web_traffic",
        "_shards": {
          "total": 2,
          "skipped": 0
        }
      },
      "remote1": {
        "status": "running",
        "indices": "web_traffic"
      },
      "remote2": {
        "status": "successful",
        "indices": "web_traffic",
        "took": 180,
        "_shards": {
          "total": 2,
          "successful": 2,
          "skipped": 0,
          "failed": 0
        }
      }
    }
  }
}
```
quux00 committed Nov 27, 2024
1 parent 5094eea commit a2a4a56
Showing 12 changed files with 634 additions and 33 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117265.yaml
@@ -0,0 +1,5 @@
pr: 117265
summary: Async search responses have CCS metadata while searches are running
area: ES|QL
type: enhancement
issues: []

Large diffs are not rendered by default.

@@ -61,6 +61,10 @@
public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
private static final String REMOTE_CLUSTER_1 = "cluster-a";
private static final String REMOTE_CLUSTER_2 = "remote-b";
private static String LOCAL_INDEX = "logs-1";
private static String IDX_ALIAS = "alias1";
private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
private static String REMOTE_INDEX = "logs-2";

@Override
protected Collection<String> remoteClusterAlias() {
@@ -1278,11 +1282,6 @@ Map<String, Object> setupTwoClusters() {
return setupClusters(2);
}

private static String LOCAL_INDEX = "logs-1";
private static String IDX_ALIAS = "alias1";
private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
private static String REMOTE_INDEX = "logs-2";

Map<String, Object> setupClusters(int numClusters) {
assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
int numShardsLocal = randomIntBetween(1, 5);

@@ -169,6 +169,17 @@ public TimeValue overallTook() {
return overallTook;
}

/**
* How much time the query took since starting.
*/
public TimeValue tookSoFar() {
if (relativeStartNanos == null) {
return new TimeValue(0);
} else {
return new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
}
}

public Set<String> clusterAliases() {
return clusterInfo.keySet();
}
@@ -478,7 +489,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
{
builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString());
builder.field(INDICES_FIELD.getPreferredName(), indexExpression);
if (took != null) {
if (took != null && status != Status.RUNNING) {
builder.field(TOOK.getPreferredName(), took.millis());
}
if (totalShards != null) {

@@ -196,8 +196,11 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
}
b.field("is_running", isRunning);
}
if (executionInfo != null && executionInfo.overallTook() != null) {
b.field("took", executionInfo.overallTook().millis());
if (executionInfo != null) {
long tookInMillis = executionInfo.overallTook() == null
? executionInfo.tookSoFar().millis()
: executionInfo.overallTook().millis();
b.field("took", tookInMillis);
}
if (dropNullColumns) {
b.append(ResponseXContentUtils.allColumns(columns, "all_columns"))

@@ -17,6 +17,8 @@

public class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {

private EsqlExecutionInfo executionInfo;

public EsqlQueryTask(
long id,
String type,
@@ -29,10 +31,19 @@ public EsqlQueryTask(
TimeValue keepAlive
) {
super(id, type, action, description, parentTaskId, headers, originHeaders, asyncExecutionId, keepAlive);
this.executionInfo = null;
}

public void setExecutionInfo(EsqlExecutionInfo executionInfo) {
this.executionInfo = executionInfo;
}

public EsqlExecutionInfo executionInfo() {
return executionInfo;
}

@Override
public EsqlQueryResponse getCurrentResult() {
return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, null);
return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, executionInfo);
}
}

@@ -112,6 +112,7 @@ private ComputeListener(
if (runningOnRemoteCluster()) {
// for remote executions - this ComputeResponse is created on the remote cluster/node and will be serialized and
// received by the acquireCompute method callback on the coordinating cluster
setFinalStatusAndShardCounts(clusterAlias, executionInfo);
EsqlExecutionInfo.Cluster cluster = esqlExecutionInfo.getCluster(clusterAlias);
result = new ComputeResponse(
collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList(),
@@ -126,19 +127,33 @@ private ComputeListener(
if (coordinatingClusterIsSearchedInCCS()) {
// if not already marked as SKIPPED, mark the local cluster as finished once the coordinator and all
// data nodes have finished processing
executionInfo.swapCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> {
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
} else {
return v;
}
});
setFinalStatusAndShardCounts(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, executionInfo);
}
}
delegate.onResponse(result);
}, e -> delegate.onFailure(failureCollector.getFailure())));
}

private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
// TODO: once PARTIAL status is supported (partial results work to come), modify this code as needed
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
assert v.getTotalShards() != null && v.getSkippedShards() != null : "Null total or skipped shard count: " + v;
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
/*
* Total and skipped shard counts are set early in execution (after can-match).
* Until ES|QL supports shard-level partial results, we just set all non-skipped shards
* as successful and none are failed.
*/
.setSuccessfulShards(v.getTotalShards())
.setFailedShards(0)
.build();
} else {
return v;
}
});
}

/**
* @return true if the "local" querying/coordinator cluster is being searched in a cross-cluster search
*/

@@ -178,6 +178,7 @@ public void execute(
null
);
String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
updateShardCountForCoordinatorOnlyQuery(execInfo);
try (var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> {
updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo);
return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo);
@@ -260,18 +261,30 @@ public void execute(
}
}

// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
if (execInfo.isCrossClusterSearch()) {
for (String clusterAlias : execInfo.clusterAliases()) {
execInfo.swapCluster(
clusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.build()
);
}
}
}

// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
if (execInfo.isCrossClusterSearch()) {
assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null";
for (String clusterAlias : execInfo.clusterAliases()) {
execInfo.swapCluster(clusterAlias, (k, v) -> {
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook())
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0);
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook());
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
}
@@ -324,9 +337,8 @@ private void startComputeOnDataNodes(
executionInfo.swapCluster(
clusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(dataNodeResult.totalShards())
.setSuccessfulShards(dataNodeResult.totalShards())
// do not set successful or failed shard count here - do it when search is done
.setSkippedShards(dataNodeResult.skippedShards())
.setFailedShards(0)
.build()
);

@@ -151,6 +151,8 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener

@Override
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
task.setExecutionInfo(createEsqlExecutionInfo(request));
ActionListener.run(listener, l -> innerExecute(task, request, l));
}

@@ -170,10 +172,9 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
System.nanoTime()
);
String sessionId = sessionID(task);
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(
clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias),
request.includeCCSMetadata()
);
// async-query uses EsqlQueryTask, so pull the EsqlExecutionInfo out of the task
// sync query uses CancellableTask which does not have EsqlExecutionInfo, so create one
EsqlExecutionInfo executionInfo = getOrCreateExecutionInfo(task, request);
PlanRunner planRunner = (plan, resultListener) -> computeService.execute(
sessionId,
(CancellableTask) task,
Expand All @@ -194,6 +195,18 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
);
}

private EsqlExecutionInfo getOrCreateExecutionInfo(Task task, EsqlQueryRequest request) {
if (task instanceof EsqlQueryTask esqlQueryTask && esqlQueryTask.executionInfo() != null) {
return esqlQueryTask.executionInfo();
} else {
return createEsqlExecutionInfo(request);
}
}

private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias), request.includeCCSMetadata());
}

private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
List<ColumnInfoImpl> columns = result.schema().stream().map(c -> new ColumnInfoImpl(c.name(), c.dataType().outputType())).toList();
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
@@ -269,7 +282,7 @@ public EsqlQueryResponse initialResponse(EsqlQueryTask task) {
asyncExecutionId,
true, // is_running
true, // isAsync
null
task.executionInfo()
);
}

@@ -145,6 +145,7 @@ public String sessionId() {
* Execute an ESQL request.
*/
public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener<Result> listener) {
assert executionInfo != null : "Null EsqlExecutionInfo";
LOGGER.debug("ESQL query:\n{}", request.query());
analyzedPlan(
parse(request.query(), request.params()),

@@ -519,14 +519,15 @@ static EsqlQueryResponse fromXContent(XContentParser parser) {
}

public void testChunkResponseSizeColumnar() {
int sizeClusterDetails = 14;
try (EsqlQueryResponse resp = randomResponse(true, null)) {
int sizeClusterDetails = 14;
int columnCount = resp.pages().get(0).getBlockCount();
int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
assertChunkCount(resp, r -> 5 + sizeClusterDetails + bodySize);
}

try (EsqlQueryResponse resp = randomResponseAsync(true, null, true)) {
int sizeClusterDetails = resp.isRunning() ? 13 : 14; // overall took time not present when is_running=true
int columnCount = resp.pages().get(0).getBlockCount();
int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
assertChunkCount(resp, r -> 7 + sizeClusterDetails + bodySize); // is_running

@@ -353,10 +353,7 @@ public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() {
assertThat(response.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.getCluster(remoteAlias).getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.getCluster(remoteAlias).getTook(), equalTo(response.getTook()));

// the status in the (remote) executionInfo will still be RUNNING, since the SUCCESSFUL status gets set on the querying
// cluster executionInfo in the acquireCompute CCS listener, NOT present in this test - see testCollectComputeResultsInCCSListener
assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));

Mockito.verifyNoInteractions(transportService.getTaskManager());
}
@@ -376,6 +373,17 @@ public void testAcquireComputeRunningOnQueryingClusterFillsInTookTime() {
// fully filled in for cross-cluster searches
executionInfo.swapCluster(localCluster, (k, v) -> new EsqlExecutionInfo.Cluster(localCluster, "logs*", false));
executionInfo.swapCluster("my_remote", (k, v) -> new EsqlExecutionInfo.Cluster("my_remote", "my_remote:logs*", false));

// before acquire-compute, can-match (SearchShards) runs filling in total shards and skipped shards, so simulate that here
executionInfo.swapCluster(
localCluster,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(10).setSkippedShards(1).build()
);
executionInfo.swapCluster(
"my_remote",
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(10).setSkippedShards(1).build()
);

try (
ComputeListener computeListener = ComputeListener.create(
// whereRunning=localCluster simulates running on the querying cluster
