diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml index ff51f91a75867..7fb69aaaf305e 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -813,7 +813,7 @@ teardown: --- -"Test simulate with provided pipeline definition with description in processors": +"Test simulate with provided pipeline definition with tag and description in processors": - do: ingest.simulate: verbose: true @@ -825,6 +825,7 @@ teardown: { "set" : { "description": "processor_description", + "tag": "processor_tag", "field" : "field2", "value" : "_value" } @@ -844,3 +845,5 @@ teardown: - length: { docs: 1 } - length: { docs.0.processor_results: 1 } - match: { docs.0.processor_results.0.doc._source.field2: "_value" } + - match: { docs.0.processor_results.0.description: "processor_description" } + - match: { docs.0.processor_results.0.tag: "processor_tag" } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java index d5743b6d649c6..b9db1dd3c2c86 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -39,6 +40,7 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject { private static final String IGNORED_ERROR_FIELD = "ignored_error"; private final String processorTag; + private final String description; private final WriteableIngestDocument ingestDocument; private final Exception failure; @@ -62,18 +64,20 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject { true, a -> { String processorTag = a[0] == null ? null : (String)a[0]; - IngestDocument document = a[1] == null ? null : ((WriteableIngestDocument)a[1]).getIngestDocument(); + String description = a[1] == null ? null : (String)a[1]; + IngestDocument document = a[2] == null ? null : ((WriteableIngestDocument)a[2]).getIngestDocument(); Exception failure = null; - if (a[2] != null) { - failure = (ElasticsearchException)a[2]; - } else if (a[3] != null) { + if (a[3] != null) { failure = (ElasticsearchException)a[3]; + } else if (a[4] != null) { + failure = (ElasticsearchException)a[4]; } - return new SimulateProcessorResult(processorTag, document, failure); + return new SimulateProcessorResult(processorTag, description, document, failure); } ); static { PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.TAG_KEY)); + PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.DESCRIPTION_KEY)); PARSER.declareObject( optionalConstructorArg(), WriteableIngestDocument.INGEST_DOC_PARSER, @@ -91,22 +95,24 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject { ); } - public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument, Exception failure) { + public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument, + Exception failure) { this.processorTag = processorTag; + this.description = description; this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument); this.failure = failure; } - public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument) { - this(processorTag, ingestDocument, null); + public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument) { + this(processorTag, description, ingestDocument, null); } - public SimulateProcessorResult(String processorTag, Exception failure) { - this(processorTag, null, failure); + public SimulateProcessorResult(String processorTag, String description, Exception failure) { + this(processorTag, description, null, failure); } - public SimulateProcessorResult(String processorTag) { - this(processorTag, null, null); + public SimulateProcessorResult(String processorTag, String description) { + this(processorTag, description, null, null); } /** @@ -116,6 +122,11 @@ public SimulateProcessorResult(String processorTag) { this.processorTag = in.readString(); this.ingestDocument = in.readOptionalWriteable(WriteableIngestDocument::new); this.failure = in.readException(); + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + this.description = in.readOptionalString(); + } else { + this.description = null; + } } @Override @@ -123,6 +134,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(processorTag); out.writeOptionalWriteable(ingestDocument); out.writeException(failure); + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeOptionalString(description); + } } public IngestDocument getIngestDocument() { @@ -140,6 +154,10 @@ public Exception getFailure() { return failure; } + public String getDescription() { + return description; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { if (processorTag == null && failure == null && ingestDocument == null) { @@ -153,6 +171,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(ConfigurationUtils.TAG_KEY, processorTag); } + if (description != null) { + builder.field(ConfigurationUtils.DESCRIPTION_KEY, description); + } + if (failure != null && ingestDocument != null) { builder.startObject(IGNORED_ERROR_FIELD); ElasticsearchException.generateFailureXContent(builder, params, failure, true); diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 5582275b8e7bd..d2600446ad115 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -66,9 +66,10 @@ public void execute(IngestDocument ingestDocument, BiConsumer { if (e != null) { if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e)); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription(), new IngestDocument(ingestDocument), e)); } else { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e)); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription(), e)); } handler.accept(null, e); } else { if (result != null) { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription(), new IngestDocument(ingestDocument))); handler.accept(result, null); } else { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag())); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription())); handler.accept(null, null); } } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java index 2e0d6a75749bb..90d2337881b9c 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java @@ -19,11 +19,13 @@ package org.elasticsearch.action.ingest; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.test.AbstractXContentTestCase; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.StringJoiner; @@ -50,6 +52,7 @@ public void testSerialization() throws IOException { StreamInput streamInput = out.bytes().streamInput(); SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(streamInput); assertThat(otherSimulateProcessorResult.getProcessorTag(), equalTo(simulateProcessorResult.getProcessorTag())); + assertThat(otherSimulateProcessorResult.getDescription(), equalTo(simulateProcessorResult.getDescription())); if (isSuccessful) { assertIngestDocument(otherSimulateProcessorResult.getIngestDocument(), simulateProcessorResult.getIngestDocument()); if (isIgnoredException) { @@ -67,19 +70,36 @@ public void testSerialization() throws IOException { } } + public void testBWCDescription() throws IOException { + boolean isSuccessful = randomBoolean(); + boolean isIgnoredException = randomBoolean(); + SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException); + + BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(VersionUtils.getPreviousVersion(Version.V_7_9_0)); + simulateProcessorResult.writeTo(out); + StreamInput in = out.bytes().streamInput(); + in.setVersion(VersionUtils.getPreviousVersion(Version.V_7_9_0)); + SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(in); + assertNull(otherSimulateProcessorResult.getDescription()); + } + static SimulateProcessorResult createTestInstance(boolean isSuccessful, boolean isIgnoredException) { String processorTag = randomAlphaOfLengthBetween(1, 10); + String description = randomAlphaOfLengthBetween(1, 10); SimulateProcessorResult simulateProcessorResult; if (isSuccessful) { IngestDocument ingestDocument = createRandomIngestDoc(); if (isIgnoredException) { - simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument, new IllegalArgumentException("test")); + simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument, + new IllegalArgumentException("test")); } else { - simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument); + simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument); } } else { - simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test")); + simulateProcessorResult = new SimulateProcessorResult(processorTag, description, + new IllegalArgumentException("test")); } return simulateProcessorResult; } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index dce1e6d32559b..a66aa815c916b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -67,7 +67,8 @@ public void testActualProcessor() throws Exception { TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument); assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); assertThat(resultList.size(), equalTo(1)); @@ -87,7 +88,8 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage())); - SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(resultList.size(), equalTo(1)); assertThat(resultList.get(0).getIngestDocument(), nullValue()); @@ -107,8 +109,10 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception { CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); - SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), + failProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), + failProcessor.getDescription(), ingestDocument); assertThat(failProcessor.getInvokedCounter(), equalTo(2)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); @@ -159,8 +163,10 @@ public void testActualCompoundProcessorWithOnFailureAndTrueCondition() throws Ex trackingProcessor.execute(ingestDocument, (result, e) -> { }); - SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); - SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), + failProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), + onFailureProcessor.getDescription(), ingestDocument); assertThat(failProcessor.getInvokedCounter(), equalTo(1)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); @@ -188,7 +194,8 @@ public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), + testProcessor.getDescription(), ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(resultList.size(), equalTo(1)); assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); @@ -218,7 +225,8 @@ public void testActualCompoundProcessorWithFalseConditional() throws Exception { CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), + compoundProcessor.getDescription(), ingestDocument); //the step for key 2 is never executed due to conditional and thus not part of the result set assertThat(resultList.size(), equalTo(2)); @@ -262,7 +270,8 @@ pipelineId, null, null, new CompoundProcessor( trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId); @@ -331,7 +340,8 @@ pipelineId2, null, null, new CompoundProcessor( trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); @@ -401,7 +411,8 @@ pipelineId2, null, null, new CompoundProcessor( trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); @@ -453,7 +464,8 @@ pipelineId, null, null, new CompoundProcessor( trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); @@ -530,7 +542,8 @@ pipelineId, null, null, new CompoundProcessor( trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);