Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Allow_duplicates option for append processor (#61916) #63257

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public final class AppendProcessor extends AbstractProcessor {

private final TemplateScript.Factory field;
private final ValueSource value;
private final boolean allowDuplicates;

AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) {
AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) {
super(tag, description);
this.field = field;
this.value = value;
this.allowDuplicates = allowDuplicates;
}

public TemplateScript.Factory getField() {
Expand All @@ -57,7 +59,7 @@ public ValueSource getValue() {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.appendFieldValue(field, value);
ingestDocument.appendFieldValue(field, value, allowDuplicates);
return ingestDocument;
}

Expand All @@ -79,9 +81,11 @@ public AppendProcessor create(Map<String, Processor.Factory> registry, String pr
String description, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true);
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
"field", field, scriptService);
return new AppendProcessor(processorTag, description, compiledTemplate, ValueSource.wrap(value, scriptService));
return new AppendProcessor(processorTag, description, compiledTemplate, ValueSource.wrap(value, scriptService),
allowDuplicates);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.containsInAnyOrder;

public class AppendProcessorTests extends ESTestCase {

Expand All @@ -53,13 +56,13 @@ public void testAppendValuesToExistingList() throws Exception {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
appendProcessor = createAppendProcessor(field, value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
appendProcessor = createAppendProcessor(field, values, true);
}
appendProcessor.execute(ingestDocument);
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
Expand All @@ -82,13 +85,13 @@ public void testAppendValuesToNonExistingList() throws Exception {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
appendProcessor = createAppendProcessor(field, value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
appendProcessor = createAppendProcessor(field, values, true);
}
appendProcessor.execute(ingestDocument);
List<?> list = ingestDocument.getFieldValue(field, List.class);
Expand All @@ -106,13 +109,13 @@ public void testConvertScalarToList() throws Exception {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
appendProcessor = createAppendProcessor(field, value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
appendProcessor = createAppendProcessor(field, values, true);
}
appendProcessor.execute(ingestDocument);
List<?> fieldValue = ingestDocument.getFieldValue(field, List.class);
Expand All @@ -132,13 +135,13 @@ public void testAppendMetadataExceptVersion() throws Exception {
if (randomBoolean()) {
String value = randomAlphaOfLengthBetween(1, 10);
values.add(value);
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), value);
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(randomAlphaOfLengthBetween(1, 10));
}
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), values);
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), values, true);
}

IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Expand All @@ -156,10 +159,65 @@ public void testAppendMetadataExceptVersion() throws Exception {
}
}

