Skip to content

Commit

Permalink
Interfaces
Browse files Browse the repository at this point in the history
expose size observer plugin to stateless

integ test

remove unused assertions

spotless

spotless
  • Loading branch information
pgomulka committed May 9, 2024
1 parent 7b62ea9 commit 7fb5060
Show file tree
Hide file tree
Showing 17 changed files with 214 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Map;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class DocumentSizeObserverWithPipelinesIT extends ESIntegTestCase {
Expand Down Expand Up @@ -92,21 +91,13 @@ public DocumentSizeObserver newDocumentSizeObserver() {
}

@Override
public DocumentSizeReporter getDocumentParsingReporter(String indexName) {
return new TestDocumentSizeReporter();
public DocumentSizeReporter getDocumentParsingReporter(String indexName, DocumentSizeAccumulator documentSizeAccumulator) {
return DocumentSizeReporter.EMPTY_INSTANCE;
}
};
}
}

public static class TestDocumentSizeReporter implements DocumentSizeReporter {
@Override
public void onCompleted(String indexName, long normalizedBytesParsed) {
assertThat(indexName, equalTo(TEST_INDEX_NAME));
assertThat(normalizedBytesParsed, equalTo(1L));
}
}

public static class TestDocumentSizeObserver implements DocumentSizeObserver {
long mapCounter = 0;
long wrapperCounter = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
package org.elasticsearch.plugins.internal;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
Expand All @@ -19,6 +24,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.xcontent.XContentFactory.cborBuilder;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -29,25 +36,36 @@ public class DocumentSizeObserverIT extends ESIntegTestCase {

private static String TEST_INDEX_NAME = "test-index-name";

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(TestDocumentParsingProviderPlugin.class, TestEnginePlugin.class);
}

// the assertions are done in plugin which is static and will be created by ES server.
// hence a static flag to make sure it is indeed used
public static boolean hasWrappedParser;
public static AtomicLong COUNTER = new AtomicLong();

public void testDocumentIsReportedUponBulk() throws IOException {
public void testDocumentIsReportedUponBulk() throws Exception {
hasWrappedParser = false;
client().index(
new IndexRequest(TEST_INDEX_NAME).id("1").source(jsonBuilder().startObject().field("test", "I am sam i am").endObject())
).actionGet();
assertTrue(hasWrappedParser);
// there are more assertions in a TestDocumentParsingProviderPlugin
assertDocumentReported();

hasWrappedParser = false;
// the format of the request does not matter
client().index(
new IndexRequest(TEST_INDEX_NAME).id("2").source(cborBuilder().startObject().field("test", "I am sam i am").endObject())
).actionGet();
assertTrue(hasWrappedParser);
// there are more assertions in a TestDocumentParsingProviderPlugin
assertDocumentReported();

hasWrappedParser = false;
// white spaces does not matter
Expand All @@ -59,12 +77,40 @@ public void testDocumentIsReportedUponBulk() throws IOException {
}
""", XContentType.JSON)).actionGet();
assertTrue(hasWrappedParser);
// there are more assertions in a TestDocumentParsingProviderPlugin
assertDocumentReported();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(TestDocumentParsingProviderPlugin.class);
private void assertDocumentReported() throws Exception {
assertBusy(() -> assertThat(COUNTER.get(), equalTo(5L)));
COUNTER.set(0);
}

public static class TestEnginePlugin extends Plugin implements EnginePlugin {
DocumentParsingProvider documentParsingProvider;

@Override
public Collection<?> createComponents(PluginServices services) {
documentParsingProvider = services.documentParsingProvider();
return super.createComponents(services);
}

@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
return Optional.of(config -> new InternalEngine(config) {
@Override
public IndexResult index(Index index) throws IOException {
IndexResult result = super.index(index);

DocumentSizeReporter documentParsingReporter = documentParsingProvider.getDocumentParsingReporter(
shardId.getIndexName(),
DocumentSizeAccumulator.EMPTY_INSTANCE
);
documentParsingReporter.onIndexingCompleted(index.parsedDoc());

return result;
}
});
}
}

public static class TestDocumentParsingProviderPlugin extends Plugin implements DocumentParsingProviderPlugin, IngestPlugin {
Expand All @@ -86,19 +132,29 @@ public DocumentSizeObserver newDocumentSizeObserver() {
}

@Override
public DocumentSizeReporter getDocumentParsingReporter(String indexName) {
return new TestDocumentSizeReporter();
public DocumentSizeReporter getDocumentParsingReporter(String indexName, DocumentSizeAccumulator documentSizeAccumulator) {
return new TestDocumentSizeReporter(indexName);
}
};
}
}

public static class TestDocumentSizeReporter implements DocumentSizeReporter {

private final String indexName;

public TestDocumentSizeReporter(String indexName) {
this.indexName = indexName;
}

@Override
public void onParsingCompleted(ParsedDocument parsedDocument) {}

@Override
public void onCompleted(String indexName, long normalizedBytesParsed) {
public void onIndexingCompleted(ParsedDocument parsedDocument) {
DocumentSizeObserver documentSizeObserver = parsedDocument.getDocumentSizeObserver();
COUNTER.addAndGet(documentSizeObserver.normalisedBytesParsed());
assertThat(indexName, equalTo(TEST_INDEX_NAME));
assertThat(normalizedBytesParsed, equalTo(5L));
}
}

Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@
exports org.elasticsearch.plugins.internal
to
org.elasticsearch.metering,
org.elasticsearch.stateless,
org.elasticsearch.settings.secure,
org.elasticsearch.serverless.constants,
org.elasticsearch.serverless.apifiltering,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.plugins.internal.DocumentParsingProvider;
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
import org.elasticsearch.plugins.internal.DocumentSizeReporter;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -486,11 +485,6 @@ private static void onComplete(
final boolean isUpdate = opType == DocWriteRequest.OpType.UPDATE;
final BulkItemResponse executionResult = context.getExecutionResult();
final boolean isFailed = executionResult.isFailed();
if (isFailed == false && opType != DocWriteRequest.OpType.DELETE) {
DocumentSizeReporter documentSizeReporter = documentParsingProvider.getDocumentParsingReporter(docWriteRequest.index());
DocumentSizeObserver documentSizeObserver = context.getDocumentSizeObserver();
documentSizeReporter.onCompleted(docWriteRequest.index(), documentSizeObserver.normalisedBytesParsed());
}
if (isUpdate
&& isFailed
&& isConflictException(executionResult.getFailure().getCause())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
context.reorderParentAndGetDocs(),
context.sourceToParse().source(),
context.sourceToParse().getXContentType(),
dynamicUpdate
dynamicUpdate,
documentSizeObserver
) {
@Override
public String documentDescription() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
import org.elasticsearch.xcontent.XContentType;

import java.util.Collections;
Expand All @@ -33,6 +34,8 @@ public class ParsedDocument {

private final List<LuceneDocument> documents;

private final DocumentSizeObserver documentSizeObserver;

private BytesReference source;
private XContentType xContentType;
private Mapping dynamicMappingsUpdate;
Expand All @@ -58,7 +61,8 @@ public static ParsedDocument noopTombstone(String reason) {
Collections.singletonList(document),
new BytesArray("{}"),
XContentType.JSON,
null
null,
DocumentSizeObserver.EMPTY_INSTANCE
);
}

Expand All @@ -82,7 +86,8 @@ public static ParsedDocument deleteTombstone(String id) {
Collections.singletonList(document),
new BytesArray("{}"),
XContentType.JSON,
null
null,
DocumentSizeObserver.EMPTY_INSTANCE
);
}

Expand All @@ -94,7 +99,8 @@ public ParsedDocument(
List<LuceneDocument> documents,
BytesReference source,
XContentType xContentType,
Mapping dynamicMappingsUpdate
Mapping dynamicMappingsUpdate,
DocumentSizeObserver documentSizeObserver
) {
this.version = version;
this.seqID = seqID;
Expand All @@ -104,6 +110,7 @@ public ParsedDocument(
this.source = source;
this.dynamicMappingsUpdate = dynamicMappingsUpdate;
this.xContentType = xContentType;
this.documentSizeObserver = documentSizeObserver;
}

public String id() {
Expand Down Expand Up @@ -171,4 +178,9 @@ public String toString() {
public String documentDescription() {
return "id";
}

public DocumentSizeObserver getDocumentSizeObserver() {
return documentSizeObserver;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,8 @@ record PluginServiceInstances(
IndicesService indicesService,
FeatureService featureService,
SystemIndices systemIndices,
DataStreamGlobalRetentionResolver dataStreamGlobalRetentionResolver
DataStreamGlobalRetentionResolver dataStreamGlobalRetentionResolver,
DocumentParsingProvider documentParsingProvider
) implements Plugin.PluginServices {}
PluginServiceInstances pluginServices = new PluginServiceInstances(
client,
Expand All @@ -837,7 +838,8 @@ record PluginServiceInstances(
indicesService,
featureService,
systemIndices,
dataStreamGlobalRetentionResolver
dataStreamGlobalRetentionResolver,
documentParsingProvider
);

Collection<?> pluginComponents = pluginsService.flatMap(p -> p.createComponents(pluginServices)).toList();
Expand Down
6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.index.IndexSettingProvider;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.plugins.internal.DocumentParsingProvider;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.telemetry.TelemetryProvider;
Expand Down Expand Up @@ -162,6 +163,11 @@ public interface PluginServices {
* data streams managed by the data stream lifecycle.
*/
DataStreamGlobalRetentionResolver dataStreamGlobalRetentionResolver();

/**
* A provider of utilities to observe and report parsing of documents
*/
DocumentParsingProvider documentParsingProvider();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,34 @@
*/
public interface DocumentParsingProvider {
DocumentParsingProvider EMPTY_INSTANCE = new DocumentParsingProvider() {
@Override
public DocumentSizeObserver newDocumentSizeObserver() {
return DocumentSizeObserver.EMPTY_INSTANCE;
}

@Override
public DocumentSizeReporter getDocumentParsingReporter(String indexName) {
return DocumentSizeReporter.EMPTY_INSTANCE;
}

@Override
public DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed) {
return DocumentSizeObserver.EMPTY_INSTANCE;
}
};

/**
* @return a new 'empty' observer to use when observing parsing
*/
DocumentSizeObserver newDocumentSizeObserver();
default DocumentSizeObserver newDocumentSizeObserver() {
return DocumentSizeObserver.EMPTY_INSTANCE;
}

/**
* @return an observer with a previously observed value (fixed to this value, not continuing)
*/
DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed);
default DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed) {
return DocumentSizeObserver.EMPTY_INSTANCE;
}

/**
* @return an instance of a reporter to use when parsing has been completed and indexing successful
*/
DocumentSizeReporter getDocumentParsingReporter(String indexName);
default DocumentSizeReporter getDocumentParsingReporter(String indexName, DocumentSizeAccumulator documentSizeAccumulator) {
return DocumentSizeReporter.EMPTY_INSTANCE;
}

/**
* @return a new instance of DocumentSizeAccumulator
*/
default DocumentSizeAccumulator createDocumentSizeAccumulator() {
return DocumentSizeAccumulator.EMPTY_INSTANCE;
}

}
Loading

0 comments on commit 7fb5060

Please sign in to comment.