Skip to content

Commit

Permalink
Reset discovery nodes in most transport node actions request. (#15131)
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
Co-authored-by: Swetha Guptha <gupthasg@amazon.com>
  • Loading branch information
SwethaGuptha and Swetha Guptha committed Sep 6, 2024
1 parent 3b8a741 commit 0d2698e
Show file tree
Hide file tree
Showing 24 changed files with 86 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public NodesHotThreadsRequest(StreamInput in) throws IOException {
* threads for all nodes is used.
*/
public NodesHotThreadsRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

public int threads() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public NodesInfoRequest(StreamInput in) throws IOException {
* for all nodes will be returned.
*/
public NodesInfoRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
all();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
private final Set<String> requestedMetrics = new HashSet<>();

public NodesStatsRequest() {
super((String[]) null);
super(false, (String[]) null);
}

public NodesStatsRequest(StreamInput in) throws IOException {
Expand Down Expand Up @@ -90,7 +90,7 @@ public NodesStatsRequest(StreamInput in) throws IOException {
* for all nodes will be returned.
*/
public NodesStatsRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public NodesUsageRequest(StreamInput in) throws IOException {
* passed, usage for all nodes will be returned.
*/
public NodesUsageRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(String[] nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

public Request snapshots(Snapshot[] snapshots) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public ClusterStatsRequest(StreamInput in) throws IOException {
* based on all nodes will be returned.
*/
public ClusterStatsRequest(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}

public boolean useAggregatedNodeLevelResponses() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public FindDanglingIndexRequest(StreamInput in) throws IOException {
}

public FindDanglingIndexRequest(String indexUUID) {
super(Strings.EMPTY_ARRAY);
super(false, Strings.EMPTY_ARRAY);
this.indexUUID = indexUUID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public ListDanglingIndicesRequest(StreamInput in) throws IOException {
}

public ListDanglingIndicesRequest() {
super(Strings.EMPTY_ARRAY);
super(false, Strings.EMPTY_ARRAY);
this.indexUUID = null;
}

public ListDanglingIndicesRequest(String indexUUID) {
super(Strings.EMPTY_ARRAY);
super(false, Strings.EMPTY_ARRAY);
this.indexUUID = indexUUID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesReques

@Inject
public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
super(false, concreteNodes);
}

public GetAllPitNodesRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean includeDiscoveryNodes = true;

private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand All @@ -88,11 +89,22 @@ protected BaseNodesRequest(String... nodesIds) {
this.nodesIds = nodesIds;
}

protected BaseNodesRequest(boolean includeDiscoveryNodes, String... nodesIds) {
this.nodesIds = nodesIds;
this.includeDiscoveryNodes = includeDiscoveryNodes;
}

protected BaseNodesRequest(DiscoveryNode... concreteNodes) {
this.nodesIds = null;
this.concreteNodes = concreteNodes;
}

protected BaseNodesRequest(boolean includeDiscoveryNodes, DiscoveryNode... concreteNodes) {
this.nodesIds = null;
this.concreteNodes = concreteNodes;
this.includeDiscoveryNodes = includeDiscoveryNodes;
}

public final String[] nodesIds() {
return nodesIds;
}
Expand Down Expand Up @@ -127,10 +139,6 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void setIncludeDiscoveryNodes(boolean value) {
includeDiscoveryNodes = value;
}

public boolean getIncludeDiscoveryNodes() {
return includeDiscoveryNodes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,16 @@ class AsyncAction {
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
this.concreteNodes = request.concreteNodes();

if (request.getIncludeDiscoveryNodes() == false) {
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we
// remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class,
// we remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// the number of concrete nodes in the memory.
request.setConcreteNodes(null);
}
}

void start() {
final DiscoveryNode[] nodes = this.concreteNodes;
if (nodes.length == 0) {
if (this.concreteNodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
return;
Expand All @@ -260,9 +258,9 @@ void start() {
if (request.timeout() != null) {
builder.withTimeout(request.timeout());
}
for (int i = 0; i < nodes.length; i++) {
for (int i = 0; i < this.concreteNodes.length; i++) {
final int idx = i;
final DiscoveryNode node = nodes[i];
final DiscoveryNode node = this.concreteNodes[i];
final String nodeId = node.getId();
try {
TransportRequest nodeRequest = newNodeRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(String... nodesIds) {
super(nodesIds);
super(false, nodesIds);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) {
super(nodes);
super(false, nodes);
this.shardId = Objects.requireNonNull(shardId);
this.customDataPath = Objects.requireNonNull(customDataPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(DiscoveryNode[] nodes, Map<ShardId, ShardAttributes> shardAttributes) {
super(nodes);
super(false, nodes);
this.shardAttributes = Objects.requireNonNull(shardAttributes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) {
super(nodes);
super(false, nodes);
this.shardId = Objects.requireNonNull(shardId);
this.customDataPath = Objects.requireNonNull(customDataPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public Request(StreamInput in) throws IOException {
}

public Request(Map<ShardId, ShardAttributes> shardAttributes, DiscoveryNode[] nodes) {
super(nodes);
super(false, nodes);
this.shardAttributes = Objects.requireNonNull(shardAttributes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.setIncludeDiscoveryNodes(false);
clusterStatsRequest.useAggregatedNodeLevelResponses(true);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);
nodesInfoRequest.setIncludeDiscoveryNodes(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.setIncludeDiscoveryNodes(false);
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
Expand All @@ -138,7 +137,6 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,48 +33,12 @@

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand All @@ -88,7 +52,6 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionO
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand Down
Loading

0 comments on commit 0d2698e

Please sign in to comment.