Skip to content

Commit

Permalink
Add listenable TransportRequestHandler in TransportNodesAction
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <zaniu@amazon.com>
  • Loading branch information
zane-neo committed Dec 13, 2024
1 parent b67cdf4 commit 3f1f5bd
Showing 1 changed file with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,50 @@ protected TransportNodesAction(
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
}

/**
* @param actionName action name
* @param threadPool thread-pool
* @param clusterService cluster service
* @param transportService transport service
* @param actionFilters action filters
* @param request node request writer
* @param nodeRequest node request reader
* @param nodeExecutor executor to execute node action on
* @param finalExecutor executor to execute final collection of all responses on
* @param listenableHandler true if the handler should be a listenable handler
* @param nodeResponseClass class of the node responses
*/
protected TransportNodesAction(
String actionName,
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<NodesRequest> request,
Writeable.Reader<NodeRequest> nodeRequest,
String nodeExecutor,
String finalExecutor,
boolean listenableHandler,
Class<NodeResponse> nodeResponseClass
) {
super(actionName, transportService, actionFilters, request);
this.threadPool = threadPool;
this.clusterService = Objects.requireNonNull(clusterService);
this.transportService = Objects.requireNonNull(transportService);
this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);

this.transportNodeAction = actionName + "[n]";
this.finalExecutor = finalExecutor;
if (listenableHandler) {
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new ListenableNodeTransportHandler());
} else {
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
}
}

/**
* Same as {@link #TransportNodesAction(String, ThreadPool, ClusterService, TransportService, ActionFilters, Writeable.Reader,
* Writeable.Reader, String, String, Class)} but executes final response collection on the transport thread except for when the final
* Writeable.Reader, String, String, boolean, Class)} but executes final response collection on the transport thread except for when the final
* node response is received from the local node, in which case {@code nodeExecutor} is used.
* This constructor should only be used for actions for which the creation of the final response is fast enough to be safely executed
* on a transport thread.
Expand All @@ -144,6 +185,7 @@ protected TransportNodesAction(
nodeRequest,
nodeExecutor,
ThreadPool.Names.SAME,
false,
nodeResponseClass
);
}
Expand Down Expand Up @@ -196,6 +238,8 @@ protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray<?

protected abstract NodeResponse nodeOperation(NodeRequest request);

protected void nodeOperation(NodeRequest request, ActionListener<NodeResponse> actionListener) {}

protected NodeResponse nodeOperation(NodeRequest request, Task task) {
return nodeOperation(request);
}
Expand Down Expand Up @@ -335,4 +379,14 @@ public void messageReceived(NodeRequest request, TransportChannel channel, Task
}
}

class ListenableNodeTransportHandler implements TransportRequestHandler<NodeRequest> {

@Override
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) {
ActionListener<NodeResponse> listener = ActionListener.wrap(channel::sendResponse, e -> {
TransportChannel.sendErrorResponse(channel, actionName, request, e);
});
nodeOperation(request, listener);
}
}
}

0 comments on commit 3f1f5bd

Please sign in to comment.