private static Processor createAppendProcessor(String fieldName, Object fieldValue) {
/**
 * With duplicates disallowed, appending a value equal to the existing scalar must leave the field
 * as that scalar instead of converting it into a list.
 */
public void testAppendingDuplicateValueToScalarDoesNotModifyDocument() throws Exception {
    final IngestDocument document = RandomDocumentPicks.randomIngestDocument(random());
    final String scalar = randomAlphaOfLengthBetween(1, 10);
    final String fieldName = RandomDocumentPicks.addRandomField(random(), document, scalar);

    // hand the already-present value to the processor as the (only) value to append
    final List<Object> duplicates = new ArrayList<>();
    duplicates.add(scalar);
    final Processor processor = createAppendProcessor(fieldName, duplicates, false);
    processor.execute(document);

    final Object afterAppend = document.getFieldValue(fieldName, Object.class);
    assertThat(afterAppend, not(instanceOf(List.class)));
    assertThat(afterAppend, equalTo(scalar));
}

/**
 * With duplicates disallowed, appending a value that differs from the existing scalar must convert
 * the field into a two-element list holding the original value followed by the new one.
 */
public void testAppendingUniqueValueToScalar() throws Exception {
    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
    String originalValue = randomAlphaOfLengthBetween(1, 10);
    String field = RandomDocumentPicks.addRandomField(random(), ingestDocument, originalValue);

    // The value must really be unique: an accidental collision with originalValue would be
    // de-duplicated by the processor, leaving a scalar and failing the list assertions below.
    String newValue;
    do {
        newValue = randomAlphaOfLengthBetween(1, 10);
    } while (newValue.equals(originalValue));

    List<Object> valuesToAppend = new ArrayList<>();
    valuesToAppend.add(newValue);
    Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
    appendProcessor.execute(ingestDocument);
    List<?> list = ingestDocument.getFieldValue(field, List.class);
    assertThat(list.size(), equalTo(2));
    assertThat(list, equalTo(org.elasticsearch.common.collect.List.of(originalValue, newValue)));
}

/**
 * With duplicates disallowed, appending a mix of already-present and new values to an existing list
 * must add only the new values, and must do so in place on the original list instance.
 */
public void testAppendingToListWithDuplicatesDisallowed() throws Exception {
    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
    List<String> list = new ArrayList<>();
    int size = randomIntBetween(0, 10);
    for (int i = 0; i < size; i++) {
        list.add(randomAlphaOfLengthBetween(1, 10));
    }
    String originalField = RandomDocumentPicks.addRandomField(random(), ingestDocument, list);
    List<String> expectedValues = new ArrayList<>(list);
    List<String> existingValues = randomSubsetOf(list);
    int uniqueValuesSize = randomIntBetween(0, 10);
    List<String> uniqueValues = new ArrayList<>();
    // Only accept candidates that collide neither with the existing list nor with each other;
    // otherwise expectedValues would contain entries the processor (correctly) refuses to add.
    while (uniqueValues.size() < uniqueValuesSize) {
        String candidate = randomAlphaOfLengthBetween(1, 10);
        if (list.contains(candidate) == false && uniqueValues.contains(candidate) == false) {
            uniqueValues.add(candidate);
        }
    }
    List<String> valuesToAppend = new ArrayList<>(existingValues);
    valuesToAppend.addAll(uniqueValues);
    expectedValues.addAll(uniqueValues);
    // sorting interleaves existing and new values so the outcome does not depend on input order
    Collections.sort(valuesToAppend);
    Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, false);
    appendProcessor.execute(ingestDocument);
    List<?> fieldValue = ingestDocument.getFieldValue(originalField, List.class);
    assertThat(fieldValue, sameInstance(list));
    assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray()));
}

private static Processor createAppendProcessor(String fieldName, Object fieldValue, boolean allowDuplicates) {
return new AppendProcessor(randomAlphaOfLength(10),
null, new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()));
ValueSource.wrap(fieldValue, TestTemplateService.instance()), allowDuplicates);
}

