Skip to content

Commit

Permalink
Added EsqlExecutionInfo to the async query EsqlQueryTask earlier and …
Browse files Browse the repository at this point in the history
…include it in the initial response
  • Loading branch information
quux00 committed Nov 25, 2024
1 parent c1af4a9 commit d2151b1
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

/**
Expand All @@ -63,6 +64,11 @@ public class CrossClusterAsyncQueryIT extends AbstractMultiClustersTestCase {

private static final String REMOTE_CLUSTER_1 = "cluster-a";
private static final String REMOTE_CLUSTER_2 = "remote-b";
private static String LOCAL_INDEX = "logs-1";
private static String IDX_ALIAS = "alias1";
private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
private static String REMOTE_INDEX = "logs-2";
private static final String INDEX_WITH_RUNTIME_MAPPING = "blocking";

@Override
protected Collection<String> remoteClusterAlias() {
Expand Down Expand Up @@ -176,6 +182,8 @@ public void testSuccessfulPathways() throws Exception {
assertTrue(resp.isRunning());
assertNotNull("async execution id is null", resp.asyncExecutionId());
asyncExecutionId.set(resp.asyncExecutionId().get());
// executionInfo may or may not be set on the initial response when there is a relatively low wait_for_completion_timeout
// so we do not check for it here
}

// wait until we know that the query against 'remote-b:blocking' has started
Expand Down Expand Up @@ -281,6 +289,76 @@ public void testSuccessfulPathways() throws Exception {
}
}

public void testAsyncQueriesWithLimit0() throws IOException {
setupClusters(3);
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
Boolean requestIncludeMeta = includeCCSMetadata.v1();
boolean responseExpectMeta = includeCCSMetadata.v2();

final TimeValue waitForCompletion = TimeValue.timeValueNanos(randomFrom(1L, Long.MAX_VALUE));
String asyncExecutionId = null;
try (EsqlQueryResponse resp = runAsyncQuery("FROM logs*,*:logs* | LIMIT 0", requestIncludeMeta, null, waitForCompletion)) {
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
if (resp.isRunning()) {
asyncExecutionId = resp.asyncExecutionId().get();
assertThat(resp.columns().size(), equalTo(0));
assertThat(resp.values().hasNext(), is(false)); // values should be empty list

} else {
assertThat(resp.columns().size(), equalTo(4));
assertThat(resp.columns().contains(new ColumnInfoImpl("const", "long")), is(true));
assertThat(resp.columns().contains(new ColumnInfoImpl("id", "keyword")), is(true));
assertThat(resp.columns().contains(new ColumnInfoImpl("tag", "keyword")), is(true));
assertThat(resp.columns().contains(new ColumnInfoImpl("v", "long")), is(true));
assertThat(resp.values().hasNext(), is(false)); // values should be empty list

assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs*"));
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertThat(remoteCluster.getTotalShards(), equalTo(0));
assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
assertThat(remoteCluster.getSkippedShards(), equalTo(0));
assertThat(remoteCluster.getFailedShards(), equalTo(0));

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remote2Cluster.getIndexExpression(), equalTo("logs*"));
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertThat(remote2Cluster.getTotalShards(), equalTo(0));
assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0));
assertThat(remote2Cluster.getSkippedShards(), equalTo(0));
assertThat(remote2Cluster.getFailedShards(), equalTo(0));

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertThat(remote2Cluster.getTotalShards(), equalTo(0));
assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0));
assertThat(remote2Cluster.getSkippedShards(), equalTo(0));
assertThat(remote2Cluster.getFailedShards(), equalTo(0));

assertClusterMetadataInResponse(resp, responseExpectMeta, 3);
}
} finally {
if (asyncExecutionId != null) {
AcknowledgedResponse acknowledgedResponse = deleteAsyncId(asyncExecutionId);
assertThat(acknowledgedResponse.isAcknowledged(), is(true));
}
}
}

protected EsqlQueryResponse runAsyncQuery(String query, Boolean ccsMetadata, QueryBuilder filter, TimeValue waitCompletionTime) {
EsqlQueryRequest request = EsqlQueryRequest.asyncEsqlQueryRequest();
request.query(query);
Expand Down Expand Up @@ -358,12 +436,6 @@ public static Tuple<Boolean, Boolean> randomIncludeCCSMetadata() {
};
}

private static String LOCAL_INDEX = "logs-1";
private static String IDX_ALIAS = "alias1";
private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
private static String REMOTE_INDEX = "logs-2";
private static final String INDEX_WITH_RUNTIME_MAPPING = "blocking";

Map<String, Object> setupClusters(int numClusters) throws IOException {
assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
int numShardsLocal = randomIntBetween(1, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener

@Override
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
if (task instanceof EsqlQueryTask esqlQueryTask) {
esqlQueryTask.setExecutionInfo(createEsqlExecutionInfo(request));
}
ActionListener.run(listener, l -> innerExecute(task, request, l));
}

Expand All @@ -170,14 +174,9 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
System.nanoTime()
);
String sessionId = sessionID(task);
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(
clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias),
request.includeCCSMetadata()
);
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
if (task instanceof EsqlQueryTask esqlQueryTask) {
esqlQueryTask.setExecutionInfo(executionInfo);
}
// async-query uses EsqlQueryTask, so pull the EsqlExecutionInfo out of the task
// sync query uses CancellableTask which does not have EsqlExecutionInfo, so create one
EsqlExecutionInfo executionInfo = getOrCreateExecutionInfo(task, request);
PlanRunner planRunner = (plan, resultListener) -> computeService.execute(
sessionId,
(CancellableTask) task,
Expand All @@ -198,6 +197,18 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
);
}

private EsqlExecutionInfo getOrCreateExecutionInfo(Task task, EsqlQueryRequest request) {
if (task instanceof EsqlQueryTask esqlQueryTask && esqlQueryTask.executionInfo() != null) {
return esqlQueryTask.executionInfo();
} else {
return createEsqlExecutionInfo(request);
}
}

private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias), request.includeCCSMetadata());
}

private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
List<ColumnInfoImpl> columns = result.schema().stream().map(c -> new ColumnInfoImpl(c.name(), c.dataType().outputType())).toList();
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
Expand Down Expand Up @@ -273,7 +284,7 @@ public EsqlQueryResponse initialResponse(EsqlQueryTask task) {
asyncExecutionId,
true, // is_running
true, // isAsync
null
task.executionInfo()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public String sessionId() {
* Execute an ESQL request.
*/
public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener<Result> listener) {
assert executionInfo != null : "Null EsqlExecutionInfo";
LOGGER.debug("ESQL query:\n{}", request.query());
analyzedPlan(
parse(request.query(), request.params()),
Expand Down

0 comments on commit d2151b1

Please sign in to comment.