Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial search pipelines implementation #6587

Merged
merged 18 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions qa/smoke-test-multinode/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ File repo = file("$buildDir/testclusters/repo")
testClusters.integTest {
numberOfNodes = 2
setting 'path.repo', repo.absolutePath
setting 'opensearch.experimental.feature.search_pipeline.enabled', 'true'
}

integTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ
private final NamedWriteableRegistry namedWriteableRegistry;
private volatile ClusterState state;

private boolean forceEnabled = false;

public SearchPipelineService(
ClusterService clusterService,
ThreadPool threadPool,
Expand Down Expand Up @@ -212,7 +214,7 @@ public void putPipeline(
PutSearchPipelineRequest request,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCH_PIPELINE) == false) {
if (isFeatureEnabled() == false) {
throw new IllegalArgumentException("Experimental search pipeline feature is not enabled");
}

Expand Down Expand Up @@ -330,7 +332,7 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat

public SearchRequest transformRequest(SearchRequest originalRequest) {
String pipelineId = originalRequest.pipeline();
if (pipelineId != null && FeatureFlags.isEnabled(FeatureFlags.SEARCH_PIPELINE)) {
if (pipelineId != null && isFeatureEnabled()) {
PipelineHolder pipeline = pipelines.get(pipelineId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking nit - if we want to keep the name of PipelineHolder , rename pipeline to pipelineholder to avoid invoke pipeline.pipeline

if (pipeline == null) {
throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined");
Expand All @@ -353,7 +355,7 @@ public SearchRequest transformRequest(SearchRequest originalRequest) {

public SearchResponse transformResponse(SearchRequest request, SearchResponse searchResponse) {
String pipelineId = request.pipeline();
if (pipelineId != null && FeatureFlags.isEnabled(FeatureFlags.SEARCH_PIPELINE)) {
if (pipelineId != null && isFeatureEnabled()) {
PipelineHolder pipeline = pipelines.get(pipelineId);
if (pipeline == null) {
throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined");
Expand Down Expand Up @@ -426,4 +428,12 @@ static class PipelineHolder {
this.pipeline = Objects.requireNonNull(pipeline);
}
}

private boolean isFeatureEnabled() {
Copy link
Collaborator

@reta reta Apr 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@msfroh since feature flags are on JVM level (they are not dynamic), I think we could use just simple enabled property that will be passed to constructor (as FeatureFlags.isEnabled(FeatureFlags.SEARCH_PIPELINE) from core or manually during testing), wdyt?

return forceEnabled || FeatureFlags.isEnabled(FeatureFlags.SEARCH_PIPELINE);
}

void setForceEnabled(boolean forceEnabled) {
this.forceEnabled = forceEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.opensearch.OpenSearchParseException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.Version;
Expand All @@ -29,11 +27,9 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.TermQueryBuilder;
Expand All @@ -58,17 +54,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@SuppressForbidden(reason = "feature flag overrides")
public class SearchPipelineServiceTests extends OpenSearchTestCase {
@BeforeClass
public static void enableFeature() {
System.setProperty(FeatureFlags.SEARCH_PIPELINE, "true");
}

@AfterClass
public static void disableFeature() {
System.setProperty(FeatureFlags.SEARCH_PIPELINE, "false");
}

private static final SearchPipelinePlugin DUMMY_PLUGIN = new SearchPipelinePlugin() {
@Override
Expand Down Expand Up @@ -137,6 +123,7 @@ public void testExecuteSearchPipelineDoesNotExist() {
List.of(DUMMY_PLUGIN),
client
);
searchPipelineService.setForceEnabled(true);
final SearchRequest searchRequest = new SearchRequest("_index").pipeline("bar");
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
Expand Down Expand Up @@ -232,7 +219,7 @@ private SearchPipelineService createWithProcessors(Map<String, Processor.Factory
ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService();
when(threadPool.generic()).thenReturn(executorService);
when(threadPool.executor(anyString())).thenReturn(executorService);
return new SearchPipelineService(
SearchPipelineService searchPipelineService = new SearchPipelineService(
mock(ClusterService.class),
threadPool,
null,
Expand All @@ -248,6 +235,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
}),
client
);
searchPipelineService.setForceEnabled(true);
return searchPipelineService;
}

public void testUpdatePipelines() {
Expand Down