Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.x][backport] Make allocation decisions at node level first for pending task optimi… #739

Merged
merged 1 commit into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,6 @@

package org.opensearch.benchmark.routing.allocation;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.settings.Settings;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -52,8 +42,20 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.settings.Settings;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Fork(3)
Expand All @@ -71,75 +73,103 @@ public class AllocationBenchmark {
// support to constrain the combinations of benchmark parameters and we do not want to rely on OptionsBuilder as each benchmark would
// need its own main method and we cannot execute more than one class with a main method per JAR.
@Param({
// indices| shards| replicas| nodes
" 10| 1| 0| 1",
" 10| 3| 0| 1",
" 10| 10| 0| 1",
" 100| 1| 0| 1",
" 100| 3| 0| 1",
" 100| 10| 0| 1",

" 10| 1| 0| 10",
" 10| 3| 0| 10",
" 10| 10| 0| 10",
" 100| 1| 0| 10",
" 100| 3| 0| 10",
" 100| 10| 0| 10",

" 10| 1| 1| 10",
" 10| 3| 1| 10",
" 10| 10| 1| 10",
" 100| 1| 1| 10",
" 100| 3| 1| 10",
" 100| 10| 1| 10",

" 10| 1| 2| 10",
" 10| 3| 2| 10",
" 10| 10| 2| 10",
" 100| 1| 2| 10",
" 100| 3| 2| 10",
" 100| 10| 2| 10",

" 10| 1| 0| 50",
" 10| 3| 0| 50",
" 10| 10| 0| 50",
" 100| 1| 0| 50",
" 100| 3| 0| 50",
" 100| 10| 0| 50",

" 10| 1| 1| 50",
" 10| 3| 1| 50",
" 10| 10| 1| 50",
" 100| 1| 1| 50",
" 100| 3| 1| 50",
" 100| 10| 1| 50",

" 10| 1| 2| 50",
" 10| 3| 2| 50",
" 10| 10| 2| 50",
" 100| 1| 2| 50",
" 100| 3| 2| 50",
" 100| 10| 2| 50" })
public String indicesShardsReplicasNodes = "10|1|0|1";
// indices| shards| replicas| source| target| concurrentRecoveries
" 10| 2| 0| 1| 1| 1|",
" 10| 3| 0| 1| 1| 2|",
" 10| 10| 0| 1| 1| 5|",
" 100| 1| 0| 1| 1| 10|",
" 100| 3| 0| 1| 1| 10|",
" 100| 10| 0| 1| 1| 10|",

" 10| 2| 0| 10| 10| 1|",
" 10| 3| 0| 10| 5| 2|",
" 10| 10| 0| 10| 5| 5|",
" 100| 1| 0| 5| 10| 5|",
" 100| 3| 0| 10| 5| 5|",
" 100| 10| 0| 10| 20| 6|",

" 10| 1| 1| 10| 10| 1|",
" 10| 3| 1| 10| 3| 3|",
" 10| 10| 1| 5| 12| 5|",
" 100| 1| 1| 10| 10| 6|",
" 100| 3| 1| 10| 5| 8|",
" 100| 10| 1| 8| 17| 8|",

" 10| 1| 2| 10| 10| 1|",
" 10| 3| 2| 10| 5| 3|",
" 10| 10| 2| 5| 10| 5|",
" 100| 1| 2| 10| 8| 7|",
" 100| 3| 2| 13| 17| 5|",
" 100| 10| 2| 10| 20| 8|",

" 10| 2| 1| 20| 20| 1|",
" 10| 3| 1| 20| 30| 1|",
" 10| 10| 1| 20| 10| 3|",
" 100| 1| 1| 20| 5| 5|",
" 100| 3| 1| 20| 23| 6|",
" 100| 10| 1| 40| 20| 8|",

" 10| 3| 2| 50| 30| 1|",
" 10| 3| 2| 50| 25| 1|",
" 10| 10| 1| 50| 33| 2|",
" 100| 1| 1| 40| 50| 2|",
" 100| 3| 1| 50| 70| 3|",
" 100| 10| 1| 60| 50| 3|",

" 10| 10| 2| 50| 50| 1|",
" 10| 3| 2| 50| 30| 1|",
" 10| 10| 2| 50| 40| 2|",
" 100| 1| 2| 40| 50| 2|",
" 100| 3| 2| 50| 30| 6|",
" 100| 10| 2| 33| 55| 6|",

" 500| 60| 1| 100| 100| 12|",
" 500| 60| 1| 100| 40| 12|",
" 500| 60| 1| 40| 100| 12|",

" 50| 60| 1| 100| 100| 6|",
" 50| 60| 1| 100| 40| 6|",
" 50| 60| 1| 40| 100| 6|" })
public String indicesShardsReplicasSourceTargetRecoveries = "10|1|0|1|1|1";

