Skip to content

Commit

Permalink
[Search Pipelines] Split search pipeline processor factories by type (o…
Browse files Browse the repository at this point in the history
…pensearch-project#7597)

In the initial search pipelines commit, I threw request and response
processor factories into one combined map. I think that was a mistake.

We should embrace type-safety by making sure that the kind of processor
is clear from end to end. As we add more processor types (e.g. search
phase processor), throwing them all in one big map would get messier.

As a bonus, we'll be able to reuse processor names across different
types of processor.

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh authored and stephen-crawford committed May 31, 2023
1 parent 8474a3f commit 98f6039
Show file tree
Hide file tree
Showing 13 changed files with 276 additions and 132 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253))
- [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470))
- [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377))
- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public SearchRequest processRequest(SearchRequest request) throws Exception {
return request;
}

static class Factory implements Processor.Factory {
static class Factory implements Processor.Factory<SearchRequestProcessor> {
private final NamedXContentRegistry namedXContentRegistry;
public static final ParseField QUERY_FIELD = new ParseField("query");

Expand All @@ -85,7 +85,7 @@ static class Factory implements Processor.Factory {

@Override
public FilterQueryRequestProcessor create(
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
/**
* This is a factor that creates the RenameResponseProcessor
*/
public static final class Factory implements Processor.Factory {
public static final class Factory implements Processor.Factory<SearchResponseProcessor> {

/**
* Constructor for factory
Expand All @@ -137,7 +137,7 @@ public static final class Factory implements Processor.Factory {

@Override
public RenameFieldResponseProcessor create(
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Map;

Expand All @@ -25,12 +27,12 @@ public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPi
public SearchPipelineCommonModulePlugin() {}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
RenameFieldResponseProcessor.TYPE,
new RenameFieldResponseProcessor.Factory()
);
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry));
}

@Override
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
nodes.info: {}

- contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } }
- contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } }
- contains: { nodes.$cluster_manager.search_pipelines.processors: { type: rename_field } }
- contains: { nodes.$cluster_manager.search_pipelines.request_processors: { type: filter_query } }
- contains: { nodes.$cluster_manager.search_pipelines.response_processors: { type: rename_field } }
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.plugins;

import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Collections;
import java.util.Map;
Expand All @@ -20,13 +22,24 @@
*/
public interface SearchPipelinePlugin {
/**
* Returns additional search pipeline processor types added by this plugin.
* Returns additional search pipeline request processor types added by this plugin.
*
* The key of the returned {@link Map} is the unique name for the processor which is specified
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}

/**
* Returns additional search pipeline response processor types added by this plugin.
*
* The key of the returned {@link Map} is the unique name for the processor which is specified
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}
}
39 changes: 9 additions & 30 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Pipeline {

private final NamedWriteableRegistry namedWriteableRegistry;

Pipeline(
private Pipeline(
String id,
@Nullable String description,
@Nullable Integer version,
Expand All @@ -62,31 +62,24 @@ class Pipeline {
this.namedWriteableRegistry = namedWriteableRegistry;
}

public static Pipeline create(
static Pipeline create(
String id,
Map<String, Object> config,
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories,
Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories,
NamedWriteableRegistry namedWriteableRegistry
) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Object>> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY);
List<SearchRequestProcessor> requestProcessors = readProcessors(
SearchRequestProcessor.class,
processorFactories,
requestProcessorConfigs
);
List<SearchRequestProcessor> requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs);
List<Map<String, Object>> responseProcessorConfigs = ConfigurationUtils.readOptionalList(
null,
null,
config,
RESPONSE_PROCESSORS_KEY
);
List<SearchResponseProcessor> responseProcessors = readProcessors(
SearchResponseProcessor.class,
processorFactories,
responseProcessorConfigs
);
List<SearchResponseProcessor> responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs);
if (config.isEmpty() == false) {
throw new OpenSearchParseException(
"pipeline ["
Expand All @@ -98,10 +91,8 @@ public static Pipeline create(
return new Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry);
}

@SuppressWarnings("unchecked") // Cast is checked using isInstance
private static <T extends Processor> List<T> readProcessors(
Class<T> processorType,
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<T>> processorFactories,
List<Map<String, Object>> requestProcessorConfigs
) throws Exception {
List<T> processors = new ArrayList<>();
Expand All @@ -117,22 +108,10 @@ private static <T extends Processor> List<T> readProcessors(
Map<String, Object> config = (Map<String, Object>) entry.getValue();
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY);
Processor processor = processorFactories.get(type).create(processorFactories, tag, description, config);
if (processorType.isInstance(processor)) {
processors.add((T) processor);
} else {
throw new IllegalArgumentException("Processor type " + type + " is not a " + processorType.getSimpleName());
}
processors.add(processorFactories.get(type).create(processorFactories, tag, description, config));
}
}
return processors;
}

List<Processor> flattenAllProcessors() {
List<Processor> allProcessors = new ArrayList<>(searchRequestProcessors.size() + searchResponseProcessors.size());
allProcessors.addAll(searchRequestProcessors);
allProcessors.addAll(searchResponseProcessors);
return allProcessors;
return Collections.unmodifiableList(processors);
}

String getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public interface Processor {
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
interface Factory {
interface Factory<T extends Processor> {

/**
* Creates a processor based on the specified map of maps config.
Expand All @@ -65,8 +65,7 @@ interface Factory {
* <b>Note:</b> Implementations are responsible for removing the used configuration
* keys, so that after creation the config map should be empty.
*/
Processor create(Map<String, Factory> processorFactories, String tag, String description, Map<String, Object> config)
throws Exception;
T create(Map<String, Factory<T>> processorFactories, String tag, String description, Map<String, Object> config) throws Exception;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@

package org.opensearch.search.pipeline;

import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.node.ReportingService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
Expand All @@ -26,45 +30,84 @@
*/
public class SearchPipelineInfo implements ReportingService.Info {

private final Set<ProcessorInfo> processors;
private final Map<String, Set<ProcessorInfo>> processors = new TreeMap<>();

public SearchPipelineInfo(List<ProcessorInfo> processors) {
this.processors = new TreeSet<>(processors); // we use a treeset here to have a test-able / predictable order
public SearchPipelineInfo(Map<String, List<ProcessorInfo>> processors) {
for (Map.Entry<String, List<ProcessorInfo>> processorsEntry : processors.entrySet()) {
// we use a treeset here to have a test-able / predictable order
this.processors.put(processorsEntry.getKey(), new TreeSet<>(processorsEntry.getValue()));
}
}

/**
* Read from a stream.
*/
public SearchPipelineInfo(StreamInput in) throws IOException {
processors = new TreeSet<>();
final int size = in.readVInt();
for (int i = 0; i < size; i++) {
processors.add(new ProcessorInfo(in));
// TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0)
if (in.getVersion().onOrBefore(Version.V_2_8_0)) {
// Prior to version 2.8, we had a flat list of processors. For best compatibility, assume they're valid
// request and response processor, since we couldn't tell the difference back then.
final int size = in.readVInt();
Set<ProcessorInfo> processorInfos = new TreeSet<>();
for (int i = 0; i < size; i++) {
processorInfos.add(new ProcessorInfo(in));
}
processors.put(Pipeline.REQUEST_PROCESSORS_KEY, processorInfos);
processors.put(Pipeline.RESPONSE_PROCESSORS_KEY, processorInfos);
} else {
final int numTypes = in.readVInt();
for (int i = 0; i < numTypes; i++) {
String type = in.readString();
int numProcessors = in.readVInt();
Set<ProcessorInfo> processorInfos = new TreeSet<>();
for (int j = 0; j < numProcessors; j++) {
processorInfos.add(new ProcessorInfo(in));
}
processors.put(type, processorInfos);
}
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("search_pipelines");
builder.startArray("processors");
for (ProcessorInfo info : processors) {
info.toXContent(builder, params);
for (Map.Entry<String, Set<ProcessorInfo>> processorEntry : processors.entrySet()) {
builder.startArray(processorEntry.getKey());
for (ProcessorInfo info : processorEntry.getValue()) {
info.toXContent(builder, params);
}
builder.endArray();
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.write(processors.size());
for (ProcessorInfo info : processors) {
info.writeTo(out);
// TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0)
if (out.getVersion().onOrBefore(Version.V_2_8_0)) {
// Prior to version 2.8, we grouped all processors into a single list.
Set<ProcessorInfo> processorInfos = new TreeSet<>();
processorInfos.addAll(processors.getOrDefault(Pipeline.REQUEST_PROCESSORS_KEY, Collections.emptySet()));
processorInfos.addAll(processors.getOrDefault(Pipeline.RESPONSE_PROCESSORS_KEY, Collections.emptySet()));
out.writeVInt(processorInfos.size());
for (ProcessorInfo processorInfo : processorInfos) {
processorInfo.writeTo(out);
}
} else {
out.write(processors.size());
for (Map.Entry<String, Set<ProcessorInfo>> processorsEntry : processors.entrySet()) {
out.writeString(processorsEntry.getKey());
out.writeVInt(processorsEntry.getValue().size());
for (ProcessorInfo processorInfo : processorsEntry.getValue()) {
processorInfo.writeTo(out);
}
}
}
}

public boolean containsProcessor(String type) {
return processors.contains(new ProcessorInfo(type));
public boolean containsProcessor(String processorType, String type) {
return processors.containsKey(processorType) && processors.get(processorType).contains(new ProcessorInfo(type));
}

@Override
Expand Down
Loading

0 comments on commit 98f6039

Please sign in to comment.