Commit

Merge branch 'main' into pluginloadermock
jdconrad committed Nov 27, 2024
2 parents 37be3c9 + c2e4afc commit 7bbed27
Showing 14 changed files with 671 additions and 33 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117265.yaml
@@ -0,0 +1,5 @@
pr: 117265
summary: Async search responses have CCS metadata while searches are running
area: ES|QL
type: enhancement
issues: []
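
For context: the enhancement described above means a GET on a still-running async ES|QL search can now return per-cluster CCS metadata and an in-progress "took" value instead of omitting them until the search completes. The excerpt below is only an illustrative sketch of such a response; the field values, index names, and cluster aliases are made up, and the exact shape is defined by the EsqlQueryResponse and EsqlExecutionInfo changes later in this commit.

{
  "is_running" : true,
  "took" : 1042,
  "columns" : [ ],
  "values" : [ ],
  "_clusters" : {
    "total" : 2,
    "running" : 2,
    "details" : {
      "(local)" : { "status" : "running", "indices" : "logs-1" },
      "cluster-a" : { "status" : "running", "indices" : "cluster-a:logs-2" }
    }
  }
}

Note that, per the EsqlExecutionInfo change in this commit, a cluster's own "took" is not reported while its status is still RUNNING.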
@@ -42,6 +42,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
@@ -292,6 +293,7 @@ static final class TransportRemoteSink implements RemoteSink {
final Executor responseExecutor;

final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
final AtomicBoolean finished = new AtomicBoolean(false);

TransportRemoteSink(
TransportService transportService,
@@ -311,6 +313,32 @@ static final class TransportRemoteSink implements RemoteSink {

@Override
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
if (allSourcesFinished) {
if (finished.compareAndSet(false, true)) {
doFetchPageAsync(true, listener);
} else {
// already finished or promised
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
}
} else {
// already finished
if (finished.get()) {
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
return;
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
finished.set(true);
}
listener.onResponse(r);
}, e -> {
finished.set(true);
listener.onFailure(e);
}));
}
}

private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
final long reservedBytes = allSourcesFinished ? 0 : estimatedPageSizeInBytes.get();
if (reservedBytes > 0) {
// This doesn't fully protect ESQL from OOM, but reduces the likelihood.
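
Aside: the fetchPageAsync change above makes the final "all sources finished" request idempotent. An AtomicBoolean ensures the finish request is sent to the remote sink at most once, and any later or concurrent caller immediately receives an empty, finished response. Below is a minimal standalone Java sketch of that guard; GuardedSink, doFetch, and the String-based listener are hypothetical stand-ins, not Elasticsearch API, and the sketch omits the part of the real code that also flips the flag when a page response reports it is finished or when the fetch fails.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class GuardedSink {
    private final AtomicBoolean finished = new AtomicBoolean(false);

    // Stand-in for doFetchPageAsync: pretend this performs the remote transport call.
    private void doFetch(boolean allSourcesFinished, Consumer<String> listener) {
        listener.accept(allSourcesFinished ? "finish request sent" : "page");
    }

    void fetchPage(boolean allSourcesFinished, Consumer<String> listener) {
        if (allSourcesFinished) {
            // Only the caller that flips the flag sends the real finish request;
            // everyone else gets an immediate "already finished" response.
            if (finished.compareAndSet(false, true)) {
                doFetch(true, listener);
            } else {
                listener.accept("already finished");
            }
        } else {
            if (finished.get()) {
                listener.accept("already finished");
                return;
            }
            doFetch(false, listener);
        }
    }

    public static void main(String[] args) {
        GuardedSink sink = new GuardedSink();
        sink.fetchPage(true, System.out::println);  // sends the finish request once
        sink.fetchPage(true, System.out::println);  // short-circuits: already finished
    }
}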
@@ -449,6 +449,15 @@ public void testConcurrentWithTransportActions() {
ExchangeService exchange1 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory());
exchange1.registerTransportHandler(node1);
AbstractSimpleTransportTestCase.connectToNode(node0, node1.getLocalNode());
Set<String> finishingRequests = ConcurrentCollections.newConcurrentSet();
node1.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
final ExchangeRequest exchangeRequest = (ExchangeRequest) request;
if (exchangeRequest.sourcesFinished()) {
String exchangeId = exchangeRequest.exchangeId();
assertTrue("tried to finish [" + exchangeId + "] twice", finishingRequests.add(exchangeId));
}
handler.messageReceived(request, channel, task);
});

try (exchange0; exchange1; node0; node1) {
String exchangeId = "exchange";

Large diffs are not rendered by default.

@@ -61,6 +61,10 @@
public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
private static final String REMOTE_CLUSTER_1 = "cluster-a";
private static final String REMOTE_CLUSTER_2 = "remote-b";
private static String LOCAL_INDEX = "logs-1";
private static String IDX_ALIAS = "alias1";
private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
private static String REMOTE_INDEX = "logs-2";

@Override
protected Collection<String> remoteClusterAlias() {
@@ -1278,11 +1282,6 @@ Map<String, Object> setupTwoClusters() {
return setupClusters(2);
}

private static String LOCAL_INDEX = "logs-1";
private static String IDX_ALIAS = "alias1";
private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
private static String REMOTE_INDEX = "logs-2";

Map<String, Object> setupClusters(int numClusters) {
assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
int numShardsLocal = randomIntBetween(1, 5);
@@ -169,6 +169,17 @@ public TimeValue overallTook() {
return overallTook;
}

/**
* How much time the query took since starting.
*/
public TimeValue tookSoFar() {
if (relativeStartNanos == null) {
return new TimeValue(0);
} else {
return new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
}
}

public Set<String> clusterAliases() {
return clusterInfo.keySet();
}
@@ -478,7 +489,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
{
builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString());
builder.field(INDICES_FIELD.getPreferredName(), indexExpression);
if (took != null) {
if (took != null && status != Status.RUNNING) {
builder.field(TOOK.getPreferredName(), took.millis());
}
if (totalShards != null) {
@@ -196,8 +196,11 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
}
b.field("is_running", isRunning);
}
if (executionInfo != null && executionInfo.overallTook() != null) {
b.field("took", executionInfo.overallTook().millis());
if (executionInfo != null) {
long tookInMillis = executionInfo.overallTook() == null
? executionInfo.tookSoFar().millis()
: executionInfo.overallTook().millis();
b.field("took", tookInMillis);
}
if (dropNullColumns) {
b.append(ResponseXContentUtils.allColumns(columns, "all_columns"))
@@ -17,6 +17,8 @@

public class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {

private EsqlExecutionInfo executionInfo;

public EsqlQueryTask(
long id,
String type,
@@ -29,10 +31,19 @@ public EsqlQueryTask(
TimeValue keepAlive
) {
super(id, type, action, description, parentTaskId, headers, originHeaders, asyncExecutionId, keepAlive);
this.executionInfo = null;
}

public void setExecutionInfo(EsqlExecutionInfo executionInfo) {
this.executionInfo = executionInfo;
}

public EsqlExecutionInfo executionInfo() {
return executionInfo;
}

@Override
public EsqlQueryResponse getCurrentResult() {
return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, null);
return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, executionInfo);
}
}
@@ -112,6 +112,7 @@ private ComputeListener(
if (runningOnRemoteCluster()) {
// for remote executions - this ComputeResponse is created on the remote cluster/node and will be serialized and
// received by the acquireCompute method callback on the coordinating cluster
setFinalStatusAndShardCounts(clusterAlias, executionInfo);
EsqlExecutionInfo.Cluster cluster = esqlExecutionInfo.getCluster(clusterAlias);
result = new ComputeResponse(
collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList(),
@@ -126,19 +127,33 @@
if (coordinatingClusterIsSearchedInCCS()) {
// if not already marked as SKIPPED, mark the local cluster as finished once the coordinator and all
// data nodes have finished processing
executionInfo.swapCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> {
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
} else {
return v;
}
});
setFinalStatusAndShardCounts(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, executionInfo);
}
}
delegate.onResponse(result);
}, e -> delegate.onFailure(failureCollector.getFailure())));
}

private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
// TODO: once PARTIAL status is supported (partial results work to come), modify this code as needed
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
assert v.getTotalShards() != null && v.getSkippedShards() != null : "Null total or skipped shard count: " + v;
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
/*
* Total and skipped shard counts are set early in execution (after can-match).
* Until ES|QL supports shard-level partial results, we just set all non-skipped shards
* as successful and none are failed.
*/
.setSuccessfulShards(v.getTotalShards())
.setFailedShards(0)
.build();
} else {
return v;
}
});
}

/**
* @return true if the "local" querying/coordinator cluster is being searched in a cross-cluster search
*/
@@ -178,6 +178,7 @@ public void execute(
null
);
String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
updateShardCountForCoordinatorOnlyQuery(execInfo);
try (var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> {
updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo);
return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo);
@@ -260,18 +261,30 @@ public void execute(
}
}

// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
if (execInfo.isCrossClusterSearch()) {
for (String clusterAlias : execInfo.clusterAliases()) {
execInfo.swapCluster(
clusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.build()
);
}
}
}

// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
if (execInfo.isCrossClusterSearch()) {
assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null";
for (String clusterAlias : execInfo.clusterAliases()) {
execInfo.swapCluster(clusterAlias, (k, v) -> {
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook())
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0);
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook());
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
}
@@ -324,9 +337,8 @@ private void startComputeOnDataNodes(
executionInfo.swapCluster(
clusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(dataNodeResult.totalShards())
.setSuccessfulShards(dataNodeResult.totalShards())
// do not set successful or failed shard count here - do it when search is done
.setSkippedShards(dataNodeResult.skippedShards())
.setFailedShards(0)
.build()
);

@@ -151,6 +151,8 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener

@Override
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
task.setExecutionInfo(createEsqlExecutionInfo(request));
ActionListener.run(listener, l -> innerExecute(task, request, l));
}

@@ -170,10 +172,9 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
System.nanoTime()
);
String sessionId = sessionID(task);
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(
clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias),
request.includeCCSMetadata()
);
// async-query uses EsqlQueryTask, so pull the EsqlExecutionInfo out of the task
// sync query uses CancellableTask which does not have EsqlExecutionInfo, so create one
EsqlExecutionInfo executionInfo = getOrCreateExecutionInfo(task, request);
PlanRunner planRunner = (plan, resultListener) -> computeService.execute(
sessionId,
(CancellableTask) task,
Expand All @@ -194,6 +195,18 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
);
}

private EsqlExecutionInfo getOrCreateExecutionInfo(Task task, EsqlQueryRequest request) {
if (task instanceof EsqlQueryTask esqlQueryTask && esqlQueryTask.executionInfo() != null) {
return esqlQueryTask.executionInfo();
} else {
return createEsqlExecutionInfo(request);
}
}

private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias), request.includeCCSMetadata());
}

private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
List<ColumnInfoImpl> columns = result.schema().stream().map(c -> new ColumnInfoImpl(c.name(), c.dataType().outputType())).toList();
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
Expand Down Expand Up @@ -269,7 +282,7 @@ public EsqlQueryResponse initialResponse(EsqlQueryTask task) {
asyncExecutionId,
true, // is_running
true, // isAsync
null
task.executionInfo()
);
}

@@ -147,6 +147,7 @@ public String sessionId() {
* Execute an ESQL request.
*/
public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener<Result> listener) {
assert executionInfo != null : "Null EsqlExecutionInfo";
LOGGER.debug("ESQL query:\n{}", request.query());
analyzedPlan(
parse(request.query(), request.params()),
@@ -519,14 +519,15 @@ static EsqlQueryResponse fromXContent(XContentParser parser) {
}

public void testChunkResponseSizeColumnar() {
int sizeClusterDetails = 14;
try (EsqlQueryResponse resp = randomResponse(true, null)) {
int sizeClusterDetails = 14;
int columnCount = resp.pages().get(0).getBlockCount();
int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
assertChunkCount(resp, r -> 5 + sizeClusterDetails + bodySize);
}

try (EsqlQueryResponse resp = randomResponseAsync(true, null, true)) {
int sizeClusterDetails = resp.isRunning() ? 13 : 14; // overall took time not present when is_running=true
int columnCount = resp.pages().get(0).getBlockCount();
int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
assertChunkCount(resp, r -> 7 + sizeClusterDetails + bodySize); // is_running
@@ -353,10 +353,7 @@ public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() {
assertThat(response.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.getCluster(remoteAlias).getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.getCluster(remoteAlias).getTook(), equalTo(response.getTook()));

// the status in the (remote) executionInfo will still be RUNNING, since the SUCCESSFUL status gets set on the querying
// cluster executionInfo in the acquireCompute CCS listener, NOT present in this test - see testCollectComputeResultsInCCSListener
assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));

Mockito.verifyNoInteractions(transportService.getTaskManager());
}
@@ -376,6 +373,17 @@ public void testAcquireComputeRunningOnQueryingClusterFillsInTookTime() {
// fully filled in for cross-cluster searches
executionInfo.swapCluster(localCluster, (k, v) -> new EsqlExecutionInfo.Cluster(localCluster, "logs*", false));
executionInfo.swapCluster("my_remote", (k, v) -> new EsqlExecutionInfo.Cluster("my_remote", "my_remote:logs*", false));

// before acquire-compute, can-match (SearchShards) runs filling in total shards and skipped shards, so simulate that here
executionInfo.swapCluster(
localCluster,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(10).setSkippedShards(1).build()
);
executionInfo.swapCluster(
"my_remote",
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(10).setSkippedShards(1).build()
);

try (
ComputeListener computeListener = ComputeListener.create(
// whereRunning=localCluster simulates running on the querying cluster
