Skip to content

Commit

Permalink
Address mapping and compute engine runtime field issues (elastic#117792)
Browse files Browse the repository at this point in the history
This change addresses the following issues:

Fields mapped as runtime fields not getting stored if source mode is synthetic.
Address java.io.EOFException when an es|ql query uses multiple runtime fields that fallback to source when source mode is synthetic. (1)
Address concurrency issue when runtime fields get pushed down to Lucene. (2)
1: ValueSourceOperator can read values in row striding or columnar fashion. When values are read in columnar fashion and multiple runtime fields synthetize source then this can cause the same SourceProvider evaluation the same range of docs ids multiple times. This can then result in unexpected io errors at the codec level. This is because the same doc value instances are used by SourceProvider. Re-evaluating the same docids is in violation of the contract of the DocIdSetIterator#advance(...) / DocIdSetIterator#advanceExact(...) methods, which documents that unexpected behaviour can occur if target docid is lower than current docid position.

Note that this is only an issue for synthetic source loader and not for stored source loader. And not when executing in row stride fashion which sometimes happen in compute engine and always happen in _search api.

2: The concurrency issue that arrises with source provider if source operator executes in parallel with data portioning set to DOC. The same SourceProvider instance then gets access by multiple threads concurrently. SourceProviders implementations are not designed to handle concurrent access.

Closes elastic#117644
  • Loading branch information
martijnvg committed Dec 5, 2024
1 parent 36d8307 commit 4ff643b
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 10 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/117792.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 117792
summary: Address mapping and compute engine runtime field issues
area: Mapping
type: bug
issues:
- 117644
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,9 @@ public Query termQuery(Object value, SearchExecutionContext context) {
protected void parseCreateField(DocumentParserContext context) {
// Run-time fields are mapped to this mapper, so it needs to handle storing values for use in synthetic source.
// #parseValue calls this method once the run-time field is created.
if (context.dynamic() == ObjectMapper.Dynamic.RUNTIME && context.canAddIgnoredField()) {
var fieldType = context.mappingLookup().getFieldType(path);
boolean isRuntimeField = fieldType instanceof AbstractScriptFieldType;
if ((context.dynamic() == ObjectMapper.Dynamic.RUNTIME || isRuntimeField) && context.canAddIgnoredField()) {
try {
context.addIgnoredField(
IgnoredSourceFieldMapper.NameValue.fromContext(context, path, context.encodeFlattenedToken())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,14 +493,18 @@ public boolean containsBrokenAnalysis(String field) {
*/
public SearchLookup lookup() {
if (this.lookup == null) {
SourceProvider sourceProvider = isSourceSynthetic()
? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics())
: SourceProvider.fromStoredFields();
var sourceProvider = createSourceProvider();
setLookupProviders(sourceProvider, LeafFieldLookupProvider.fromStoredFields());
}
return this.lookup;
}

public SourceProvider createSourceProvider() {
return isSourceSynthetic()
? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics())
: SourceProvider.fromStoredFields();
}

/**
* Replace the standard source provider and field lookup provider on the SearchLookup
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ private SearchLookup(SearchLookup searchLookup, Set<String> fieldChain) {
this.fieldLookupProvider = searchLookup.fieldLookupProvider;
}

private SearchLookup(SearchLookup searchLookup, SourceProvider sourceProvider, Set<String> fieldChain) {
this.fieldChain = Collections.unmodifiableSet(fieldChain);
this.sourceProvider = sourceProvider;
this.fieldTypeLookup = searchLookup.fieldTypeLookup;
this.fieldDataLookup = searchLookup.fieldDataLookup;
this.fieldLookupProvider = searchLookup.fieldLookupProvider;
}

/**
* Creates a copy of the current {@link SearchLookup} that looks fields up in the same way, but also tracks field references
* in order to detect cycles and prevent resolving fields that depend on more than {@link #MAX_FIELD_CHAIN_DEPTH} other fields.
Expand Down Expand Up @@ -144,4 +152,8 @@ public IndexFieldData<?> getForField(MappedFieldType fieldType, MappedFieldType.
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
return sourceProvider.getSource(ctx, doc);
}

public SearchLookup swapSourceProvider(SourceProvider sourceProvider) {
return new SearchLookup(this, sourceProvider, fieldChain);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.client.internal.ClusterAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -1648,6 +1649,44 @@ public void testMaxTruncationSizeSetting() {
}
}

public void testScriptField() throws Exception {
XContentBuilder mapping = JsonXContent.contentBuilder();
mapping.startObject();
{
mapping.startObject("runtime");
{
mapping.startObject("k1");
mapping.field("type", "long");
mapping.endObject();
mapping.startObject("k2");
mapping.field("type", "long");
mapping.endObject();
}
mapping.endObject();
{
mapping.startObject("properties");
mapping.startObject("meter").field("type", "double").endObject();
mapping.endObject();
}
}
mapping.endObject();
String sourceMode = randomBoolean() ? "stored" : "synthetic";
Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode);
client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get();
for (int i = 0; i < 10; i++) {
index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i));
}
refresh("test-script");
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) {
List<Object> k1Column = Iterators.toList(resp.column(0));
assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
List<Object> k2Column = Iterators.toList(resp.column(1));
assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null));
List<Object> meterColumn = Iterators.toList(resp.column(2));
assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0));
}
}

private void clearPersistentSettings(Setting<?>... settings) {
Settings.Builder clearedSettings = Settings.builder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NestedLookup;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -348,7 +349,16 @@ public MappedFieldType.FieldExtractPreference fieldExtractPreference() {

@Override
public SearchLookup lookup() {
return ctx.lookup();
boolean syntheticSource = SourceFieldMapper.isSynthetic(indexSettings());
var searchLookup = ctx.lookup();
if (syntheticSource) {
// in the context of scripts and when synthetic source is used the search lookup can't always be reused between
// users of SearchLookup. This is only an issue when scripts fallback to _source, but since we can't always
// accurately determine whether a script uses _source, we should do this for all script usages.
// This lookup() method is only invoked for scripts / runtime fields, so it is ok to do here.
searchLookup = searchLookup.swapSourceProvider(ctx.createSourceProvider());
}
return searchLookup;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.lookup.SourceProvider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
Expand Down Expand Up @@ -82,6 +83,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;

Expand Down Expand Up @@ -428,12 +430,17 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts.size());
for (int i = 0; i < context.searchContexts.size(); i++) {
SearchContext searchContext = context.searchContexts.get(i);
var searchExecutionContext = new SearchExecutionContext(searchContext.getSearchExecutionContext()) {

@Override
public SourceProvider createSourceProvider() {
final Supplier<SourceProvider> supplier = () -> super.createSourceProvider();
return new ReinitializingSourceProvider(supplier);

}
};
contexts.add(
new EsPhysicalOperationProviders.DefaultShardContext(
i,
searchContext.getSearchExecutionContext(),
searchContext.request().getAliasFilter()
)
new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter())
);
}
final List<Driver> drivers;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.search.lookup.SourceProvider;

import java.io.IOException;
import java.util.function.Supplier;

/**
* This is a workaround for when compute engine executes concurrently with data partitioning by docid.
*/
final class ReinitializingSourceProvider implements SourceProvider {

private PerThreadSourceProvider perThreadProvider;
private final Supplier<SourceProvider> sourceProviderFactory;

ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
this.sourceProviderFactory = sourceProviderFactory;
}

@Override
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
var currentThread = Thread.currentThread();
PerThreadSourceProvider provider = perThreadProvider;
if (provider == null || provider.creatingThread != currentThread) {
provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
this.perThreadProvider = provider;
}
return perThreadProvider.source.getSource(ctx, doc);
}

private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.hamcrest.Matchers;
import org.junit.ClassRule;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -108,4 +111,118 @@ public void testLogsdbSourceModeForLogsIndex() throws IOException {
assertNull(settings.get("index.mapping.source.mode"));
}

public void testEsqlRuntimeFields() throws IOException {
String mappings = """
{
"runtime": {
"message_length": {
"type": "long"
},
"log.offset": {
"type": "long"
}
},
"dynamic": false,
"properties": {
"@timestamp": {
"type": "date"
},
"log" : {
"properties": {
"level": {
"type": "keyword"
},
"file": {
"type": "keyword"
}
}
}
}
}
""";
String indexName = "test-foo";
createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);

int numDocs = 500;
var sb = new StringBuilder();
var now = Instant.now();

var expectedMinTimestamp = now;
for (int i = 0; i < numDocs; i++) {
String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal";
String msg = randomAlphaOfLength(20);
String path = randomAlphaOfLength(8);
String messageLength = Integer.toString(msg.length());
String offset = Integer.toString(randomNonNegativeInt());
sb.append("{ \"create\": {} }").append('\n');
if (randomBoolean()) {
sb.append(
"""
{"@timestamp":"$now","message":"$msg","message_length":$l,"file":{"level":"$level","offset":5,"file":"$path"}}
""".replace("$now", formatInstant(now))
.replace("$level", level)
.replace("$msg", msg)
.replace("$path", path)
.replace("$l", messageLength)
.replace("$o", offset)
);
} else {
sb.append("""
{"@timestamp": "$now", "message": "$msg", "message_length": $l}
""".replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength));
}
sb.append('\n');
if (i != numDocs - 1) {
now = now.plusSeconds(1);
}
}
var expectedMaxTimestamp = now;

var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
bulkRequest.setJsonEntity(sb.toString());
bulkRequest.addParameter("refresh", "true");
var bulkResponse = client().performRequest(bulkRequest);
var bulkResponseBody = responseAsMap(bulkResponse);
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));

var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
forceMergeRequest.addParameter("max_num_segments", "1");
var forceMergeResponse = client().performRequest(forceMergeRequest);
assertOK(forceMergeResponse);

String query = "FROM test-foo | STATS count(*), min(@timestamp), max(@timestamp), min(message_length), max(message_length)"
+ " ,sum(message_length), avg(message_length), min(log.offset), max(log.offset) | LIMIT 1";
final Request esqlRequest = new Request("POST", "/_query");
esqlRequest.setJsonEntity("""
{
"query": "$query"
}
""".replace("$query", query));
var esqlResponse = client().performRequest(esqlRequest);
assertOK(esqlResponse);
Map<String, Object> esqlResponseBody = responseAsMap(esqlResponse);

List<?> values = (List<?>) esqlResponseBody.get("values");
assertThat(values, Matchers.not(Matchers.empty()));
var count = ((List<?>) values.getFirst()).get(0);
assertThat(count, equalTo(numDocs));
logger.warn("VALUES: {}", values);

var minTimestamp = ((List<?>) values.getFirst()).get(1);
assertThat(minTimestamp, equalTo(formatInstant(expectedMinTimestamp)));
var maxTimestamp = ((List<?>) values.getFirst()).get(2);
assertThat(maxTimestamp, equalTo(formatInstant(expectedMaxTimestamp)));

var minLength = ((List<?>) values.getFirst()).get(3);
assertThat(minLength, equalTo(20));
var maxLength = ((List<?>) values.getFirst()).get(4);
assertThat(maxLength, equalTo(20));
var sumLength = ((List<?>) values.getFirst()).get(5);
assertThat(sumLength, equalTo(20 * numDocs));
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

}

0 comments on commit 4ff643b

Please sign in to comment.