// Modulus applied to the node index when a "tag" attribute is derived as "tag_" + (i % numTags) in setUp.
public int numTags = 2;
// Number of distinct "zone" attribute values; setUpClusterNodes assigns "zone_" + (index % numZone) to each node.
public int numZone = 3;
// Sixth field of the benchmark parameter string; passed as node_concurrent_recoveries by the benchmark methods.
public int concurrentRecoveries;
// First, second and third fields of the benchmark parameter string.
public int numIndices;
public int numShards;
public int numReplicas;
// Fourth and fifth fields of the benchmark parameter string: how many nodes are created
// with tag "tag_1" (source) and with tag "tag_0" (target) respectively.
public int sourceNodes;
public int targetNodes;
// Computed in setUp as min(sourceNodes, targetNodes) * concurrentRecoveries;
// passed as cluster_concurrent_recoveries by the benchmark methods.
public int clusterConcurrentRecoveries;

// NOTE(review): the only visible assignment of this field (in setUp) is syntactically incomplete,
// yet the benchmark loops still reference it — confirm whether it should still exist.
private AllocationService strategy;
// Used by setUp to allocate and start every shard before the benchmarks run.
private AllocationService initialClusterStrategy;
// NOTE(review): not referenced anywhere in the visible portion of this file.
private AllocationService clusterExcludeStrategy;
// Re-created by each benchmark method with the "tag_1" exclusion and recovery limits.
private AllocationService clusterZoneAwareExcludeStrategy;
// Cluster state produced by setUp; every benchmark invocation starts from it.
private ClusterState initialClusterState;

