Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize source filtering in SourceFieldMapper #81970

Merged
merged 13 commits into from
Jan 12, 2022
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.xcontent;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * A filter that removes fields from a serialized document source.
 */
public interface XContentFieldFilter {
    /**
     * Filters a source given in {@link BytesReference} format.
     *
     * @param sourceBytes  the serialized source to filter
     * @param xContentType the content type of {@code sourceBytes}
     * @return the filtered source, serialized again as bytes
     * @throws IOException if the source cannot be parsed or re-serialized
     */
    BytesReference apply(BytesReference sourceBytes, XContentType xContentType) throws IOException;

    /**
     * Construct {@link XContentFieldFilter} using given includes and excludes
     *
     * @param includes fields to keep, wildcard supported
     * @param excludes fields to remove, wildcard supported
     * @return filter using {@link XContentMapValues#filter(String[], String[])} if wildcard found in excludes
     *         , otherwise return filter using {@link XContentParser}
     */
    static XContentFieldFilter newFieldFilter(String[] includes, String[] excludes) {
        final boolean wildcardInExcludes = CollectionUtils.isEmpty(excludes) == false
            && Arrays.stream(excludes).anyMatch(field -> field.contains("*"));

        if (wildcardInExcludes) {
            // The map filter depends only on the patterns, never on the document, so build it once
            // here instead of on every apply() call.
            final Function<Map<String, ?>, Map<String, Object>> mapFilter = XContentMapValues.filter(includes, excludes);
            return (originalSource, contentType) -> {
                Tuple<XContentType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(originalSource, true, contentType);
                Map<String, Object> filteredSource = mapFilter.apply(mapTuple.v2());
                BytesStreamOutput bStream = new BytesStreamOutput();
                // Serialize with the content type reported by convertToMap; the builder is closed
                // (and thereby flushed) before the bytes are read, even if map() throws.
                try (XContentBuilder builder = XContentFactory.contentBuilder(mapTuple.v1(), bStream)) {
                    builder.map(filteredSource);
                }
                return bStream.bytes();
            };
        }

        final XContentParserConfiguration parserConfig = XContentParserConfiguration.EMPTY.withFiltering(
            toPatternSet(includes),
            toPatternSet(excludes)
        );
        return (originalSource, contentType) -> {
            // Callers may pass a null content type (SourceFieldMapper#applyFilters declares it @Nullable);
            // detect it from the bytes in that case instead of failing on the dereference below.
            final XContentType effectiveType = contentType != null
                ? contentType
                : XContentHelper.xContentTypeMayCompressed(originalSource);
            BytesStreamOutput streamOutput = new BytesStreamOutput(Math.min(1024, originalSource.length()));
            XContentBuilder builder = new XContentBuilder(effectiveType.xContent(), streamOutput);
            // The parser applies the include/exclude configuration while the structure is copied.
            try (XContentParser parser = effectiveType.xContent().createParser(parserConfig, originalSource.streamInput())) {
                builder.copyCurrentStructure(parser);
                return BytesReference.bytes(builder);
            }
        };
    }

    /**
     * Converts an array of field patterns to an immutable set. Unlike {@code Set.of(array)} this
     * tolerates duplicate patterns and a {@code null} array (treated as no patterns).
     */
    private static Set<String> toPatternSet(String[] patterns) {
        return patterns == null ? Set.of() : Set.copyOf(Arrays.asList(patterns));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,24 @@ public static BytesReference toXContent(ToXContent toXContent, XContentType xCon
}
}

public static XContentType xContentTypeMayCompressed(BytesReference bytes) {
mushao999 marked this conversation as resolved.
Show resolved Hide resolved
Compressor compressor = CompressorFactory.compressor(bytes);
if (compressor != null) {
try {
InputStream compressedStreamInput = compressor.threadLocalInputStream(bytes.streamInput());
if (compressedStreamInput.markSupported() == false) {
compressedStreamInput = new BufferedInputStream(compressedStreamInput);
}
return XContentFactory.xContentType(compressedStreamInput);
} catch (IOException e) {
assert false : "Should not happen, we're just reading bytes from memory";
throw new UncheckedIOException(e);
}
} else {
return XContentHelper.xContentType(bytes);
}
}

/**
* Guesses the content type based on the provided bytes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentFieldFilter;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
Expand All @@ -33,7 +32,6 @@
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
Expand Down Expand Up @@ -253,15 +251,12 @@ private GetResult innerGetLoadFromStoredFields(
if (fetchSourceContext.fetchSource() == false) {
source = null;
} else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
Map<String, Object> sourceAsMap;
// TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.
// Do we care?
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
XContentType sourceContentType = typeMapTuple.v1();
sourceAsMap = typeMapTuple.v2();
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
XContentType sourceContentType = XContentHelper.xContentTypeMayCompressed(source);
try {
source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
source = XContentFieldFilter.newFieldFilter(fetchSourceContext.includes(), fetchSourceContext.excludes())
.apply(source, sourceContentType);
} catch (IOException e) {
throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,24 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.common.xcontent.XContentFieldFilter;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SourceFieldMapper extends MetadataFieldMapper {

public static final String NAME = "_source";
public static final String RECOVERY_SOURCE_NAME = "_recovery_source";

public static final String CONTENT_TYPE = "_source";
private final Function<Map<String, ?>, Map<String, Object>> filter;
private final XContentFieldFilter filter;

private static final SourceFieldMapper DEFAULT = new SourceFieldMapper(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY);

Expand Down Expand Up @@ -145,7 +137,9 @@ private SourceFieldMapper(boolean enabled, String[] includes, String[] excludes)
this.includes = includes;
this.excludes = excludes;
final boolean filtered = CollectionUtils.isEmpty(includes) == false || CollectionUtils.isEmpty(excludes) == false;
this.filter = enabled && filtered ? XContentMapValues.filter(includes, excludes) : null;
this.filter = enabled && filtered
? XContentFieldFilter.newFieldFilter(includes, excludes)
: (sourceBytes, contentType) -> sourceBytes;
this.complete = enabled && CollectionUtils.isEmpty(includes) && CollectionUtils.isEmpty(excludes);
}

Expand Down Expand Up @@ -180,18 +174,7 @@ public void preParse(DocumentParserContext context) throws IOException {
public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable XContentType contentType) throws IOException {
if (enabled && originalSource != null) {
// Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
if (filter != null) {
// we don't update the context source if we filter, we want to keep it as is...
Tuple<XContentType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(originalSource, true, contentType);
Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
BytesStreamOutput bStream = new BytesStreamOutput();
XContentType actualContentType = mapTuple.v1();
XContentBuilder builder = XContentFactory.contentBuilder(actualContentType, bStream).map(filteredSource);
builder.close();
return bStream.bytes();
} else {
return originalSource;
}
return filter.apply(originalSource, contentType);
} else {
return null;
}
Expand Down