private enum Scalar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,10 @@ public void testModifyFieldsOutsideArray() throws Exception {

ForEachProcessor processor = new ForEachProcessor(
"_tag", null, "values", new CompoundProcessor(false,
Collections.singletonList(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
Collections.singletonList(new AppendProcessor("_tag", null, template, (model) -> (Collections.singletonList("added"))))
org.elasticsearch.common.collect.List.of(
new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
org.elasticsearch.common.collect.List.of(
new AppendProcessor("_tag", null, template, (model) -> (Collections.singletonList("added")), true))
), false);
processor.execute(ingestDocument, (result, e) -> {});

Expand Down
75 changes: 69 additions & 6 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,24 @@ private static Object resolve(String pathElement, String fullPath, Object contex
* @throws IllegalArgumentException if the path is null, empty or invalid.
*/
public void appendFieldValue(String path, Object value) {
setFieldValue(path, value, true);
appendFieldValue(path, value, true);
}

/**
* Appends the provided value to the provided path in the document, optionally skipping values
* that are already present in the field.
* Any non-existing path element will be created.
* If the path identifies a list, the value will be appended to the existing list.
* If the path identifies a scalar, the scalar will be converted to a list and
* the provided value will be added to the newly created list.
* Multiple values are also supported when provided as a list; in that case all the values will be
* appended to the existing (or newly created) list.
* When {@code allowDuplicates} is false and nothing new is appended, the existing field value is
* left as it was (a scalar is not converted to a list).
* @param path The path within the document in dot-notation
* @param value The value or values to append to the existing ones
* @param allowDuplicates When false, any values that already exist in the field will not be added
* @throws IllegalArgumentException if the path is null, empty or invalid.
*/
public void appendFieldValue(String path, Object value, boolean allowDuplicates) {
setFieldValue(path, value, true, allowDuplicates);
}

/**
Expand All @@ -399,6 +416,24 @@ public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSour
appendFieldValue(fieldPathTemplate.newInstance(model).execute(), valueSource.copyAndResolve(model));
}

/**
 * Resolves the field path and the value(s) against this document's template model and appends the
 * resolved value(s) at the resolved path.
 * Path elements that do not exist yet are created.
 * A list at the target path is extended with the new value(s); a scalar at the target path is first
 * converted to a list and the new value(s) are added to that list.
 * A list of values is appended element by element to the existing (or newly created) list.
 * @param fieldPathTemplate Resolves to the path with dot-notation within the document
 * @param valueSource The value source that will produce the value or values to append to the existing ones
 * @param allowDuplicates When false, any values that already exist in the field will not be added
 * @throws IllegalArgumentException if the path is null, empty or invalid.
 */
public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource valueSource, boolean allowDuplicates) {
    final Map<String, Object> templateModel = createTemplateModel();
    // path first, then value: same evaluation order as passing both expressions as call arguments
    final String resolvedPath = fieldPathTemplate.newInstance(templateModel).execute();
    final Object resolvedValue = valueSource.copyAndResolve(templateModel);
    appendFieldValue(resolvedPath, resolvedValue, allowDuplicates);
}

/**
* Sets the provided value to the provided path in the document.
* Any non existing path element will be created.
Expand Down Expand Up @@ -454,6 +489,10 @@ public void setFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource
}

// Convenience overload that keeps the duplicate-permitting behaviour: forwards to the
// four-argument variant with allowDuplicates = true.
private void setFieldValue(String path, Object value, boolean append) {
setFieldValue(path, value, append, true);
}

private void setFieldValue(String path, Object value, boolean append, boolean allowDuplicates) {
FieldPath fieldPath = new FieldPath(path);
Object context = fieldPath.initialContext;
for (int i = 0; i < fieldPath.pathElements.length - 1; i++) {
Expand Down Expand Up @@ -502,7 +541,7 @@ private void setFieldValue(String path, Object value, boolean append) {
if (append) {
if (map.containsKey(leafKey)) {
Object object = map.get(leafKey);
List<Object> list = appendValues(object, value);
Object list = appendValues(object, value, allowDuplicates);
if (list != object) {
map.put(leafKey, list);
}
Expand Down Expand Up @@ -530,7 +569,7 @@ private void setFieldValue(String path, Object value, boolean append) {
}
if (append) {
Object object = list.get(index);
List<Object> newList = appendValues(object, value);
Object newList = appendValues(object, value, allowDuplicates);
if (newList != object) {
list.set(index, newList);
}
Expand All @@ -544,7 +583,7 @@ private void setFieldValue(String path, Object value, boolean append) {
}

@SuppressWarnings("unchecked")
private static List<Object> appendValues(Object maybeList, Object value) {
private static Object appendValues(Object maybeList, Object value, boolean allowDuplicates) {
List<Object> list;
if (maybeList instanceof List) {
//maybeList is already a list, we append the provided values to it
Expand All @@ -554,8 +593,13 @@ private static List<Object> appendValues(Object maybeList, Object value) {
list = new ArrayList<>();
list.add(maybeList);
}
appendValues(list, value);
return list;
if (allowDuplicates) {
appendValues(list, value);
return list;
} else {
// if no values were appended due to duplication, return the original object so the ingest document remains unmodified
return appendValuesWithoutDuplicates(list, value) ? list : maybeList;
}
}

private static void appendValues(List<Object> list, Object value) {
Expand All @@ -566,6 +610,25 @@ private static void appendValues(List<Object> list, Object value) {
}
}

/**
 * Adds {@code value} to {@code list}, skipping anything the list already contains.
 * If {@code value} is itself a list, each of its elements is considered individually, in order;
 * otherwise {@code value} is treated as a single candidate.
 *
 * @param list  the list to append to; modified in place
 * @param value a single value or a list of values to append
 * @return true if at least one element was added to {@code list}, false if it was left unchanged
 */
private static boolean appendValuesWithoutDuplicates(List<Object> list, Object value) {
    // single (non-list) value: one membership check decides the outcome
    if (value instanceof List == false) {
        if (list.contains(value)) {
            return false;
        }
        list.add(value);
        return true;
    }

    boolean modified = false;
    for (Object candidate : (List<?>) value) {
        if (list.contains(candidate)) {
            continue;
        }
        list.add(candidate);
        modified = true;
    }
    return modified;
}

private static <T> T cast(String path, Object object, Class<T> clazz) {
if (object == null) {
return null;
Expand Down
Loading