Split search pipeline processor factories by type
In the initial search pipelines commit, I threw request and response
processor factories into one combined map. I think that was a mistake.

We should embrace type safety by making sure that the kind of processor
is clear from end to end. As we add more processor types (e.g. a search
phase processor), throwing them all into one big map would get messier.

As a bonus, we'll be able to reuse processor names across different
types of processor.

Closes opensearch-project#7576

Signed-off-by: Michael Froh <froh@amazon.com>
msfroh committed May 17, 2023
1 parent a1e42b1 commit 76e3245
Showing 11 changed files with 188 additions and 120 deletions.
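
To make the intent concrete before the per-file diff, here is a rough, self-contained sketch (not part of this commit) of what the split looks like from a plugin author's point of view: a hypothetical no-op response processor, a factory parameterized with `SearchResponseProcessor`, and a plugin that registers it through the new `getResponseProcessors` hook. The plugin and processor names are invented; the method signatures come from the diffs below, and the `SearchResponseProcessor` contract (`getType`/`getTag`/`getDescription` plus `processResponse(SearchRequest, SearchResponse)`) is assumed from the initial search pipelines change.

```java
import java.util.Map;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

/**
 * Hypothetical plugin, for illustration only: it contributes a single response processor
 * through the typed getResponseProcessors hook introduced by this change.
 */
public class NoopResponsePlugin extends Plugin implements SearchPipelinePlugin {

    public static final String TYPE = "noop";

    @Override
    public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
        // The "noop" name only has to be unique among response processors now;
        // a request processor elsewhere could reuse the same name without clashing.
        return Map.of(TYPE, new NoopResponseProcessor.Factory());
    }

    /** A response processor that returns the response unchanged. */
    static class NoopResponseProcessor implements SearchResponseProcessor {
        private final String tag;
        private final String description;

        NoopResponseProcessor(String tag, String description) {
            this.tag = tag;
            this.description = description;
        }

        @Override
        public String getType() {
            return TYPE;
        }

        @Override
        public String getTag() {
            return tag;
        }

        @Override
        public String getDescription() {
            return description;
        }

        @Override
        public SearchResponse processResponse(SearchRequest request, SearchResponse response) {
            return response; // a real processor would transform the response here
        }

        /** Factory typed with SearchResponseProcessor, so it can only be registered as a response processor. */
        static class Factory implements Processor.Factory<SearchResponseProcessor> {
            @Override
            public NoopResponseProcessor create(
                Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
                String tag,
                String description,
                Map<String, Object> config
            ) {
                // Per the Factory contract, implementations remove the config keys they consume;
                // this processor takes no configuration, so there is nothing to remove.
                return new NoopResponseProcessor(tag, description);
            }
        }
    }
}
```

Because both hooks have default empty-map implementations, a plugin only overrides the one it needs: this sketch leaves getRequestProcessors alone, just as search-pipeline-common below only overrides getRequestProcessors.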
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))
- [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253))
- [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470))
- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
@@ -75,7 +75,7 @@ public SearchRequest processRequest(SearchRequest request) throws Exception {
return request;
}

static class Factory implements Processor.Factory {
static class Factory implements Processor.Factory<SearchRequestProcessor> {
private final NamedXContentRegistry namedXContentRegistry;
public static final ParseField QUERY_FIELD = new ParseField("query");

@@ -85,7 +85,7 @@ static class Factory implements Processor.Factory {

@Override
public FilterQueryRequestProcessor create(
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
@@ -11,6 +11,7 @@
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;

import java.util.Map;

@@ -25,7 +26,7 @@ public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPi
public SearchPipelineCommonModulePlugin() {}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry));
}
}
@@ -12,4 +12,4 @@
nodes.info: {}

- contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } }
- contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } }
- contains: { nodes.$cluster_manager.search_pipelines.request_processors: { type: filter_query } }
@@ -9,6 +9,8 @@
package org.opensearch.plugins;

import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Collections;
import java.util.Map;
@@ -20,13 +22,24 @@
*/
public interface SearchPipelinePlugin {
/**
* Returns additional search pipeline processor types added by this plugin.
* Returns additional search pipeline request processor types added by this plugin.
*
* The key of the returned {@link Map} is the unique name for the processor which is specified
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}

/**
* Returns additional search pipeline response processor types added by this plugin.
*
* The key of the returned {@link Map} is the unique name for the processor which is specified
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}
}
33 changes: 6 additions & 27 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
@@ -65,28 +65,21 @@ class Pipeline {
public static Pipeline create(
String id,
Map<String, Object> config,
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories,
Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories,
NamedWriteableRegistry namedWriteableRegistry
) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Object>> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY);
List<SearchRequestProcessor> requestProcessors = readProcessors(
SearchRequestProcessor.class,
processorFactories,
requestProcessorConfigs
);
List<SearchRequestProcessor> requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs);
List<Map<String, Object>> responseProcessorConfigs = ConfigurationUtils.readOptionalList(
null,
null,
config,
RESPONSE_PROCESSORS_KEY
);
List<SearchResponseProcessor> responseProcessors = readProcessors(
SearchResponseProcessor.class,
processorFactories,
responseProcessorConfigs
);
List<SearchResponseProcessor> responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs);
if (config.isEmpty() == false) {
throw new OpenSearchParseException(
"pipeline ["
@@ -98,10 +91,8 @@ public static Pipeline create(
return new Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry);
}

@SuppressWarnings("unchecked") // Cast is checked using isInstance
private static <T extends Processor> List<T> readProcessors(
Class<T> processorType,
Map<String, Processor.Factory> processorFactories,
Map<String, Processor.Factory<T>> processorFactories,
List<Map<String, Object>> requestProcessorConfigs
) throws Exception {
List<T> processors = new ArrayList<>();
@@ -117,24 +108,12 @@ private static <T extends Processor> List<T> readProcessors(
Map<String, Object> config = (Map<String, Object>) entry.getValue();
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY);
Processor processor = processorFactories.get(type).create(processorFactories, tag, description, config);
if (processorType.isInstance(processor)) {
processors.add((T) processor);
} else {
throw new IllegalArgumentException("Processor type " + type + " is not a " + processorType.getSimpleName());
}
processors.add(processorFactories.get(type).create(processorFactories, tag, description, config));
}
}
return processors;
}

List<Processor> flattenAllProcessors() {
List<Processor> allProcessors = new ArrayList<>(searchRequestProcessors.size() + searchResponseProcessors.size());
allProcessors.addAll(searchRequestProcessors);
allProcessors.addAll(searchResponseProcessors);
return allProcessors;
}

String getId() {
return id;
}
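
For orientation, a minimal hypothetical caller of the reshaped Pipeline.create, written as a same-package helper because Pipeline is package-private. It only illustrates the new shape: one factory map per processor kind instead of a single combined map. The NamedWriteableRegistry import path is an assumption based on the StreamInput/StreamOutput imports visible in the SearchPipelineInfo diff further down.

```java
package org.opensearch.search.pipeline;

import java.util.Map;

import org.opensearch.common.io.stream.NamedWriteableRegistry;

// Hypothetical helper, not part of this commit: it simply forwards to the new
// Pipeline.create signature, which now takes separate request- and response-processor
// factory maps.
final class PipelineParsingExample {

    static Pipeline parse(
        String id,
        Map<String, Object> config,
        Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories,
        Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories,
        NamedWriteableRegistry namedWriteableRegistry
    ) throws Exception {
        return Pipeline.create(id, config, requestProcessorFactories, responseProcessorFactories, namedWriteableRegistry);
    }
}
```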
@@ -52,7 +52,7 @@ public interface Processor {
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
interface Factory {
interface Factory<T extends Processor> {

/**
* Creates a processor based on the specified map of maps config.
@@ -65,8 +65,7 @@ interface Factory {
* <b>Note:</b> Implementations are responsible for removing the used configuration
* keys, so that after creation the config map should be empty.
*/
Processor create(Map<String, Factory> processorFactories, String tag, String description, Map<String, Object> config)
throws Exception;
T create(Map<String, Factory<T>> processorFactories, String tag, String description, Map<String, Object> config) throws Exception;
}

/**
@@ -8,15 +8,19 @@

package org.opensearch.search.pipeline;

import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.node.ReportingService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
@@ -26,45 +30,84 @@
*/
public class SearchPipelineInfo implements ReportingService.Info {

private final Set<ProcessorInfo> processors;
private final Map<String, Set<ProcessorInfo>> processors = new TreeMap<>();

public SearchPipelineInfo(List<ProcessorInfo> processors) {
this.processors = new TreeSet<>(processors); // we use a treeset here to have a test-able / predictable order
public SearchPipelineInfo(Map<String, List<ProcessorInfo>> processors) {
for (Map.Entry<String, List<ProcessorInfo>> processorsEntry : processors.entrySet()) {
// we use a treeset here to have a test-able / predictable order
this.processors.put(processorsEntry.getKey(), new TreeSet<>(processorsEntry.getValue()));
}
}

/**
* Read from a stream.
*/
public SearchPipelineInfo(StreamInput in) throws IOException {
processors = new TreeSet<>();
final int size = in.readVInt();
for (int i = 0; i < size; i++) {
processors.add(new ProcessorInfo(in));
// TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0)
if (in.getVersion().onOrBefore(Version.V_2_8_0)) {
// Prior to version 2.8, we had a flat list of processors. For best compatibility, assume they're valid
// request and response processor, since we couldn't tell the difference back then.
final int size = in.readVInt();
Set<ProcessorInfo> processorInfos = new TreeSet<>();
for (int i = 0; i < size; i++) {
processorInfos.add(new ProcessorInfo(in));
}
processors.put(Pipeline.REQUEST_PROCESSORS_KEY, processorInfos);
processors.put(Pipeline.RESPONSE_PROCESSORS_KEY, processorInfos);
} else {
final int numTypes = in.readVInt();
for (int i = 0; i < numTypes; i++) {
String type = in.readString();
int numProcessors = in.readVInt();
Set<ProcessorInfo> processorInfos = new TreeSet<>();
for (int j = 0; j < numProcessors; j++) {
processorInfos.add(new ProcessorInfo(in));
}
processors.put(type, processorInfos);
}
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("search_pipelines");
builder.startArray("processors");
for (ProcessorInfo info : processors) {
info.toXContent(builder, params);
for (Map.Entry<String, Set<ProcessorInfo>> processorEntry : processors.entrySet()) {
builder.startArray(processorEntry.getKey());
for (ProcessorInfo info : processorEntry.getValue()) {
info.toXContent(builder, params);
}
builder.endArray();
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.write(processors.size());
for (ProcessorInfo info : processors) {
info.writeTo(out);
// TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0)
if (out.getVersion().onOrBefore(Version.V_2_8_0)) {
// Prior to version 2.8, we grouped all processors into a single list.
Set<ProcessorInfo> processorInfos = new TreeSet<>();
processorInfos.addAll(processors.getOrDefault(Pipeline.REQUEST_PROCESSORS_KEY, Collections.emptySet()));
processorInfos.addAll(processors.getOrDefault(Pipeline.RESPONSE_PROCESSORS_KEY, Collections.emptySet()));
out.writeVInt(processorInfos.size());
for (ProcessorInfo processorInfo : processorInfos) {
processorInfo.writeTo(out);
}
} else {
out.write(processors.size());
for (Map.Entry<String, Set<ProcessorInfo>> processorsEntry : processors.entrySet()) {
out.writeString(processorsEntry.getKey());
out.writeVInt(processorsEntry.getValue().size());
for (ProcessorInfo processorInfo : processorsEntry.getValue()) {
processorInfo.writeTo(out);
}
}
}
}

public boolean containsProcessor(String type) {
return processors.contains(new ProcessorInfo(type));
public boolean containsProcessor(String processorType, String type) {
return processors.containsKey(processorType) && processors.get(processorType).contains(new ProcessorInfo(type));
}

@Override
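
Finally, a small hypothetical lookup against the reshaped SearchPipelineInfo, to show that callers now have to name the processor kind as well as the processor type. It is placed in the org.opensearch.search.pipeline package because Pipeline and its REQUEST_PROCESSORS_KEY constant appear to be package-private, as the read path above suggests.

```java
package org.opensearch.search.pipeline;

// Hypothetical helper, not part of this commit: processor support is now checked
// per kind ("request_processors" vs "response_processors") rather than against one flat set.
final class SearchPipelineInfoLookupExample {

    /** True if the node advertises the filter_query request processor. */
    static boolean supportsFilterQueryRequests(SearchPipelineInfo info) {
        return info.containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, "filter_query");
    }
}
```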