diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index dccd5059dd52d..5e339c388439c 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -116,9 +116,50 @@ protected TransportNodesAction( transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler()); } + /** + * @param actionName action name + * @param threadPool thread-pool + * @param clusterService cluster service + * @param transportService transport service + * @param actionFilters action filters + * @param request node request writer + * @param nodeRequest node request reader + * @param nodeExecutor executor to execute node action on + * @param finalExecutor executor to execute final collection of all responses on + * @param listenableHandler true if the handler should be a listenable handler + * @param nodeResponseClass class of the node responses + */ + protected TransportNodesAction( + String actionName, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + Writeable.Reader request, + Writeable.Reader nodeRequest, + String nodeExecutor, + String finalExecutor, + boolean listenableHandler, + Class nodeResponseClass + ) { + super(actionName, transportService, actionFilters, request); + this.threadPool = threadPool; + this.clusterService = Objects.requireNonNull(clusterService); + this.transportService = Objects.requireNonNull(transportService); + this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass); + + this.transportNodeAction = actionName + "[n]"; + this.finalExecutor = finalExecutor; + if (listenableHandler) { + transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new ListenableNodeTransportHandler()); + } else { + transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler()); + } + } + /** * Same as {@link #TransportNodesAction(String, ThreadPool, ClusterService, TransportService, ActionFilters, Writeable.Reader, - * Writeable.Reader, String, String, Class)} but executes final response collection on the transport thread except for when the final + * Writeable.Reader, String, String, boolean, Class)} but executes final response collection on the transport thread except for when the final * node response is received from the local node, in which case {@code nodeExecutor} is used. * This constructor should only be used for actions for which the creation of the final response is fast enough to be safely executed * on a transport thread. @@ -144,6 +185,7 @@ protected TransportNodesAction( nodeRequest, nodeExecutor, ThreadPool.Names.SAME, + false, nodeResponseClass ); } @@ -196,6 +238,8 @@ protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray actionListener) {} + protected NodeResponse nodeOperation(NodeRequest request, Task task) { return nodeOperation(request); } @@ -335,4 +379,14 @@ public void messageReceived(NodeRequest request, TransportChannel channel, Task } } + class ListenableNodeTransportHandler implements TransportRequestHandler { + + @Override + public void messageReceived(NodeRequest request, TransportChannel channel, Task task) { + ActionListener listener = ActionListener.wrap(channel::sendResponse, e -> { + TransportChannel.sendErrorResponse(channel, actionName, request, e); + }); + nodeOperation(request, listener); + } + } }