Skip to content

Commit

Permalink
Add capability to disable source recovery_source for an index
Browse files Browse the repository at this point in the history
Signed-off-by: Navneet Verma <navneev@amazon.com>
  • Loading branch information
navneet1v committed May 8, 2024
1 parent 6b6a0a2 commit fe19810
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276))
- Add support for deep copying SearchRequest ([#12295](https://github.com/opensearch-project/OpenSearch/pull/12295))
- Support multi ranges traversal when doing date histogram rewrite optimization. ([#13317](https://github.com/opensearch-project/OpenSearch/pull/13317))
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class SourceFieldMapper extends MetadataFieldMapper {

public static final String CONTENT_TYPE = "_source";
private final Function<Map<String, ?>, Map<String, Object>> filter;
private final Function<Map<String, ?>, Map<String, Object>> recoverySourceFilter;

/**
* Default parameters for source fields
Expand Down Expand Up @@ -119,21 +120,45 @@ public static class Builder extends MetadataFieldMapper.Builder {
Collections.emptyList()
);

private final Parameter<Boolean> recoverySourceEnabled = Parameter.boolParam(
"recovery_source_enabled",
false,
m -> toType(m).recoverySourceEnabled,
Defaults.ENABLED
);

private final Parameter<List<String>> recoverySourceIncludes = Parameter.stringArrayParam(
"recovery_source_includes",
false,
m -> Arrays.asList(toType(m).recoverySourceIncludes),
Collections.emptyList()
);

private final Parameter<List<String>> recoverySourceExcludes = Parameter.stringArrayParam(
"recovery_source_excludes",
false,
m -> Arrays.asList(toType(m).recoverySourceExcludes),
Collections.emptyList()
);

public Builder() {
super(Defaults.NAME);
}

@Override
protected List<Parameter<?>> getParameters() {
return Arrays.asList(enabled, includes, excludes);
return Arrays.asList(enabled, includes, excludes, recoverySourceEnabled, recoverySourceIncludes, recoverySourceExcludes);
}

@Override
public SourceFieldMapper build(BuilderContext context) {
return new SourceFieldMapper(
enabled.getValue(),
includes.getValue().toArray(new String[0]),
excludes.getValue().toArray(new String[0])
excludes.getValue().toArray(new String[0]),
recoverySourceEnabled.getValue(),
recoverySourceIncludes.getValue().toArray(new String[0]),
recoverySourceExcludes.getValue().toArray(new String[0])
);
}
}
Expand Down Expand Up @@ -173,24 +198,44 @@ public Query termQuery(Object value, QueryShardContext context) {
}

private final boolean enabled;
private final boolean recoverySourceEnabled;
/** indicates whether the source will always exist and be complete, for use by features like the update API */
private final boolean complete;

private final String[] includes;
private final String[] excludes;
private final String[] recoverySourceIncludes;
private final String[] recoverySourceExcludes;

private SourceFieldMapper() {
this(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY);
this(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY, Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY);
}

private SourceFieldMapper(boolean enabled, String[] includes, String[] excludes) {
private SourceFieldMapper(
boolean enabled,
String[] includes,
String[] excludes,
boolean recoverySourceEnabled,
String[] recoverySourceIncludes,
String[] recoverySourceExcludes
) {
super(new SourceFieldType(enabled));
this.enabled = enabled;
this.includes = includes;
this.excludes = excludes;
final boolean filtered = CollectionUtils.isEmpty(includes) == false || CollectionUtils.isEmpty(excludes) == false;
this.filter = enabled && filtered ? XContentMapValues.filter(includes, excludes) : null;
this.complete = enabled && CollectionUtils.isEmpty(includes) && CollectionUtils.isEmpty(excludes);

// Set parameters for recovery source
this.recoverySourceEnabled = recoverySourceEnabled;
this.recoverySourceIncludes = recoverySourceIncludes;
this.recoverySourceExcludes = recoverySourceExcludes;
final boolean recoverySourcefiltered = CollectionUtils.isEmpty(recoverySourceIncludes) == false
|| CollectionUtils.isEmpty(recoverySourceExcludes) == false;
this.recoverySourceFilter = this.recoverySourceEnabled && recoverySourcefiltered
? XContentMapValues.filter(recoverySourceIncludes, recoverySourceExcludes)
: null;
}

public boolean enabled() {
Expand All @@ -212,22 +257,40 @@ public void preParse(ParseContext context) throws IOException {
context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
}

if (originalSource != null && adaptedSource != originalSource) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
if (recoverySourceEnabled) {
if (originalSource != null && adaptedSource != originalSource) {
final BytesReference adaptedRecoverySource = applyFilters(
originalSource,
contentType,
recoverySourceEnabled,
recoverySourceFilter
);
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = adaptedRecoverySource.toBytesRef();
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
}
}
}

@Nullable
public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable MediaType contentType) throws IOException {
if (enabled && originalSource != null) {
return applyFilters(originalSource, contentType, enabled, filter);
}

@Nullable
private BytesReference applyFilters(
@Nullable BytesReference originalSource,
@Nullable MediaType contentType,
boolean isProvidedSourceEnabled,
@Nullable final Function<Map<String, ?>, Map<String, Object>> filters
) throws IOException {
if (isProvidedSourceEnabled && originalSource != null) {
// Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
if (filter != null) {
if (filters != null) {
// we don't update the context source if we filter, we want to keep it as is...
Tuple<? extends MediaType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(originalSource, true, contentType);
Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
Map<String, Object> filteredSource = filters.apply(mapTuple.v2());
BytesStreamOutput bStream = new BytesStreamOutput();
MediaType actualContentType = mapTuple.v1();
XContentBuilder builder = MediaTypeRegistry.contentBuilder(actualContentType, bStream).map(filteredSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public void testNoFormat() throws Exception {
XContentType.SMILE
)
);

final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source");
assertNull(recoverySourceIndexableField);
assertThat(MediaTypeRegistry.xContentType(doc.source()), equalTo(XContentType.SMILE));
}

Expand Down Expand Up @@ -128,13 +129,92 @@ public void testIncludes() throws Exception {
)
);

IndexableField sourceField = doc.rootDoc().getField("_source");
Map<String, Object> sourceAsMap;
try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(sourceField.binaryValue()))) {
sourceAsMap = parser.map();
}
final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source");
assertNotNull(recoverySourceIndexableField);
assertThat(sourceAsMap.containsKey("path1"), equalTo(true));
assertThat(sourceAsMap.containsKey("path2"), equalTo(false));
}

public void testIncludesForRecoverySource() throws Exception {
String mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject("type")
.startObject("_source")
.array("includes", new String[] { "path1*" })
.array("recovery_source_includes", new String[] { "path2*" })
.endObject()
.endObject()
.endObject()
.toString();

DocumentMapper documentMapper = createIndex("test").mapperService()
.documentMapperParser()
.parse("type", new CompressedXContent(mapping));

ParsedDocument doc = documentMapper.parse(
new SourceToParse(
"test",
"1",
BytesReference.bytes(
XContentFactory.jsonBuilder()
.startObject()
.startObject("path1")
.field("field1", "value1")
.endObject()
.startObject("path2")
.field("field2", "value2")
.endObject()
.endObject()
),
MediaTypeRegistry.JSON
)
);

IndexableField sourceField = doc.rootDoc().getField("_source");
Map<String, Object> sourceAsMap;
try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(sourceField.binaryValue()))) {
sourceAsMap = parser.map();
}
assertThat(sourceAsMap.containsKey("path1"), equalTo(true));
assertThat(sourceAsMap.containsKey("path2"), equalTo(false));

final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source");
assertNotNull(recoverySourceIndexableField);
Map<String, Object> recoverySourceAsMap;
try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(recoverySourceIndexableField.binaryValue()))) {
recoverySourceAsMap = parser.map();
}

assertThat(recoverySourceAsMap.containsKey("path1"), equalTo(false));
assertThat(recoverySourceAsMap.containsKey("path2"), equalTo(true));
}

public void testNoRecoverySourceAndNoSource_whenBothAreDisabled() throws Exception {
String mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject("type")
.startObject("_source")
.field("enabled", "false")
.field("recovery_source_enabled", "false")
.endObject()
.endObject()
.endObject()
.toString();

DocumentMapperParser parser = createIndex("test").mapperService().documentMapperParser();
DocumentMapper documentMapper = parser.parse("type", new CompressedXContent(mapping));
BytesReference source = BytesReference.bytes(XContentFactory.jsonBuilder().startObject().field("field", "value").endObject());
ParsedDocument doc = documentMapper.parse(new SourceToParse("test", "1", source, MediaTypeRegistry.JSON));

final IndexableField sourceIndexableField = doc.rootDoc().getField("_source");
final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source");
assertNull(recoverySourceIndexableField);
assertNull(sourceIndexableField);
}

public void testExcludes() throws Exception {
Expand Down Expand Up @@ -171,13 +251,68 @@ public void testExcludes() throws Exception {
)
);

IndexableField sourceField = doc.rootDoc().getField("_source");
Map<String, Object> sourceAsMap;
try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(sourceField.binaryValue()))) {
sourceAsMap = parser.map();
}
final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source");
assertNotNull(recoverySourceIndexableField);
assertThat(sourceAsMap.containsKey("path1"), equalTo(false));
assertThat(sourceAsMap.containsKey("path2"), equalTo(true));
}

