Skip to content

Commit

Permalink
Add unit tests for ShardStateAction's ShardStartedClusterStateTaskExe…
Browse files Browse the repository at this point in the history
…cutor (#37756)
  • Loading branch information
tlrx authored Jan 25, 2019
1 parent dfecb25 commit a644bc0
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
Expand All @@ -35,6 +37,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.TestGatewayAllocator;

import java.time.Instant;
import java.util.Collections;
import java.util.Locale;

Expand Down Expand Up @@ -85,7 +88,16 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
"wait until initialization has completed";
}
assertEquals("{\"index\":\"idx\",\"shard\":0,\"primary\":true,\"current_state\":\"" +
shardRoutingState.toString().toLowerCase(Locale.ROOT) + "\",\"current_node\":" +
shardRoutingState.toString().toLowerCase(Locale.ROOT) + "\"" +
(shard.unassignedInfo() != null ?
",\"unassigned_info\":{"
+ "\"reason\":\"" + shard.unassignedInfo().getReason() + "\","
+ "\"at\":\""+ UnassignedInfo.DATE_TIME_FORMATTER.format(
Instant.ofEpochMilli(shard.unassignedInfo().getUnassignedTimeInMillis())) + "\","
+ "\"last_allocation_status\":\"" + AllocationDecision.fromAllocationStatus(
shard.unassignedInfo().getLastAllocationStatus()) + "\"}"
: "")
+ ",\"current_node\":" +
"{\"id\":\"" + cae.getCurrentNode().getId() + "\",\"name\":\"" + cae.getCurrentNode().getName() +
"\",\"transport_address\":\"" + cae.getCurrentNode().getAddress() +
"\"},\"explanation\":\"" + explanation + "\"}", Strings.toString(builder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard
}
if (primaryState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
} else if (primaryState == ShardRoutingState.INITIALIZING) {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
} else {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -73,7 +72,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
private ClusterState clusterState;
private ShardStateAction.ShardFailedClusterStateTaskExecutor executor;

@Before
@Override
public void setUp() throws Exception {
super.setUp();
allocationService = createAllocationService(Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.action.shard;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Collections.singletonList;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithNoShard;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase {

private ShardStateAction.ShardStartedClusterStateTaskExecutor executor;

@Override
public void setUp() throws Exception {
super.setUp();
AllocationService allocationService = createAllocationService(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
.build());
executor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger);
}

public void testEmptyTaskListProducesSameClusterState() throws Exception {
final ClusterState clusterState = stateWithNoShard();
assertTasksExecution(clusterState, Collections.emptyList(), result -> assertSame(clusterState, result.resultingState));
}

public void testNonExistentIndexMarkedAsSuccessful() throws Exception {
final ClusterState clusterState = stateWithNoShard();
final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", "test");
assertTasksExecution(clusterState, singletonList(entry),
result -> {
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(1));
assertThat(result.executionResults.containsKey(entry), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true));
});
}

public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {
final String indexName = "test";
final ClusterState clusterState = stateWithActivePrimary(indexName, true, randomInt(2), randomInt(2));

final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final List<StartedShardEntry> tasks = Stream.concat(
// Existent shard id but different allocation id
IntStream.range(0, randomIntBetween(1, 5))
.mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), "allocation id")),
// Non existent shard id
IntStream.range(1, randomIntBetween(2, 5))
.mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), "shard id"))

).collect(Collectors.toList());

assertTasksExecution(clusterState, tasks, result -> {
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
});
});
}

public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception {
final String indexName = "test";
final ClusterState clusterState = stateWithAssignedPrimariesAndReplicas(new String[]{indexName}, randomIntBetween(2, 10), 1);

final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final List<StartedShardEntry> tasks = IntStream.range(0, randomIntBetween(1, indexMetaData.getNumberOfShards()))
.mapToObj(i -> {
final ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
final IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
final String allocationId;
if (randomBoolean()) {
allocationId = shardRoutingTable.primaryShard().allocationId().getId();
} else {
allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId();
}
return new StartedShardEntry(shardId, allocationId, "test");
}).collect(Collectors.toList());

assertTasksExecution(clusterState, tasks, result -> {
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
});
});
}

public void testStartedShards() throws Exception {
final String indexName = "test";
final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING);

final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0);
final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard();
final String primaryAllocationId = primaryShard.allocationId().getId();

final List<StartedShardEntry> tasks = new ArrayList<>();
tasks.add(new StartedShardEntry(shardId, primaryAllocationId, "test"));
if (randomBoolean()) {
final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next();
final String replicaAllocationId = replicaShard.allocationId().getId();
tasks.add(new StartedShardEntry(shardId, replicaAllocationId, "test"));
}
assertTasksExecution(clusterState, tasks, result -> {
assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));

final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId);
assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED));
});
});
}

public void testDuplicateStartsAreOkay() throws Exception {
final String indexName = "test";
final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING);

final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0);
final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).primaryShard();
final String allocationId = shardRouting.allocationId().getId();

final List<StartedShardEntry> tasks = IntStream.range(0, randomIntBetween(2, 10))
.mapToObj(i -> new StartedShardEntry(shardId, allocationId, "test"))
.collect(Collectors.toList());

assertTasksExecution(clusterState, tasks, result -> {
assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));

final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId);
assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED));
});
});
}

private void assertTasksExecution(final ClusterState state,
final List<StartedShardEntry> tasks,
final Consumer<ClusterStateTaskExecutor.ClusterTasksResult> consumer) throws Exception {
final ClusterStateTaskExecutor.ClusterTasksResult<StartedShardEntry> result = executor.execute(state, tasks);
assertThat(result, notNullValue());
consumer.accept(result);
}
}
Loading

0 comments on commit a644bc0

Please sign in to comment.