From deea8afa0b0e3088d2ed980d0a6d15b545ed46a8 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Wed, 17 May 2023 01:53:15 +0000 Subject: [PATCH] Split search pipeline processor factories by type In the initial search pipelines commit, I threw request and response processor factories into one combined map. I think that was a mistake. We should embrace type-safety by making sure that the kind of processor is clear from end to end. As we add more processor types (e.g. search phase processor), throwing them all in one big map would get messier. As a bonus, we'll be able to reuse processor names across different types of processor. Closes https://github.com/opensearch-project/OpenSearch/issues/7576 Signed-off-by: Michael Froh --- CHANGELOG.md | 1 + .../common/FilterQueryRequestProcessor.java | 4 +- .../SearchPipelineCommonModulePlugin.java | 16 ++-- .../test/search_pipeline/10_basic.yml | 4 +- .../plugins/SearchPipelinePlugin.java | 17 +++- .../opensearch/search/pipeline/Pipeline.java | 33 ++----- .../opensearch/search/pipeline/Processor.java | 5 +- .../search/pipeline/SearchPipelineInfo.java | 75 +++++++++++---- .../pipeline/SearchPipelineService.java | 93 ++++++++++++------- .../nodesinfo/NodeInfoStreamingTests.java | 2 +- .../pipeline/SearchPipelineServiceTests.java | 73 ++++++++------- 11 files changed, 196 insertions(+), 127 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 14a74df481894..a0d5ba3d8d4f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253)) - [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470)) - [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377)) +- 
[Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597)) - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237)) - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java index 81c00012daec6..0ca090780bb60 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java @@ -75,7 +75,7 @@ public SearchRequest processRequest(SearchRequest request) throws Exception { return request; } - static class Factory implements Processor.Factory { + static class Factory implements Processor.Factory { private final NamedXContentRegistry namedXContentRegistry; public static final ParseField QUERY_FIELD = new ParseField("query"); @@ -85,7 +85,7 @@ static class Factory implements Processor.Factory { @Override public FilterQueryRequestProcessor create( - Map processorFactories, + Map> processorFactories, String tag, String description, Map config diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java index a0e5182f71443..aa56714085b48 100644 --- 
a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -11,6 +11,8 @@ import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; +import org.opensearch.search.pipeline.SearchResponseProcessor; import java.util.Map; @@ -25,12 +27,12 @@ public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPi public SearchPipelineCommonModulePlugin() {} @Override - public Map getProcessors(Processor.Parameters parameters) { - return Map.of( - FilterQueryRequestProcessor.TYPE, - new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry), - RenameFieldResponseProcessor.TYPE, - new RenameFieldResponseProcessor.Factory() - ); + public Map> getRequestProcessors(Processor.Parameters parameters) { + return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry)); + } + + @Override + public Map> getResponseProcessors(Processor.Parameters parameters) { + return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory()); } } diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml index 0d931f8587664..644181d601ea4 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml @@ -12,5 +12,5 @@ nodes.info: {} - contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } } - - contains: { 
nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } } - - contains: { nodes.$cluster_manager.search_pipelines.processors: { type: rename_field } } + - contains: { nodes.$cluster_manager.search_pipelines.request_processors: { type: filter_query } } + - contains: { nodes.$cluster_manager.search_pipelines.response_processors: { type: rename_field } } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java index 8e6fbef6c8b1d..b8ceddecd3d20 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java @@ -9,6 +9,8 @@ package org.opensearch.plugins; import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; +import org.opensearch.search.pipeline.SearchResponseProcessor; import java.util.Collections; import java.util.Map; @@ -20,13 +22,24 @@ */ public interface SearchPipelinePlugin { /** - * Returns additional search pipeline processor types added by this plugin. + * Returns additional search pipeline request processor types added by this plugin. * * The key of the returned {@link Map} is the unique name for the processor which is specified * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map getProcessors(Processor.Parameters parameters) { + default Map> getRequestProcessors(Processor.Parameters parameters) { + return Collections.emptyMap(); + } + + /** + * Returns additional search pipeline response processor types added by this plugin. 
+ * + * The key of the returned {@link Map} is the unique name for the processor which is specified + * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} + * to create the processor from a given pipeline configuration. + */ + default Map> getResponseProcessors(Processor.Parameters parameters) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index f5dce8ec728b2..2e349ed7ef7d4 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -65,28 +65,21 @@ class Pipeline { public static Pipeline create( String id, Map config, - Map processorFactories, + Map> requestProcessorFactories, + Map> responseProcessorFactories, NamedWriteableRegistry namedWriteableRegistry ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); - List requestProcessors = readProcessors( - SearchRequestProcessor.class, - processorFactories, - requestProcessorConfigs - ); + List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, config, RESPONSE_PROCESSORS_KEY ); - List responseProcessors = readProcessors( - SearchResponseProcessor.class, - processorFactories, - responseProcessorConfigs - ); + List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); if (config.isEmpty() == false) { throw new OpenSearchParseException( "pipeline [" @@ -98,10 +91,8 @@ public static Pipeline create( return new 
Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry); } - @SuppressWarnings("unchecked") // Cast is checked using isInstance private static List readProcessors( - Class processorType, - Map processorFactories, + Map> processorFactories, List> requestProcessorConfigs ) throws Exception { List processors = new ArrayList<>(); @@ -117,24 +108,12 @@ private static List readProcessors( Map config = (Map) entry.getValue(); String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); - Processor processor = processorFactories.get(type).create(processorFactories, tag, description, config); - if (processorType.isInstance(processor)) { - processors.add((T) processor); - } else { - throw new IllegalArgumentException("Processor type " + type + " is not a " + processorType.getSimpleName()); - } + processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); } } return processors; } - List flattenAllProcessors() { - List allProcessors = new ArrayList<>(searchRequestProcessors.size() + searchResponseProcessors.size()); - allProcessors.addAll(searchRequestProcessors); - allProcessors.addAll(searchResponseProcessors); - return allProcessors; - } - String getId() { return id; } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java index 44f268242b83c..ee28db1cc334d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java @@ -52,7 +52,7 @@ public interface Processor { /** * A factory that knows how to construct a processor based on a map of maps. */ - interface Factory { + interface Factory { /** * Creates a processor based on the specified map of maps config. 
@@ -65,8 +65,7 @@ interface Factory { * Note: Implementations are responsible for removing the used configuration * keys, so that after creation the config map should be empty. */ - Processor create(Map processorFactories, String tag, String description, Map config) - throws Exception; + T create(Map> processorFactories, String tag, String description, Map config) throws Exception; } /** diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java index 95d1e3720cbb3..b91f18c44cc7b 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java @@ -8,15 +8,19 @@ package org.opensearch.search.pipeline; +import org.opensearch.Version; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.node.ReportingService; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; /** @@ -26,45 +30,84 @@ */ public class SearchPipelineInfo implements ReportingService.Info { - private final Set processors; + private final Map> processors = new TreeMap<>(); - public SearchPipelineInfo(List processors) { - this.processors = new TreeSet<>(processors); // we use a treeset here to have a test-able / predictable order + public SearchPipelineInfo(Map> processors) { + for (Map.Entry> processorsEntry : processors.entrySet()) { + // we use a treeset here to have a test-able / predictable order + this.processors.put(processorsEntry.getKey(), new TreeSet<>(processorsEntry.getValue())); + } } /** * Read from a stream. 
*/ public SearchPipelineInfo(StreamInput in) throws IOException { - processors = new TreeSet<>(); - final int size = in.readVInt(); - for (int i = 0; i < size; i++) { - processors.add(new ProcessorInfo(in)); + // TODO: When we backport this to 2.8, we must change this condition to in.getVersion().before(V_2_8_0) + if (in.getVersion().onOrBefore(Version.V_2_8_0)) { + // Prior to version 2.8, we had a flat list of processors. For best compatibility, assume they're valid + // request and response processor, since we couldn't tell the difference back then. + final int size = in.readVInt(); + Set processorInfos = new TreeSet<>(); + for (int i = 0; i < size; i++) { + processorInfos.add(new ProcessorInfo(in)); + } + processors.put(Pipeline.REQUEST_PROCESSORS_KEY, processorInfos); + processors.put(Pipeline.RESPONSE_PROCESSORS_KEY, processorInfos); + } else { + final int numTypes = in.readVInt(); + for (int i = 0; i < numTypes; i++) { + String type = in.readString(); + int numProcessors = in.readVInt(); + Set processorInfos = new TreeSet<>(); + for (int j = 0; j < numProcessors; j++) { + processorInfos.add(new ProcessorInfo(in)); + } + processors.put(type, processorInfos); + } } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("search_pipelines"); - builder.startArray("processors"); - for (ProcessorInfo info : processors) { - info.toXContent(builder, params); + for (Map.Entry> processorEntry : processors.entrySet()) { + builder.startArray(processorEntry.getKey()); + for (ProcessorInfo info : processorEntry.getValue()) { + info.toXContent(builder, params); + } + builder.endArray(); } - builder.endArray(); builder.endObject(); return builder; } @Override public void writeTo(StreamOutput out) throws IOException { - out.write(processors.size()); - for (ProcessorInfo info : processors) { - info.writeTo(out); + // TODO: When we backport this to 2.8, we must change this condition to 
out.getVersion().before(V_2_8_0) + if (out.getVersion().onOrBefore(Version.V_2_8_0)) { + // Prior to version 2.8, we grouped all processors into a single list. + Set processorInfos = new TreeSet<>(); + processorInfos.addAll(processors.getOrDefault(Pipeline.REQUEST_PROCESSORS_KEY, Collections.emptySet())); + processorInfos.addAll(processors.getOrDefault(Pipeline.RESPONSE_PROCESSORS_KEY, Collections.emptySet())); + out.writeVInt(processorInfos.size()); + for (ProcessorInfo processorInfo : processorInfos) { + processorInfo.writeTo(out); + } + } else { + out.writeVInt(processors.size()); + for (Map.Entry> processorsEntry : processors.entrySet()) { + out.writeString(processorsEntry.getKey()); + out.writeVInt(processorsEntry.getValue().size()); + for (ProcessorInfo processorInfo : processorsEntry.getValue()) { + processorInfo.writeTo(out); + } + } } } - public boolean containsProcessor(String type) { - return processors.contains(new ProcessorInfo(type)); + public boolean containsProcessor(String processorType, String type) { + return processors.containsKey(processorType) && processors.get(processorType).contains(new ProcessorInfo(type)); } @Override diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index f96a6eb4a6b76..a486e636cbb7d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -55,6 +55,8 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; /** * The main entry point for search pipelines. 
Handles CRUD operations and exposes the API to execute search pipelines @@ -68,7 +70,8 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ private static final Logger logger = LogManager.getLogger(SearchPipelineService.class); private final ClusterService clusterService; private final ScriptService scriptService; - private final Map processorFactories; + private final Map> requestProcessorFactories; + private final Map> responseProcessorFactories; private volatile Map pipelines = Collections.emptyMap(); private final ThreadPool threadPool; private final List> searchPipelineClusterStateListeners = new CopyOnWriteArrayList<>(); @@ -95,34 +98,33 @@ public SearchPipelineService( this.scriptService = scriptService; this.threadPool = threadPool; this.namedWriteableRegistry = namedWriteableRegistry; - this.processorFactories = processorFactories( - searchPipelinePlugins, - new Processor.Parameters( - env, - scriptService, - analysisRegistry, - threadPool.getThreadContext(), - threadPool::relativeTimeInMillis, - (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), - this, - client, - threadPool.generic()::execute, - namedXContentRegistry - ) + Processor.Parameters parameters = new Processor.Parameters( + env, + scriptService, + analysisRegistry, + threadPool.getThreadContext(), + threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), + this, + client, + threadPool.generic()::execute, + namedXContentRegistry ); + this.requestProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getRequestProcessors(parameters)); + this.responseProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getResponseProcessors(parameters)); putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true); deletePipelineTaskKey = 
clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true); this.isEnabled = isEnabled; } - private static Map processorFactories( + private static Map> processorFactories( List searchPipelinePlugins, - Processor.Parameters parameters + Function>> processorLoader ) { - Map processorFactories = new HashMap<>(); + Map> processorFactories = new HashMap<>(); for (SearchPipelinePlugin searchPipelinePlugin : searchPipelinePlugins) { - Map newProcessors = searchPipelinePlugin.getProcessors(parameters); - for (Map.Entry entry : newProcessors.entrySet()) { + Map> newProcessors = processorLoader.apply(searchPipelinePlugin); + for (Map.Entry> entry : newProcessors.entrySet()) { if (processorFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Search processor [" + entry.getKey() + "] is already registered"); } @@ -173,7 +175,8 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { Pipeline newPipeline = Pipeline.create( newConfiguration.getId(), newConfiguration.getConfigAsMap(), - processorFactories, + requestProcessorFactories, + responseProcessorFactories, namedWriteableRegistry ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); @@ -268,12 +271,27 @@ void validatePipeline(Map searchPipelineInfos throw new IllegalStateException("Search pipeline info is empty"); } Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, namedWriteableRegistry); + Pipeline pipeline = Pipeline.create( + request.getId(), + pipelineConfig, + requestProcessorFactories, + responseProcessorFactories, + namedWriteableRegistry + ); List exceptions = new ArrayList<>(); - for (Processor processor : pipeline.flattenAllProcessors()) { + for (SearchRequestProcessor processor : 
pipeline.getSearchRequestProcessors()) { + for (Map.Entry entry : searchPipelineInfos.entrySet()) { + String type = processor.getType(); + if (entry.getValue().containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, type) == false) { + String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; + exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); + } + } + } + for (SearchResponseProcessor processor : pipeline.getSearchResponseProcessors()) { for (Map.Entry entry : searchPipelineInfos.entrySet()) { String type = processor.getType(); - if (entry.getValue().containsProcessor(type) == false) { + if (entry.getValue().containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, type) == false) { String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); } @@ -352,7 +370,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce pipeline = Pipeline.create( AD_HOC_PIPELINE_ID, searchRequest.source().searchPipelineSource(), - processorFactories, + requestProcessorFactories, + responseProcessorFactories, namedWriteableRegistry ); } catch (Exception e) { @@ -385,17 +404,27 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce return new PipelinedRequest(pipeline, transformedRequest); } - Map getProcessorFactories() { - return processorFactories; + Map> getRequestProcessorFactories() { + return requestProcessorFactories; + } + + Map> getResponseProcessorFactories() { + return responseProcessorFactories; } @Override public SearchPipelineInfo info() { - List processorInfoList = new ArrayList<>(); - for (Map.Entry entry : processorFactories.entrySet()) { - processorInfoList.add(new ProcessorInfo(entry.getKey())); - } - return new 
SearchPipelineInfo(processorInfoList); + List requestProcessorInfoList = requestProcessorFactories.keySet() + .stream() + .map(ProcessorInfo::new) + .collect(Collectors.toList()); + List responseProcessorInfoList = responseProcessorFactories.keySet() + .stream() + .map(ProcessorInfo::new) + .collect(Collectors.toList()); + return new SearchPipelineInfo( + Map.of(Pipeline.REQUEST_PROCESSORS_KEY, requestProcessorInfoList, Pipeline.RESPONSE_PROCESSORS_KEY, responseProcessorInfoList) + ); } public static List getPipelines(ClusterState clusterState, String... ids) { diff --git a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java index bec921bc5bf5d..cdd1c682b40dc 100644 --- a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java +++ b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java @@ -251,7 +251,7 @@ private static NodeInfo createNodeInfo() { for (int i = 0; i < numProcessors; i++) { processors.add(new org.opensearch.search.pipeline.ProcessorInfo(randomAlphaOfLengthBetween(3, 10))); } - searchPipelineInfo = new SearchPipelineInfo(processors); + searchPipelineInfo = new SearchPipelineInfo(Map.of(randomAlphaOfLengthBetween(3, 10), processors)); } return new NodeInfo( diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 36978d5310810..516227e9a13d8 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -60,9 +60,13 @@ public class SearchPipelineServiceTests extends OpenSearchTestCase { private static final SearchPipelinePlugin DUMMY_PLUGIN = new SearchPipelinePlugin() { @Override - public Map getProcessors(Processor.Parameters parameters) { + public Map> 
getRequestProcessors(Processor.Parameters parameters) { return Map.of("foo", (factories, tag, description, config) -> null); } + + public Map> getResponseProcessors(Processor.Parameters parameters) { + return Map.of("bar", (factories, tag, description, config) -> null); + } }; private ThreadPool threadPool; @@ -89,9 +93,14 @@ public void testSearchPipelinePlugin() { client, false ); - Map factories = searchPipelineService.getProcessorFactories(); - assertEquals(1, factories.size()); - assertTrue(factories.containsKey("foo")); + Map> requestProcessorFactories = searchPipelineService + .getRequestProcessorFactories(); + assertEquals(1, requestProcessorFactories.size()); + assertTrue(requestProcessorFactories.containsKey("foo")); + Map> responseProcessorFactories = searchPipelineService + .getResponseProcessorFactories(); + assertEquals(1, responseProcessorFactories.size()); + assertTrue(responseProcessorFactories.containsKey("bar")); } public void testSearchPipelinePluginDuplicate() { @@ -235,8 +244,8 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp } private SearchPipelineService createWithProcessors() { - Map processors = new HashMap<>(); - processors.put("scale_request_size", (processorFactories, tag, description, config) -> { + Map> requestProcessors = new HashMap<>(); + requestProcessors.put("scale_request_size", (processorFactories, tag, description, config) -> { float scale = ((Number) config.remove("scale")).floatValue(); return new FakeRequestProcessor( "scale_request_size", @@ -245,11 +254,12 @@ private SearchPipelineService createWithProcessors() { req -> req.source().size((int) (req.source().size() * scale)) ); }); - processors.put("fixed_score", (processorFactories, tag, description, config) -> { + Map> responseProcessors = new HashMap<>(); + responseProcessors.put("fixed_score", (processorFactories, tag, description, config) -> { float score = ((Number) config.remove("score")).floatValue(); return new 
FakeResponseProcessor("fixed_score", tag, description, rsp -> rsp.getHits().forEach(h -> h.score(score))); }); - return createWithProcessors(processors); + return createWithProcessors(requestProcessors, responseProcessors); } @Override @@ -258,7 +268,10 @@ protected NamedWriteableRegistry writableRegistry() { return new NamedWriteableRegistry(searchModule.getNamedWriteables()); } - private SearchPipelineService createWithProcessors(Map processors) { + private SearchPipelineService createWithProcessors( + Map> requestProcessors, + Map> responseProcessors + ) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); @@ -274,8 +287,13 @@ private SearchPipelineService createWithProcessors(Map getProcessors(Processor.Parameters parameters) { - return processors; + public Map> getRequestProcessors(Processor.Parameters parameters) { + return requestProcessors; + } + + @Override + public Map> getResponseProcessors(Processor.Parameters parameters) { + return responseProcessors; } }), client, @@ -619,13 +637,14 @@ public void testValidatePipeline() throws Exception { XContentType.JSON ); + SearchPipelineInfo completePipelineInfo = new SearchPipelineInfo( + Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor), Pipeline.RESPONSE_PROCESSORS_KEY, List.of(rspProcessor)) + ); + SearchPipelineInfo incompletePipelineInfo = new SearchPipelineInfo(Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor))); // One node is missing a processor expectThrows( OpenSearchParseException.class, - () -> searchPipelineService.validatePipeline( - Map.of(n1, new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), n2, new SearchPipelineInfo(List.of(reqProcessor))), - putRequest - ) + () -> searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, incompletePipelineInfo), putRequest) ); // Discovery failed, no infos passed. 
@@ -644,27 +663,11 @@ public void testValidatePipeline() throws Exception { ); expectThrows( ClassCastException.class, - () -> searchPipelineService.validatePipeline( - Map.of( - n1, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), - n2, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)) - ), - badPutRequest - ) + () -> searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, completePipelineInfo), badPutRequest) ); // Success - searchPipelineService.validatePipeline( - Map.of( - n1, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), - n2, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)) - ), - putRequest - ); + searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, completePipelineInfo), putRequest); } /** @@ -717,7 +720,7 @@ public void testInlinePipeline() throws Exception { public void testInfo() { SearchPipelineService searchPipelineService = createWithProcessors(); SearchPipelineInfo info = searchPipelineService.info(); - assertTrue(info.containsProcessor("scale_request_size")); - assertTrue(info.containsProcessor("fixed_score")); + assertTrue(info.containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, "scale_request_size")); + assertTrue(info.containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, "fixed_score")); } }