public void testExcludesForRecoverySource() throws Exception {
String mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject("type")
.startObject("_source")
.array("excludes", "path1*")
.array("recovery_source_excludes", "path2*")
.endObject()
.endObject()
.endObject()
.toString();

DocumentMapper documentMapper = createIndex("test").mapperService()
.documentMapperParser()
.parse("type", new CompressedXContent(mapping));

ParsedDocument doc = documentMapper.parse(
new SourceToParse(
"test",
"1",
BytesReference.bytes(
XContentFactory.jsonBuilder()
.startObject()
.startObject("path1")
.field("field1", "value1")
.endObject()
.startObject("path2")
.field("field2", "value2")
.endObject()
.endObject()
),
MediaTypeRegistry.JSON
)
);

IndexableField sourceField = doc.rootDoc().getField("_source");
Map<String, Object> sourceAsMap;
try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(sourceField.binaryValue()))) {
sourceAsMap = parser.map();
}
assertThat(sourceAsMap.containsKey("path1"), equalTo(false));
assertThat(sourceAsMap.containsKey("path2"), equalTo(true));

final IndexableField recoverySourceIndexableField = doc.rootDoc().getField("_recovery_source");
assertNotNull(recoverySourceIndexableField);
Map<String, Object> recoverySourceAsMap;
try (XContentParser parser = createParser(JsonXContent.jsonXContent, new BytesArray(recoverySourceIndexableField.binaryValue()))) {
recoverySourceAsMap = parser.map();
}
assertThat(recoverySourceAsMap.containsKey("path1"), equalTo(true));
assertThat(recoverySourceAsMap.containsKey("path2"), equalTo(false));
}

public void testEnabledNotUpdateable() throws Exception {
Expand Down

0 comments on commit fe19810

Please sign in to comment.