diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java index a76687f15552a..8dee3478b3b64 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java @@ -15,15 +15,23 @@ import java.util.function.Supplier; /** - * This is a workaround for when compute engine executes concurrently with data partitioning by docid (inter segment parallelization). + * This class exists as a workaround for using SourceProvider in the compute engine. + *

+ * The main issue is when compute engine executes concurrently with data partitioning by docid (inter segment parallelization). * A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses * its own {@link SourceProvider}. + *

+ * Additionally, this source provider protects against going backwards, which the synthetic source provider can't handle. */ final class ReinitializingSourceProvider implements SourceProvider { private PerThreadSourceProvider perThreadProvider; private final Supplier sourceProviderFactory; + // Keeping track of last seen doc and if current doc is before last seen doc then source provider is initialized: + // (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards) + private int lastSeenDocId; + ReinitializingSourceProvider(Supplier sourceProviderFactory) { this.sourceProviderFactory = sourceProviderFactory; } @@ -32,10 +40,11 @@ final class ReinitializingSourceProvider implements SourceProvider { public Source getSource(LeafReaderContext ctx, int doc) throws IOException { var currentThread = Thread.currentThread(); PerThreadSourceProvider provider = perThreadProvider; - if (provider == null || provider.creatingThread != currentThread) { + if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) { provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread); this.perThreadProvider = provider; } + lastSeenDocId = doc; return provider.source.getSource(ctx, doc); }