From c9a25008858e8483d38b5cd179314e8b2c99f6f6 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Fri, 9 Aug 2024 13:54:55 +0800 Subject: [PATCH] Add UT for listenable transport request handler Signed-off-by: zane-neo --- CHANGELOG.md | 3 +- .../nodes/TransportNodesActionTests.java | 71 +++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56b1e7b4ffba3..cb7b7a9c40eac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Workload Management] Add QueryGroup Stats API Logic ([15777](https://github.com/opensearch-project/OpenSearch/pull/15777)) - Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424)) - Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916)) -- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967), [#16110](https://github.com/opensearch-project/OpenSearch/pull/16110)) +- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967)) - [Tiered Caching] Segmented cache changes ([#16047](https://github.com/opensearch-project/OpenSearch/pull/16047)) - Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923)) - Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718)) @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641)) - Latency and Memory allocation improvements to Multi Term Aggregation queries ([#14993](https://github.com/opensearch-project/OpenSearch/pull/14993)) - Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383)) +- Add listenable TransportRequestHandler in TransportNodesAction ([#15166](https://github.com/opensearch-project/OpenSearch/pull/15166)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java index a338e68276bbc..c755ff2307f13 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java @@ -43,16 +43,19 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.indices.IndicesService; import org.opensearch.node.NodeService; +import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import org.junit.After; @@ -74,9 +77,12 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import org.mockito.ArgumentCaptor; + import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; public class TransportNodesActionTests extends OpenSearchTestCase { @@ -198,6 +204,28 @@ public void testTransportNodesActionWithDiscoveryNodesReset() { capturedTransportNodeRequestList.forEach(capturedRequest -> assertNull(capturedRequest.testNodesRequest.concreteNodes())); } + public void testCreateTransportNodesActionWithListenableHandler() { + TransportNodesAction action = getListenableHandlerTestTransportNodesAction(); + assertTrue( + transport.getRequestHandlers() + .getHandler(action.actionName + "[n]") + .getHandler() instanceof TransportNodesAction.ListenableNodeTransportHandler + ); + } + + public void testMessageReceivedInListenableNodeTransportHandler() throws Exception { + TransportNodesAction action = getListenableHandlerTestTransportNodesAction(); + TransportChannel transportChannel = mock(TransportChannel.class); + transport.getRequestHandlers() + .getHandler(action.actionName + "[n]") + .getHandler() + .messageReceived(new TestNodeRequest(), transportChannel, mock(Task.class)); + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(TestNodeResponse.class); + verify(transportChannel).sendResponse(argCaptor.capture()); + TestNodeResponse response = argCaptor.getValue(); + assertNotNull(response); + } + private List mockList(Supplier supplier, int size) { List failures = new ArrayList<>(size); for (int i = 0; i < size; ++i) { @@ -290,6 +318,19 @@ public TestTransportNodesAction getTestTransportNodesAction() { ); } + public TestTransportNodesAction getListenableHandlerTestTransportNodesAction() { + return new TestTransportNodesAction( + THREAD_POOL, + clusterService, + transportService, + new ActionFilters(Collections.emptySet()), + TestNodesRequest::new, + TestNodeRequest::new, + ThreadPool.Names.SAME, + true + ); + } + public DataNodesOnlyTransportNodesAction getDataNodesOnlyTransportNodesAction(TransportService transportService) { return new DataNodesOnlyTransportNodesAction( THREAD_POOL, @@ -335,6 +376,31 @@ private static class TestTransportNodesAction extends TransportNodesAction< ); } + TestTransportNodesAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + Writeable.Reader request, + Writeable.Reader nodeRequest, + String nodeExecutor, + boolean listenableHandler + ) { + super( + "indices:admin/test", + threadPool, + clusterService, + transportService, + actionFilters, + request, + nodeRequest, + nodeExecutor, + nodeExecutor, + listenableHandler, + TestNodeResponse.class + ); + } + @Override protected TestNodesResponse newResponse( TestNodesRequest request, @@ -359,6 +425,11 @@ protected TestNodeResponse nodeOperation(TestNodeRequest request) { return new TestNodeResponse(); } + @Override + protected void nodeOperation(TestNodeRequest request, ActionListener actionListener) { + actionListener.onResponse(new TestNodeResponse()); + } + } private static class DataNodesOnlyTransportNodesAction extends TestTransportNodesAction {