@Setup
public void setUp() throws Exception {
final String[] params = indicesShardsReplicasNodes.split("\\|");
final String[] params = indicesShardsReplicasSourceTargetRecoveries.split("\\|");
numIndices = toInt(params[0]);
numShards = toInt(params[1]);
numReplicas = toInt(params[2]);
sourceNodes = toInt(params[3]);
targetNodes = toInt(params[4]);
concurrentRecoveries = toInt(params[5]);

int numIndices = toInt(params[0]);
int numShards = toInt(params[1]);
int numReplicas = toInt(params[2]);
int numNodes = toInt(params[3]);
int totalShardCount = (numReplicas + 1) * numShards * numIndices;

strategy = Allocators.createAllocationService(
Settings.builder().put("cluster.routing.allocation.awareness.attributes", "tag").build()
initialClusterStrategy = Allocators.createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.node_concurrent_recoveries", "20")
.put("cluster.routing.allocation.exclude.tag", "tag_0")
.build()
);

// We'll try to move shards from the tag_1 (source) nodes to the tag_0 (target) nodes
clusterConcurrentRecoveries = Math.min(sourceNodes, targetNodes) * concurrentRecoveries;

Metadata.Builder mb = Metadata.builder();
for (int i = 1; i <= numIndices; i++) {
mb.put(
Expand All @@ -155,31 +185,96 @@ public void setUp() throws Exception {
rb.addAsNew(metadata.index("test_" + i));
}
RoutingTable routingTable = rb.build();
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
for (int i = 1; i <= numNodes; i++) {
nb.add(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
}
initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(nb)
.nodes(setUpClusterNodes(sourceNodes, targetNodes))
.build();
// Start all unassigned shards
initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute");
while (initialClusterState.getRoutingNodes().hasUnassignedShards()) {
initialClusterState = initialClusterStrategy.applyStartedShards(
initialClusterState,
initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute");
}
// Ensure all shards are started
while (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
initialClusterState = initialClusterStrategy.applyStartedShards(
initialClusterState,
initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
}

assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size() == totalShardCount);
assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 0);
assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size() == 0);
// make sure shards are only allocated on nodes tagged tag_1
for (ShardRouting startedShard : initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
assert (initialClusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals(
"tag_1"
);
}
}

/**
 * Parses one field of the pipe-delimited benchmark parameter string into an {@code int}.
 *
 * @param v raw field text; leading and trailing whitespace is removed before parsing
 * @return the parsed value
 * @throws NumberFormatException if the trimmed text is not a valid decimal integer
 */
private int toInt(String v) {
    // parseInt returns the primitive directly; valueOf would box an Integer only to unbox it again.
    return Integer.parseInt(v.trim());
}

/**
 * Benchmark: starting from {@code initialClusterState}, builds an allocation service that
 * uses "zone" awareness, applies the node- and cluster-level concurrent-recovery limits
 * held in this object's fields, and excludes nodes tagged "tag_1"; then performs a single
 * reroute with it and returns the resulting cluster state.
 *
 * NOTE(review): this span contains two method headers and an unterminated while/call
 * (flagged below). They look like removed-line residue from a diff — confirm and drop
 * them before compiling.
 */
@Benchmark
// NOTE(review): header with no body of its own — presumably the previous name of this benchmark.
public ClusterState measureAllocation() {
public ClusterState measureExclusionOnZoneAwareStartedShard() throws Exception {
ClusterState clusterState = initialClusterState;
// NOTE(review): the next two lines open a loop and a call that are never closed — apparent residue.
while (clusterState.getRoutingNodes().hasUnassignedShards()) {
clusterState = strategy.applyStartedShards(
// Allocation service under test: zone awareness, recovery limits, and "tag_1" excluded.
clusterZoneAwareExcludeStrategy = Allocators.createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
.put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries))
.put("cluster.routing.allocation.exclude.tag", "tag_1")
.build()
);
// The single reroute is the measured work; the state is returned rather than discarded.
clusterState = clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute");
return clusterState;
}

/**
 * Benchmark: starting from {@code initialClusterState}, reroutes with nodes tagged
 * "tag_1" excluded, then repeatedly marks every INITIALIZING shard as started and
 * reroutes again until no shard is INITIALIZING.
 *
 * <p>The same allocation service is used for every step, so all decisions are made
 * under the exclusion filter and recovery limits configured here.
 *
 * @return the cluster state once no shard is INITIALIZING
 * @throws Exception declared for compatibility with the existing benchmark signature
 */
@Benchmark
public ClusterState measureShardRelocationComplete() throws Exception {
    clusterZoneAwareExcludeStrategy = Allocators.createAllocationService(
        Settings.builder()
            .put("cluster.routing.allocation.awareness.attributes", "zone")
            .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries))
            .put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
            .put("cluster.routing.allocation.exclude.tag", "tag_1")
            .build()
    );

    ClusterState clusterState = clusterZoneAwareExcludeStrategy.reroute(initialClusterState, "reroute");
    while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
        clusterState = clusterZoneAwareExcludeStrategy.applyStartedShards(
            clusterState,
            clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
        );
        // Reroute with the service configured above. The previous code used the separate
        // `strategy` field here, which carries neither the exclusion filter nor these limits.
        clusterState = clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute");
    }

    // Sanity check (only active with -ea): every started shard now sits on a tag_0 node.
    for (ShardRouting startedShard : clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
        assert (clusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals("tag_0");
    }
    return clusterState;
}

/**
 * Creates the node set for the benchmark cluster: first {@code sourceNodes} nodes named
 * {@code node_s_1..n} with tag "tag_1", then {@code targetNodes} nodes named
 * {@code node_t_1..n} with tag "tag_0". Each node also gets a "zone" attribute cycling
 * through {@code numZone} values.
 *
 * @param sourceNodes number of "tag_1" nodes to create
 * @param targetNodes number of "tag_0" nodes to create
 * @return a builder holding all created nodes, source nodes added first
 */
private DiscoveryNodes.Builder setUpClusterNodes(int sourceNodes, int targetNodes) {
    final DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
    addTaggedNodes(nodes, "node_s_", "tag_1", sourceNodes);
    addTaggedNodes(nodes, "node_t_", "tag_0", targetNodes);
    return nodes;
}

/** Adds {@code count} nodes, numbered from 1, that share one tag and rotate through the zones. */
private void addTaggedNodes(DiscoveryNodes.Builder nodes, String idPrefix, String tag, int count) {
    for (int ordinal = 1; ordinal <= count; ordinal++) {
        final Map<String, String> nodeAttributes = new HashMap<>();
        nodeAttributes.put("tag", tag);
        nodeAttributes.put("zone", "zone_" + (ordinal % numZone));
        nodes.add(Allocators.newNode(idPrefix + ordinal, nodeAttributes));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRecoveriesAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
Expand Down Expand Up @@ -244,6 +245,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider());
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
Expand Down
Loading