Skip to content

Commit

Permalink
add rejection listener unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 committed Aug 26, 2024
1 parent 59046ba commit b583b61
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,6 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
listener
);
}

if (task instanceof QueryGroupTask) {
listener =
}

executeRequest(task, searchRequest, this::searchAsyncAction, listener);
}

Expand Down
13 changes: 9 additions & 4 deletions server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@

package org.opensearch.wlm;

import java.util.Optional;

/**
* This is stub at this point in time
* This is stub at this point in time and will be replace by an acutal one in couple of days
*/
public class QueryGroupService {
/**
Expand All @@ -25,8 +27,11 @@ public void requestFailedFor(final String queryGroupId) {
* @param queryGroupId query group identifier
* @return whether the queryGroup is contended and should reject new incoming requests
*/
public boolean shouldRejectFor(String queryGroupId) {
if (queryGroupId == null) return false;
return false;
public Optional<String> shouldRejectFor(String queryGroupId) {
if (queryGroupId == null) return Optional.empty();
// TODO: At this point this is dummy and we need to decide whether to cancel the request based on last
// reported resource usage for the queryGroup. We also need to increment the rejection count here for the
// query group
return Optional.of("Possible reason. ");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.QueryGroupTask;

import java.util.Optional;

/**
* This listener is used to perform the rejections for incoming requests into a queryGroup
*/
Expand All @@ -35,8 +37,9 @@ public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupS
@Override
protected void onRequestStart(SearchRequestContext searchRequestContext) {
final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER);
if (queryGroupService.shouldRejectFor(queryGroupId)) {
throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended.");
Optional<String> reason = queryGroupService.shouldRejectFor(queryGroupId);
if (reason.isPresent()) {
throw new OpenSearchRejectedExecutionException("QueryGroup " + queryGroupId + " is already contended." + reason.get());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.listeners;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.QueryGroupTask;

import java.util.Optional;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class QueryGroupRequestRejectionOperationListenerTests extends OpenSearchTestCase {
ThreadPool testThreadPool;
QueryGroupService queryGroupService;
QueryGroupRequestRejectionOperationListener sut;


public void setUp() throws Exception {
super.setUp();
testThreadPool = new TestThreadPool("RejectionTestThreadPool");
}

public void tearDown() throws Exception {
super.tearDown();
testThreadPool.shutdown();
}

public void testRejectionCase() {
queryGroupService = mock(QueryGroupService.class);
sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool);
final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t";
testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId);
when(queryGroupService.shouldRejectFor(testQueryGroupId)).thenReturn(Optional.of("Test query group is contended"));

assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.onRequestStart(null));
}

public void testNonRejectionCase() {
queryGroupService = mock(QueryGroupService.class);
sut = new QueryGroupRequestRejectionOperationListener(queryGroupService, testThreadPool);
final String testQueryGroupId = "asdgasgkajgkw3141_3rt4t";
testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, testQueryGroupId);
when(queryGroupService.shouldRejectFor(testQueryGroupId)).thenReturn(Optional.empty());

sut.onRequestStart(null);
}
}

0 comments on commit b583b61

Please sign in to comment.