Skip to content

Commit

Permalink
Allow_duplicates option for append processor (#61916)
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann authored Sep 11, 2020
1 parent 7b1ab6f commit 2d8b7a9
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public final class AppendProcessor extends AbstractProcessor {

private final TemplateScript.Factory field;
private final ValueSource value;
private final boolean allowDuplicates;

AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) {
AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) {
super(tag, description);
this.field = field;
this.value = value;
this.allowDuplicates = allowDuplicates;
}

public TemplateScript.Factory getField() {
Expand All @@ -57,7 +59,7 @@ public ValueSource getValue() {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.appendFieldValue(field, value);
ingestDocument.appendFieldValue(field, value, allowDuplicates);
return ingestDocument;
}

Expand All @@ -79,9 +81,11 @@ public AppendProcessor create(Map<String, Processor.Factory> registry, String pr
String description, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true);
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
"field", field, scriptService);
return new AppendProcessor(processorTag, description, compiledTemplate, ValueSource.wrap(value, scriptService));
return new AppendProcessor(processorTag, description, compiledTemplate, ValueSource.wrap(value, scriptService),
allowDuplicates);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.containsInAnyOrder;

public class AppendProcessorTests extends ESTestCase {

Expand All @@ -53,13 +56,13 @@ public void testAppendValuesToExistingList() throws Exception {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
appendProcessor = createAppendProcessor(field, value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
appendProcessor = createAppendProcessor(field, values, true);
}
appendProcessor.execute(ingestDocument);
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
Expand All @@ -82,13 +85,13 @@ public void testAppendValuesToNonExistingList() throws Exception {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
appendProcessor = createAppendProcessor(field, value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
appendProcessor = createAppendProcessor(field, values, true);
}
appendProcessor.execute(ingestDocument);
List<?> list = ingestDocument.getFieldValue(field, List.class);
Expand All @@ -106,13 +109,13 @@ public void testConvertScalarToList() throws Exception {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
appendProcessor = createAppendProcessor(field, value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
appendProcessor = createAppendProcessor(field, values, true);
}
appendProcessor.execute(ingestDocument);
List<?> fieldValue = ingestDocument.getFieldValue(field, List.class);
Expand All @@ -132,13 +135,13 @@ public void testAppendMetadataExceptVersion() throws Exception {
if (randomBoolean()) {
String value = randomAlphaOfLengthBetween(1, 10);
values.add(value);
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), value);
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(randomAlphaOfLengthBetween(1, 10));
}
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), values);
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), values, true);
}

IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Expand All @@ -156,10 +159,65 @@ public void testAppendMetadataExceptVersion() throws Exception {
}
}

private static Processor createAppendProcessor(String fieldName, Object fieldValue) {
public void testAppendingDuplicateValueToScalarDoesNotModifyDocument() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String originalValue = randomAlphaOfLengthBetween(1, 10);
String field = RandomDocumentPicks.addRandomField(random(), ingestDocument, originalValue);

List<Object> valuesToAppend = new ArrayList<>();
valuesToAppend.add(originalValue);
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
appendProcessor.execute(ingestDocument);
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
assertThat(fieldValue, not(instanceOf(List.class)));
assertThat(fieldValue, equalTo(originalValue));
}

public void testAppendingUniqueValueToScalar() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String originalValue = randomAlphaOfLengthBetween(1, 10);
String field = RandomDocumentPicks.addRandomField(random(), ingestDocument, originalValue);

List<Object> valuesToAppend = new ArrayList<>();
String newValue = randomAlphaOfLengthBetween(1, 10);
valuesToAppend.add(newValue);
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
appendProcessor.execute(ingestDocument);
List<?> list = ingestDocument.getFieldValue(field, List.class);
assertThat(list.size(), equalTo(2));
assertThat(list, equalTo(List.of(originalValue, newValue)));
}

public void testAppendingToListWithDuplicatesDisallowed() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
List<String> list = new ArrayList<>();
int size = randomIntBetween(0, 10);
for (int i = 0; i < size; i++) {
list.add(randomAlphaOfLengthBetween(1, 10));
}
String originalField = RandomDocumentPicks.addRandomField(random(), ingestDocument, list);
List<String> expectedValues = new ArrayList<>(list);
List<String> existingValues = randomSubsetOf(list);
int uniqueValuesSize = randomIntBetween(0, 10);
List<String> uniqueValues = new ArrayList<>();
for (int i = 0; i < uniqueValuesSize; i++) {
uniqueValues.add(randomAlphaOfLengthBetween(1, 10));
}
List<String> valuesToAppend = new ArrayList<>(existingValues);
valuesToAppend.addAll(uniqueValues);
expectedValues.addAll(uniqueValues);
Collections.sort(valuesToAppend);
Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, false);
appendProcessor.execute(ingestDocument);
List<?> fieldValue = ingestDocument.getFieldValue(originalField, List.class);
assertThat(fieldValue, sameInstance(list));
assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray()));
}

private static Processor createAppendProcessor(String fieldName, Object fieldValue, boolean allowDuplicates) {
return new AppendProcessor(randomAlphaOfLength(10),
null, new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()));
ValueSource.wrap(fieldValue, TestTemplateService.instance()), allowDuplicates);
}

