Initial search pipelines implementation (#6587)
* Initial search pipelines implementation

This commit includes the basic features of search pipelines
(see opensearch-project/search-processor#80).

Search pipelines are modeled after ingest pipelines and provide a
simple, clean API for components to modify search requests and
responses.

With this commit we can:

1. Create, retrieve, update, and delete search pipelines.
2. Transform search requests and responses by explicitly referencing a
   pipeline.

Later work will include:

1. Adding an index setting to specify a default search pipeline.
2. Allowing search pipelines to be defined within a search request (for
   development/testing purposes, akin to simulating an ingest
   pipeline).
3. Adding a collection of search pipeline processors to support common
   useful transformations. (Suggestions welcome!)
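
For illustration, the integration test in this PR builds a pipeline body with this shape (the `filter_query` processor and the `"field": "value"` term here mirror the test; the exact processor catalog is still evolving):

```json
{
  "description": "a pipeline description",
  "request_processors": [
    {
      "filter_query": {
        "query": {
          "term": {
            "field": "value"
          }
        }
      }
    }
  ]
}
```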

Signed-off-by: Michael Froh <froh@amazon.com>

* Incorporate feedback from @reta and @navneet1v

1. SearchPipelinesClient: JavaDoc fix
2. SearchRequest: Check versions when (de)serializing new "pipeline"
   property.
3. Rename SearchPipelinesPlugin -> SearchPipelinePlugin.
4. Pipeline: Change visibility to package private
5. SearchPipelineProcessingException: New exception type to wrap
   exceptions thrown when executing a pipeline.

Bonus: Added an integration test for filter_query request processor.

Signed-off-by: Michael Froh <froh@amazon.com>

* Register SearchPipelineProcessingException

Also added more useful messages to unit tests to explicitly explain
the steps required to register a new serializable exception.

Signed-off-by: Michael Froh <froh@amazon.com>

* Remove unneeded dependencies from search-pipeline-common

I had copied some dependencies from ingest-common, but they are not used
by search-pipeline-common (yet).

Signed-off-by: Michael Froh <froh@amazon.com>

* Avoid cloning SearchRequest if no SearchRequestProcessors

Also, add tests to confirm that a pipeline with no processors works
fine (as a no-op).

Signed-off-by: Michael Froh <froh@amazon.com>

* Use NamedWriteableRegistry to deserialize SearchRequest

Queries are serialized as NamedWriteables, so we need a
NamedWriteableRegistry to deserialize them.

Signed-off-by: Michael Froh <froh@amazon.com>

* Check for empty pipeline with CollectionUtils.isEmpty

Signed-off-by: Michael Froh <froh@amazon.com>

* Update server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java

Co-authored-by: Navneet Verma <vermanavneet003@gmail.com>
Signed-off-by: Michael Froh <froh@amazon.com>

* Incorporate feedback from @noCharger

Signed-off-by: Michael Froh <froh@amazon.com>

* Incorporate feedback from @reta

- Renamed various classes from "SearchPipelinesSomething" to
"SearchPipelineSomething" to be consistent.
- Refactored NodeInfo construction in NodeService to avoid ternary
  operator and improved readability.

Signed-off-by: Michael Froh <froh@amazon.com>

* Gate search pipelines behind a feature flag

Also renamed SearchPipelinesRequestConverters.

Signed-off-by: Michael Froh <froh@amazon.com>

* More feature flag fixes for search pipeline testing

- Don't use system properties for SearchPipelineServiceTests.
- Enable feature flag for multinode smoke tests.

Signed-off-by: Michael Froh <froh@amazon.com>

* Move feature flag into constructor parameter

Thanks for the suggestion, @reta!

Signed-off-by: Michael Froh <froh@amazon.com>

* Move REST handlers behind feature flag

Signed-off-by: Michael Froh <froh@amazon.com>

---------

Signed-off-by: Michael Froh <froh@amazon.com>
Co-authored-by: Navneet Verma <vermanavneet003@gmail.com>
(cherry picked from commit ee990bd)
msfroh committed Apr 10, 2023
1 parent 932f47b commit a6c16d5
Showing 77 changed files with 4,432 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- The truncation limit of the OpenSearchJsonLayout logger is now configurable ([#6569](https://github.com/opensearch-project/OpenSearch/pull/6569))
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add initial search pipelines ([#6587](https://github.com/opensearch-project/OpenSearch/pull/6587))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- Add wait_for_completion parameter to resize, open, and forcemerge APIs ([#6434](https://github.com/opensearch-project/OpenSearch/pull/6434))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
3 changes: 3 additions & 0 deletions client/rest-high-level/build.gradle
@@ -103,4 +103,7 @@ testClusters.all {
extraConfigFile nodeCert.name, nodeCert
extraConfigFile nodeTrustStore.name, nodeTrustStore
extraConfigFile pkiTrustCert.name, pkiTrustCert

// Enable APIs behind feature flags
setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true'
}
@@ -268,6 +268,7 @@ public class RestHighLevelClient implements Closeable {
private final IngestClient ingestClient = new IngestClient(this);
private final SnapshotClient snapshotClient = new SnapshotClient(this);
private final TasksClient tasksClient = new TasksClient(this);
private final SearchPipelineClient searchPipelineClient = new SearchPipelineClient(this);

/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
@@ -354,6 +355,10 @@ public final TasksClient tasks() {
return tasksClient;
}

public final SearchPipelineClient searchPipeline() {
return searchPipelineClient;
}

/**
* Executes a bulk request using the Bulk API.
* @param bulkRequest the request
@@ -0,0 +1,149 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;

import java.io.IOException;
import java.util.Collections;

import static java.util.Collections.emptySet;

public final class SearchPipelineClient {
private final RestHighLevelClient restHighLevelClient;

SearchPipelineClient(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}

/**
* Add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse put(PutSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelineRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable putAsync(PutSearchPipelineRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelineRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Get existing pipelines.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetSearchPipelineResponse get(GetSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelineRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
Collections.singleton(404)
);
}

/**
* Asynchronously get existing pipelines.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable getAsync(
GetSearchPipelineRequest request,
RequestOptions options,
ActionListener<GetSearchPipelineResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelineRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
listener,
Collections.singleton(404)
);
}

/**
* Delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse delete(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelineRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable deleteAsync(
DeleteSearchPipelineRequest request,
RequestOptions options,
ActionListener<AcknowledgedResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelineRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

}
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.PutSearchPipelineRequest;

import java.io.IOException;

final class SearchPipelineRequestConverters {
private SearchPipelineRequestConverters() {}

static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(putPipelineRequest.getId())
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params();
params.withTimeout(putPipelineRequest.timeout());
params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(params.asMap());
request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(deletePipelineRequest.getId())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withTimeout(deletePipelineRequest.timeout());
parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}

static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}
}
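
Taken together, the converters above map onto three REST endpoints (a sketch inferred from the `EndpointBuilder` calls; `{id}` is the pipeline name, and the GET path accepts a comma-separated list of ids):

```text
PUT    /_search/pipeline/{id}    # create or update a pipeline (body: pipeline definition)
GET    /_search/pipeline/{ids}   # retrieve one or more pipelines
DELETE /_search/pipeline/{id}    # delete a pipeline
```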
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

public class SearchPipelineClientIT extends OpenSearchRestHighLevelClientTestCase {

public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);
}

private static void createPipeline(PutSearchPipelineRequest request) throws IOException {
AcknowledgedResponse response = execute(
request,
highLevelClient().searchPipeline()::put,
highLevelClient().searchPipeline()::putAsync
);
assertTrue(response.isAcknowledged());
}

public void testGetPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id);
GetSearchPipelineResponse response = execute(
getRequest,
highLevelClient().searchPipeline()::get,
highLevelClient().searchPipeline()::getAsync
);
assertTrue(response.isFound());
assertEquals(1, response.pipelines().size());
assertEquals(id, response.pipelines().get(0).getId());
}

public void testDeletePipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id);
AcknowledgedResponse response = execute(
deleteRequest,
highLevelClient().searchPipeline()::delete,
highLevelClient().searchPipeline()::deleteAsync
);
assertTrue(response.isAcknowledged());
}

private static XContentBuilder buildSearchPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
return buildSearchPipeline(pipelineBuilder);
}

private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException {
builder.startObject();
{
builder.field("description", "a pipeline description");
builder.startArray("request_processors");
{
builder.startObject().startObject("filter_query");
{
builder.startObject("query");
{
builder.startObject("term");
{
builder.field("field", "value");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject().endObject();
}
builder.endArray();
}
builder.endObject();
return builder;
}
}
5 changes: 5 additions & 0 deletions distribution/src/config/opensearch.yml
@@ -114,3 +114,8 @@ ${path.logs}
# the core.
#
#opensearch.experimental.feature.extensions.enabled: false
#
#
# Gates the search pipeline feature. This feature enables configurable processors
# for search requests and search responses, similar to ingest pipelines.
#opensearch.experimental.feature.search_pipeline.enabled: false