Skip to content

Commit

Permalink
dispatch ML task to ML node first
Browse files Browse the repository at this point in the history
Signed-off-by: Yaliang Wu <ylwu@amazon.com>
  • Loading branch information
ylwu-amzn committed Apr 13, 2022
1 parent 0e530e5 commit 6883925
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions plugin/src/main/java/org/opensearch/ml/task/MLTaskDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.opensearch.ml.stats.StatNames.ML_EXECUTING_TASK_COUNT;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -54,7 +55,7 @@ public MLTaskDispatcher(ClusterService clusterService, Client client) {
public void dispatchTask(ActionListener<DiscoveryNode> listener) {
// todo: add ML node type setting check
// DiscoveryNode[] mlNodes = getEligibleMLNodes();
DiscoveryNode[] mlNodes = getEligibleDataNodes();
DiscoveryNode[] mlNodes = getEligibleNodes();
MLStatsNodesRequest MLStatsNodesRequest = new MLStatsNodesRequest(mlNodes);
MLStatsNodesRequest.addAll(ImmutableSet.of(ML_EXECUTING_TASK_COUNT, JVM_HEAP_USAGE.getName()));

Expand Down Expand Up @@ -120,14 +121,26 @@ private DiscoveryNode[] getEligibleMLNodes() {
return eligibleNodes.toArray(new DiscoveryNode[0]);
}

private DiscoveryNode[] getEligibleDataNodes() {
private DiscoveryNode[] getEligibleNodes() {
ClusterState state = this.clusterService.state();
final List<DiscoveryNode> eligibleMLNodes = new ArrayList<>();
final List<DiscoveryNode> eligibleDataNodes = new ArrayList<>();
for (DiscoveryNode node : state.nodes()) {
if (MLNodeUtils.isMLNode(node)) {
eligibleMLNodes.add(node);
}
if (node.isDataNode()) {
eligibleDataNodes.add(node);
}
}
return eligibleDataNodes.toArray(new DiscoveryNode[0]);
if (eligibleMLNodes.size() > 0) {
DiscoveryNode[] mlNodes = eligibleMLNodes.toArray(new DiscoveryNode[0]);
log.info("We have {} dedicated ML nodes: {}", eligibleMLNodes.size(), Arrays.toString(mlNodes));
return mlNodes;
} else {
DiscoveryNode[] dataNodes = eligibleDataNodes.toArray(new DiscoveryNode[0]);
log.info("We have no dedicated ML nodes. But have {} data nodes: {}", eligibleDataNodes.size(), Arrays.toString(dataNodes));
return dataNodes;
}
}
}

0 comments on commit 6883925

Please sign in to comment.