From a11a8a1a341366b170589984e9d490955b22aa77 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Fri, 9 Aug 2024 13:54:55 +0800 Subject: [PATCH] Add UT for listenable transport request handler Signed-off-by: zane-neo --- CHANGELOG.md | 1 + .../nodes/TransportNodesActionTests.java | 71 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b76a3d50cb0d..eee5f91b89415 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391) - Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707)) +- Add listenable TransportRequestHandler in TransportNodesAction ([#15166](https://github.com/opensearch-project/OpenSearch/pull/15166)) ### Deprecated - Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712)) diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java index a338e68276bbc..c755ff2307f13 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java @@ -43,16 +43,19 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.indices.IndicesService; import org.opensearch.node.NodeService; +import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import org.junit.After; @@ -74,9 +77,12 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import org.mockito.ArgumentCaptor; + import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; public class TransportNodesActionTests extends OpenSearchTestCase { @@ -198,6 +204,28 @@ public void testTransportNodesActionWithDiscoveryNodesReset() { capturedTransportNodeRequestList.forEach(capturedRequest -> assertNull(capturedRequest.testNodesRequest.concreteNodes())); } + public void testCreateTransportNodesActionWithListenableHandler() { + TransportNodesAction action = getListenableHandlerTestTransportNodesAction(); + assertTrue( + transport.getRequestHandlers() + .getHandler(action.actionName + "[n]") + .getHandler() instanceof TransportNodesAction.ListenableNodeTransportHandler + ); + } + + public void testMessageReceivedInListenableNodeTransportHandler() throws Exception { + TransportNodesAction action = getListenableHandlerTestTransportNodesAction(); + TransportChannel transportChannel = mock(TransportChannel.class); + transport.getRequestHandlers() + .getHandler(action.actionName + "[n]") + .getHandler() + .messageReceived(new TestNodeRequest(), transportChannel, mock(Task.class)); + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(TestNodeResponse.class); + verify(transportChannel).sendResponse(argCaptor.capture()); + TestNodeResponse response = argCaptor.getValue(); + assertNotNull(response); + } + private List mockList(Supplier supplier, int size) { List failures = new ArrayList<>(size); for (int i = 0; i < size; ++i) { @@ -290,6 +318,19 @@ public TestTransportNodesAction getTestTransportNodesAction() { ); } + public TestTransportNodesAction getListenableHandlerTestTransportNodesAction() { + return new TestTransportNodesAction( + THREAD_POOL, + clusterService, + transportService, + new ActionFilters(Collections.emptySet()), + TestNodesRequest::new, + TestNodeRequest::new, + ThreadPool.Names.SAME, + true + ); + } + public DataNodesOnlyTransportNodesAction getDataNodesOnlyTransportNodesAction(TransportService transportService) { return new DataNodesOnlyTransportNodesAction( THREAD_POOL, @@ -335,6 +376,31 @@ private static class TestTransportNodesAction extends TransportNodesAction< ); } + TestTransportNodesAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + Writeable.Reader request, + Writeable.Reader nodeRequest, + String nodeExecutor, + boolean listenableHandler + ) { + super( + "indices:admin/test", + threadPool, + clusterService, + transportService, + actionFilters, + request, + nodeRequest, + nodeExecutor, + nodeExecutor, + listenableHandler, + TestNodeResponse.class + ); + } + @Override protected TestNodesResponse newResponse( TestNodesRequest request, @@ -359,6 +425,11 @@ protected TestNodeResponse nodeOperation(TestNodeRequest request) { return new TestNodeResponse(); } + @Override + protected void nodeOperation(TestNodeRequest request, ActionListener actionListener) { + actionListener.onResponse(new TestNodeResponse()); + } + } private static class DataNodesOnlyTransportNodesAction extends TestTransportNodesAction {