private enum Scalar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ public void testModifyFieldsOutsideArray() throws Exception {

ForEachProcessor processor = new ForEachProcessor(
"_tag", null, "values", new CompoundProcessor(false,
Collections.singletonList(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
Collections.singletonList(new AppendProcessor("_tag", null, template, (model) -> (Collections.singletonList("added"))))
List.of(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
List.of(new AppendProcessor("_tag", null, template, (model) -> (Collections.singletonList("added")), true))
), false);
processor.execute(ingestDocument, (result, e) -> {});

Expand Down
75 changes: 69 additions & 6 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,24 @@ private static Object resolve(String pathElement, String fullPath, Object contex
* @throws IllegalArgumentException if the path is null, empty or invalid.
*/
public void appendFieldValue(String path, Object value) {
setFieldValue(path, value, true);
appendFieldValue(path, value, true);
}

/**
* Appends the provided value to the provided path in the document.
* Any non existing path element will be created.
* If the path identifies a list, the value will be appended to the existing list.
* If the path identifies a scalar, the scalar will be converted to a list and
* the provided value will be added to the newly created list.
* Supports multiple values too provided in forms of list, in that case all the values will be appended to the
* existing (or newly created) list.
* @param path The path within the document in dot-notation
* @param value The value or values to append to the existing ones
* @param allowDuplicates When false, any values that already exist in the field will not be added
* @throws IllegalArgumentException if the path is null, empty or invalid.
*/
public void appendFieldValue(String path, Object value, boolean allowDuplicates) {
setFieldValue(path, value, true, allowDuplicates);
}

/**
Expand All @@ -397,6 +414,24 @@ public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSour
appendFieldValue(fieldPathTemplate.newInstance(model).execute(), valueSource.copyAndResolve(model));
}

/**
* Appends the provided value to the provided path in the document.
* Any non existing path element will be created.
* If the path identifies a list, the value will be appended to the existing list.
* If the path identifies a scalar, the scalar will be converted to a list and
* the provided value will be added to the newly created list.
* Supports multiple values too provided in forms of list, in that case all the values will be appended to the
* existing (or newly created) list.
* @param fieldPathTemplate Resolves to the path with dot-notation within the document
* @param valueSource The value source that will produce the value or values to append to the existing ones
* @param allowDuplicates When false, any values that already exist in the field will not be added
* @throws IllegalArgumentException if the path is null, empty or invalid.
*/
public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource valueSource, boolean allowDuplicates) {
Map<String, Object> model = createTemplateModel();
appendFieldValue(fieldPathTemplate.newInstance(model).execute(), valueSource.copyAndResolve(model), allowDuplicates);
}

/**
* Sets the provided value to the provided path in the document.
* Any non existing path element will be created.
Expand Down Expand Up @@ -452,6 +487,10 @@ public void setFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource
}

private void setFieldValue(String path, Object value, boolean append) {
setFieldValue(path, value, append, true);
}

private void setFieldValue(String path, Object value, boolean append, boolean allowDuplicates) {
FieldPath fieldPath = new FieldPath(path);
Object context = fieldPath.initialContext;
for (int i = 0; i < fieldPath.pathElements.length - 1; i++) {
Expand Down Expand Up @@ -500,7 +539,7 @@ private void setFieldValue(String path, Object value, boolean append) {
if (append) {
if (map.containsKey(leafKey)) {
Object object = map.get(leafKey);
List<Object> list = appendValues(object, value);
Object list = appendValues(object, value, allowDuplicates);
if (list != object) {
map.put(leafKey, list);
}
Expand Down Expand Up @@ -528,7 +567,7 @@ private void setFieldValue(String path, Object value, boolean append) {
}
if (append) {
Object object = list.get(index);
List<Object> newList = appendValues(object, value);
Object newList = appendValues(object, value, allowDuplicates);
if (newList != object) {
list.set(index, newList);
}
Expand All @@ -542,7 +581,7 @@ private void setFieldValue(String path, Object value, boolean append) {
}

@SuppressWarnings("unchecked")
private static List<Object> appendValues(Object maybeList, Object value) {
private static Object appendValues(Object maybeList, Object value, boolean allowDuplicates) {
List<Object> list;
if (maybeList instanceof List) {
//maybeList is already a list, we append the provided values to it
Expand All @@ -552,8 +591,13 @@ private static List<Object> appendValues(Object maybeList, Object value) {
list = new ArrayList<>();
list.add(maybeList);
}
appendValues(list, value);
return list;
if (allowDuplicates) {
appendValues(list, value);
return list;
} else {
// if no values were appended due to duplication, return the original object so the ingest document remains unmodified
return appendValuesWithoutDuplicates(list, value) ? list : maybeList;
}
}

private static void appendValues(List<Object> list, Object value) {
Expand All @@ -564,6 +608,25 @@ private static void appendValues(List<Object> list, Object value) {
}
}

private static boolean appendValuesWithoutDuplicates(List<Object> list, Object value) {
boolean valuesWereAppended = false;
if (value instanceof List) {
List<?> valueList = (List<?>) value;
for (Object val : valueList) {
if (list.contains(val) == false) {
list.add(val);
valuesWereAppended = true;
}
}
} else {
if (list.contains(value) == false) {
list.add(value);
valuesWereAppended = true;
}
}
return valuesWereAppended;
}

private static <T> T cast(String path, Object object, Class<T> clazz) {
if (object == null) {
return null;
Expand Down
Loading

0 comments on commit 2d8b7a9

Please sign in to comment.