Skip to content

Commit

Permalink
Add listenable TransportRequestHandler in TransportNodesAction
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <zaniu@amazon.com>
  • Loading branch information
zane-neo committed Aug 8, 2024
1 parent 26ff635 commit 11c214f
Showing 1 changed file with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,50 @@ protected TransportNodesAction(
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
}

/**
* @param actionName action name
* @param threadPool thread-pool
* @param clusterService cluster service
* @param transportService transport service
* @param actionFilters action filters
* @param request node request writer
* @param nodeRequest node request reader
* @param nodeExecutor executor to execute node action on
* @param finalExecutor executor to execute final collection of all responses on
* @param listenableHandler true if the handler should be a listenable handler
* @param nodeResponseClass class of the node responses
*/
protected TransportNodesAction(
String actionName,
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<NodesRequest> request,
Writeable.Reader<NodeRequest> nodeRequest,
String nodeExecutor,
String finalExecutor,
boolean listenableHandler,
Class<NodeResponse> nodeResponseClass
) {
super(actionName, transportService, actionFilters, request);
this.threadPool = threadPool;
this.clusterService = Objects.requireNonNull(clusterService);
this.transportService = Objects.requireNonNull(transportService);
this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);

this.transportNodeAction = actionName + "[n]";
this.finalExecutor = finalExecutor;
if (listenableHandler) {
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new ListenableNodeTransportHandler());

Check warning on line 154 in server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java#L154

Added line #L154 was not covered by tests
} else {
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
}
}

/**
* Same as {@link #TransportNodesAction(String, ThreadPool, ClusterService, TransportService, ActionFilters, Writeable.Reader,
* Writeable.Reader, String, String, Class)} but executes final response collection on the transport thread except for when the final
* Writeable.Reader, String, String, boolean, Class)} but executes final response collection on the transport thread except for when the final
* node response is received from the local node, in which case {@code nodeExecutor} is used.
* This constructor should only be used for actions for which the creation of the final response is fast enough to be safely executed
* on a transport thread.
Expand All @@ -144,6 +185,7 @@ protected TransportNodesAction(
nodeRequest,
nodeExecutor,
ThreadPool.Names.SAME,
false,
nodeResponseClass
);
}
Expand Down Expand Up @@ -196,6 +238,8 @@ protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray<?

protected abstract NodeResponse nodeOperation(NodeRequest request);

protected void nodeOperation(NodeRequest request, ActionListener<NodeResponse> actionListener) {}

Check warning on line 241 in server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java#L241

Added line #L241 was not covered by tests

protected NodeResponse nodeOperation(NodeRequest request, Task task) {
return nodeOperation(request);
}
Expand Down Expand Up @@ -337,4 +381,14 @@ public void messageReceived(NodeRequest request, TransportChannel channel, Task
}
}

class ListenableNodeTransportHandler implements TransportRequestHandler<NodeRequest> {

Check warning on line 384 in server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java#L384

Added line #L384 was not covered by tests

@Override
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) {
ActionListener<NodeResponse> listener = ActionListener.wrap(channel::sendResponse, e -> {
TransportChannel.sendErrorResponse(channel, actionName, request, e);
});
nodeOperation(request, listener);
}

Check warning on line 392 in server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java#L388-L392

Added lines #L388 - L392 were not covered by tests
}
}

0 comments on commit 11c214f

Please sign in to comment.