forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial search pipelines implementation (opensearch-project#6587)
* Initial search pipelines implementation This commit includes the basic features of search pipelines (see opensearch-project/search-processor#80). Search pipelines are modeled after ingest pipelines and provide a simple, clean API for components to modify search requests and responses. With this commit we can: 1. Can create, retrieve, update, and delete search pipelines. 2. Transform search requests and responses by explicitly referencing a pipeline. Later work will include: 1. Adding an index setting to specify a default search pipeline. 2. Allowing search pipelines to be defined within a search request (for development/testing purposes, akin to simulating an ingest pipeline). 3. Adding a collection of search pipeline processors to support common useful transformations. (Suggestions welcome!) Signed-off-by: Michael Froh <froh@amazon.com> * Incorporate feedback from @reta and @navneet1v 1. SearchPipelinesClient: JavaDoc fix 2. SearchRequest: Check versions when (de)serializing new "pipeline" property. 3. Rename SearchPipelinesPlugin -> SearchPipelinePlugin. 4. Pipeline: Change visibility to package private 5. SearchPipelineProcessingException: New exception type to wrap exceptions thrown when executing a pipeline. Bonus: Added an integration test for filter_query request processor. Signed-off-by: Michael Froh <froh@amazon.com> * Register SearchPipelineProcessingException Also added more useful messages to unit tests to explicitly explain what hoops need to be jumped through in order to add a new serializable exception. Signed-off-by: Michael Froh <froh@amazon.com> * Remove unneeded dependencies from search-pipeline-common I had copied some dependencies from ingest-common, but they are not used by search-pipeline-common (yet). Signed-off-by: Michael Froh <froh@amazon.com> * Avoid cloning SearchRequest if no SearchRequestProcessors Also, add tests to confirm that a pipeline with no processors works fine (as a no-op). Signed-off-by: Michael Froh <froh@amazon.com> * Use NamedWritableRegistry to deserialize SearchRequest Queries are serialized as NamedWritables, so we need to use a NamedWritableRegistry to deserialize. Signed-off-by: Michael Froh <froh@amazon.com> * Check for empty pipeline with CollectionUtils.isEmpty Signed-off-by: Michael Froh <froh@amazon.com> * Update server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java Co-authored-by: Navneet Verma <vermanavneet003@gmail.com> Signed-off-by: Michael Froh <froh@amazon.com> * Incorporate feedback from @noCharger Signed-off-by: Michael Froh <froh@amazon.com> * Incorporate feedback from @reta - Renamed various classes from "SearchPipelinesSomething" to "SearchPipelineSomething" to be consistent. - Refactored NodeInfo construction in NodeService to avoid ternary operator and improved readability. Signed-off-by: Michael Froh <froh@amazon.com> * Gate search pipelines behind a feature flag Also renamed SearchPipelinesRequestConverters. Signed-off-by: Michael Froh <froh@amazon.com> * More feature flag fixes for search pipeline testing - Don't use system properties for SearchPipelineServiceTests. - Enable feature flag for multinode smoke tests. Signed-off-by: Michael Froh <froh@amazon.com> * Move feature flag into constructor parameter Thanks for the suggestion, @reta! Signed-off-by: Michael Froh <froh@amazon.com> * Move REST handlers behind feature flag Signed-off-by: Michael Froh <froh@amazon.com> --------- Signed-off-by: Michael Froh <froh@amazon.com> Co-authored-by: Navneet Verma <vermanavneet003@gmail.com>
- Loading branch information
1 parent
d59159d
commit 2c57e65
Showing
77 changed files
with
4,431 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
149 changes: 149 additions & 0 deletions
149
client/rest-high-level/src/main/java/org/opensearch/client/SearchPipelineClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineResponse; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
|
||
import java.io.IOException; | ||
import java.util.Collections; | ||
|
||
import static java.util.Collections.emptySet; | ||
|
||
public final class SearchPipelineClient { | ||
private final RestHighLevelClient restHighLevelClient; | ||
|
||
SearchPipelineClient(RestHighLevelClient restHighLevelClient) { | ||
this.restHighLevelClient = restHighLevelClient; | ||
} | ||
|
||
/** | ||
* Add a pipeline or update an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public AcknowledgedResponse put(PutSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelineRequestConverters::putPipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously add a pipeline or update an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable putAsync(PutSearchPipelineRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelineRequestConverters::putPipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
listener, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Get existing pipelines. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public GetSearchPipelineResponse get(GetSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelineRequestConverters::getPipeline, | ||
options, | ||
GetSearchPipelineResponse::fromXContent, | ||
Collections.singleton(404) | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously get existing pipelines. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable getAsync( | ||
GetSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<GetSearchPipelineResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelineRequestConverters::getPipeline, | ||
options, | ||
GetSearchPipelineResponse::fromXContent, | ||
listener, | ||
Collections.singleton(404) | ||
); | ||
} | ||
|
||
/** | ||
* Delete an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public AcknowledgedResponse delete(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelineRequestConverters::deletePipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously delete an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable deleteAsync( | ||
DeleteSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<AcknowledgedResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelineRequestConverters::deletePipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
listener, | ||
emptySet() | ||
); | ||
} | ||
|
||
} |
61 changes: 61 additions & 0 deletions
61
.../rest-high-level/src/main/java/org/opensearch/client/SearchPipelineRequestConverters.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.apache.hc.client5.http.classic.methods.HttpDelete; | ||
import org.apache.hc.client5.http.classic.methods.HttpGet; | ||
import org.apache.hc.client5.http.classic.methods.HttpPut; | ||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
|
||
import java.io.IOException; | ||
|
||
final class SearchPipelineRequestConverters { | ||
private SearchPipelineRequestConverters() {} | ||
|
||
static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException { | ||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||
.addPathPart(putPipelineRequest.getId()) | ||
.build(); | ||
Request request = new Request(HttpPut.METHOD_NAME, endpoint); | ||
|
||
RequestConverters.Params params = new RequestConverters.Params(); | ||
params.withTimeout(putPipelineRequest.timeout()); | ||
params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout()); | ||
request.addParameters(params.asMap()); | ||
request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE)); | ||
return request; | ||
} | ||
|
||
static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) { | ||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||
.addPathPart(deletePipelineRequest.getId()) | ||
.build(); | ||
Request request = new Request(HttpDelete.METHOD_NAME, endpoint); | ||
|
||
RequestConverters.Params parameters = new RequestConverters.Params(); | ||
parameters.withTimeout(deletePipelineRequest.timeout()); | ||
parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout()); | ||
request.addParameters(parameters.asMap()); | ||
return request; | ||
} | ||
|
||
static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) { | ||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||
.addCommaSeparatedPathParts(getPipelineRequest.getIds()) | ||
.build(); | ||
Request request = new Request(HttpGet.METHOD_NAME, endpoint); | ||
|
||
RequestConverters.Params parameters = new RequestConverters.Params(); | ||
parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout()); | ||
request.addParameters(parameters.asMap()); | ||
return request; | ||
} | ||
} |
115 changes: 115 additions & 0 deletions
115
client/rest-high-level/src/test/java/org/opensearch/client/SearchPipelineClientIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineResponse; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.common.bytes.BytesReference; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
|
||
import java.io.IOException; | ||
|
||
public class SearchPipelineClientIT extends OpenSearchRestHighLevelClientTestCase { | ||
|
||
public void testPutPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
} | ||
|
||
private static void createPipeline(PutSearchPipelineRequest request) throws IOException { | ||
AcknowledgedResponse response = execute( | ||
request, | ||
highLevelClient().searchPipeline()::put, | ||
highLevelClient().searchPipeline()::putAsync | ||
); | ||
assertTrue(response.isAcknowledged()); | ||
} | ||
|
||
public void testGetPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
|
||
GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id); | ||
GetSearchPipelineResponse response = execute( | ||
getRequest, | ||
highLevelClient().searchPipeline()::get, | ||
highLevelClient().searchPipeline()::getAsync | ||
); | ||
assertTrue(response.isFound()); | ||
assertEquals(1, response.pipelines().size()); | ||
assertEquals(id, response.pipelines().get(0).getId()); | ||
} | ||
|
||
public void testDeletePipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
|
||
DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id); | ||
AcknowledgedResponse response = execute( | ||
deleteRequest, | ||
highLevelClient().searchPipeline()::delete, | ||
highLevelClient().searchPipeline()::deleteAsync | ||
); | ||
assertTrue(response.isAcknowledged()); | ||
} | ||
|
||
private static XContentBuilder buildSearchPipeline() throws IOException { | ||
XContentType xContentType = randomFrom(XContentType.values()); | ||
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); | ||
return buildSearchPipeline(pipelineBuilder); | ||
} | ||
|
||
private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException { | ||
builder.startObject(); | ||
{ | ||
builder.field("description", "a pipeline description"); | ||
builder.startArray("request_processors"); | ||
{ | ||
builder.startObject().startObject("filter_query"); | ||
{ | ||
builder.startObject("query"); | ||
{ | ||
builder.startObject("term"); | ||
{ | ||
builder.field("field", "value"); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject().endObject(); | ||
} | ||
builder.endArray(); | ||
} | ||
builder.endObject(); | ||
return builder; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.