From 653ed405721deee789b0956fb031fa6749254231 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 11 Dec 2024 10:01:49 +0100 Subject: [PATCH] keep track of docid to avoid going backwards to protect against misusing doc values APIs when source mode is synthetic: ``` java.lang.AssertionError at __randomizedtesting.SeedInfo.seed([BA10D1FF912D808]:0) at org.apache.lucene.tests.index.AssertingLeafReader$AssertingNumericDocValues.advanceExact(AssertingLeafReader.java:757) at org.apache.lucene.index.SingletonSortedNumericDocValues.advanceExact(SingletonSortedNumericDocValues.java:62) at org.elasticsearch.index.mapper.SortedNumericDocValuesSyntheticFieldLoader$ImmediateDocValuesLoader.advanceToDoc(SortedNumericDocValuesSyntheticFieldLoader.java:142) at org.elasticsearch.index.mapper.ObjectMapper$SyntheticSourceFieldLoader$ObjectDocValuesLoader.advanceToDoc(ObjectMapper.java:965) at org.elasticsearch.index.mapper.SourceLoader$Synthetic$SyntheticLeaf.write(SourceLoader.java:210) at org.elasticsearch.index.mapper.SourceLoader$Synthetic$SyntheticLeaf.source(SourceLoader.java:181) at org.elasticsearch.index.mapper.SourceLoader$Synthetic$LeafWithMetrics.source(SourceLoader.java:146) at org.elasticsearch.search.lookup.SyntheticSourceProvider$SyntheticSourceLeafLoader.getSource(SyntheticSourceProvider.java:58) at org.elasticsearch.search.lookup.SyntheticSourceProvider.getSource(SyntheticSourceProvider.java:42) at org.elasticsearch.xpack.esql.plugin.ReinitializingSourceProvider.getSource(ReinitializingSourceProvider.java:41) at org.elasticsearch.search.lookup.LeafSearchLookup.lambda$new$0(LeafSearchLookup.java:40) at org.elasticsearch.script.AbstractFieldScript.extractFromSource(AbstractFieldScript.java:107) at org.elasticsearch.script.AbstractFieldScript.emitFromSource(AbstractFieldScript.java:127) at org.elasticsearch.script.LongFieldScript$1$1.execute(LongFieldScript.java:29) at org.elasticsearch.script.AbstractFieldScript.runForDoc(AbstractFieldScript.java:159) at 
org.elasticsearch.index.fielddata.LongScriptDocValues.advanceExact(LongScriptDocValues.java:26) at org.elasticsearch.search.MultiValueMode$6.advanceExact(MultiValueMode.java:534) at org.elasticsearch.index.fielddata.FieldData$17.advanceExact(FieldData.java:620) at org.apache.lucene.search.comparators.LongComparator$LongLeafComparator.getValueForDoc(LongComparator.java:80) at org.apache.lucene.search.comparators.LongComparator$LongLeafComparator.copy(LongComparator.java:105) at org.apache.lucene.search.TopFieldCollector$TopFieldLeafCollector.collectAnyHit(TopFieldCollector.java:124) at org.apache.lucene.search.TopFieldCollector$SimpleFieldCollector$1.collect(TopFieldCollector.java:209) at org.apache.lucene.search.Weight$DefaultBulkScorer.scoreRange(Weight.java:305) at org.apache.lucene.search.Weight$DefaultBulkScorer.score(Weight.java:264) at org.elasticsearch.compute.lucene.LuceneOperator$LuceneScorer.scoreNextRange(LuceneOperator.java:193) at org.elasticsearch.compute.lucene.LuceneTopNSourceOperator.collect(LuceneTopNSourceOperator.java:168) at org.elasticsearch.compute.lucene.LuceneTopNSourceOperator.getCheckedOutput(LuceneTopNSourceOperator.java:148) at org.elasticsearch.compute.lucene.LuceneOperator.getOutput(LuceneOperator.java:118) at org.elasticsearch.compute.operator.Driver.runSingleLoopIteration(Driver.java:258) at org.elasticsearch.compute.operator.Driver.run(Driver.java:189) at org.elasticsearch.compute.operator.Driver$1.doRun(Driver.java:378) at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27) at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:34) at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1023) at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1575) ``` --- .../esql/plugin/ReinitializingSourceProvider.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java index a76687f15552a..8dee3478b3b64 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java @@ -15,15 +15,23 @@ import java.util.function.Supplier; /** - * This is a workaround for when compute engine executes concurrently with data partitioning by docid (inter segment parallelization). + * This class exists as a workaround for using SourceProvider in the compute engine. + * <p>
+ * The main issue is when the compute engine executes concurrently with data partitioning by docid (inter segment parallelization). * A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses * its own {@link SourceProvider}. + * <p>
+ * Additionally, this source provider protects against going backwards, which the synthetic source provider can't handle. */ final class ReinitializingSourceProvider implements SourceProvider { private PerThreadSourceProvider perThreadProvider; private final Supplier<SourceProvider> sourceProviderFactory; + // Keep track of the last seen doc id; if the current doc is before the last seen doc, the source provider is reinitialized + // (when source mode is synthetic, _source is read from doc values, and doc values don't support going backwards) + private int lastSeenDocId; + ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) { this.sourceProviderFactory = sourceProviderFactory; } @@ -32,10 +40,11 @@ final class ReinitializingSourceProvider implements SourceProvider { public Source getSource(LeafReaderContext ctx, int doc) throws IOException { var currentThread = Thread.currentThread(); PerThreadSourceProvider provider = perThreadProvider; - if (provider == null || provider.creatingThread != currentThread) { + if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) { provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread); this.perThreadProvider = provider; } + lastSeenDocId = doc; return provider.source.getSource(ctx, doc); }