Skip to content

Commit

Permalink
apply feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Jan 25, 2019
1 parent 7223d31 commit 1093049
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard
}
if (primaryState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
}
if (primaryState == ShardRoutingState.INITIALIZING) {
} else if (primaryState == ShardRoutingState.INITIALIZING) {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
private ClusterState clusterState;
private ShardStateAction.ShardFailedClusterStateTaskExecutor executor;

@Before
@Override
public void setUp() throws Exception {
super.setUp();
allocationService = createAllocationService(Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -52,26 +51,26 @@

public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase {

private ClusterState clusterState;
private ShardStateAction.ShardStartedClusterStateTaskExecutor executor;

@Before
@Override
public void setUp() throws Exception {
super.setUp();
clusterState = stateWithNoShard();
AllocationService allocationService = createAllocationService(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
.build());
executor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger);
}

public void testEmptyTaskListProducesSameClusterState() throws Exception {
assertTasksExecution(Collections.emptyList(), result -> assertSame(clusterState, result.resultingState));
final ClusterState clusterState = stateWithNoShard();
assertTasksExecution(clusterState, Collections.emptyList(), result -> assertSame(clusterState, result.resultingState));
}

public void testNonExistentIndexMarkedAsSuccessful() throws Exception {
final ClusterState clusterState = stateWithNoShard();
final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", "test");
assertTasksExecution(singletonList(entry),
assertTasksExecution(clusterState, singletonList(entry),
result -> {
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(1));
Expand All @@ -82,7 +81,7 @@ public void testNonExistentIndexMarkedAsSuccessful() throws Exception {

public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {
final String indexName = "test";
clusterState = stateWithActivePrimary(indexName, true, randomInt(2), randomInt(2));
final ClusterState clusterState = stateWithActivePrimary(indexName, true, randomInt(2), randomInt(2));

final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final List<StartedShardEntry> tasks = Stream.concat(
Expand All @@ -95,7 +94,7 @@ public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {

).collect(Collectors.toList());

assertTasksExecution(tasks, result -> {
assertTasksExecution(clusterState, tasks, result -> {
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
Expand All @@ -107,7 +106,7 @@ public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {

public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception {
final String indexName = "test";
clusterState = stateWithAssignedPrimariesAndReplicas(new String[]{indexName}, randomIntBetween(2, 10), 1);
final ClusterState clusterState = stateWithAssignedPrimariesAndReplicas(new String[]{indexName}, randomIntBetween(2, 10), 1);

final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final List<StartedShardEntry> tasks = IntStream.range(0, randomIntBetween(1, indexMetaData.getNumberOfShards()))
Expand All @@ -123,7 +122,7 @@ public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception {
return new StartedShardEntry(shardId, allocationId, "test");
}).collect(Collectors.toList());

assertTasksExecution(tasks, result -> {
assertTasksExecution(clusterState, tasks, result -> {
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
Expand All @@ -135,7 +134,7 @@ public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception {

public void testStartedShards() throws Exception {
final String indexName = "test";
clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING);
final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING);

final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0);
Expand All @@ -149,7 +148,7 @@ public void testStartedShards() throws Exception {
final String replicaAllocationId = replicaShard.allocationId().getId();
tasks.add(new StartedShardEntry(shardId, replicaAllocationId, "test"));
}
assertTasksExecution(tasks, result -> {
assertTasksExecution(clusterState, tasks, result -> {
assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
Expand All @@ -164,7 +163,7 @@ public void testStartedShards() throws Exception {

public void testDuplicateStartsAreOkay() throws Exception {
final String indexName = "test";
clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING);
final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING);

final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0);
Expand All @@ -175,7 +174,7 @@ public void testDuplicateStartsAreOkay() throws Exception {
.mapToObj(i -> new StartedShardEntry(shardId, allocationId, "test"))
.collect(Collectors.toList());

assertTasksExecution(tasks, result -> {
assertTasksExecution(clusterState, tasks, result -> {
assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
Expand All @@ -188,9 +187,10 @@ public void testDuplicateStartsAreOkay() throws Exception {
});
}

private void assertTasksExecution(final List<StartedShardEntry> tasks,
private void assertTasksExecution(final ClusterState state,
final List<StartedShardEntry> tasks,
final Consumer<ClusterStateTaskExecutor.ClusterTasksResult> consumer) throws Exception {
final ClusterStateTaskExecutor.ClusterTasksResult<StartedShardEntry> result = executor.execute(clusterState, tasks);
final ClusterStateTaskExecutor.ClusterTasksResult<StartedShardEntry> result = executor.execute(state, tasks);
assertThat(result, notNullValue());
consumer.accept(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void testSuccess() throws InterruptedException {
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);

listener.await();
assertTrue(listener.isSuccessful());
assertNull(listener.failure.get());
}

public void testNoMaster() throws InterruptedException {
Expand Down Expand Up @@ -284,7 +284,7 @@ public void testUnhandledFailure() {
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1));
transport.handleRemoteError(capturedRequests[0].requestId, new TransportException("simulated"));
assertFalse(listener.isSuccessful());
assertNotNull(listener.failure.get());
}

public void testShardNotFound() throws InterruptedException {
Expand All @@ -302,7 +302,7 @@ public void testShardNotFound() throws InterruptedException {
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);

listener.await();
assertTrue(listener.isSuccessful());
assertNull(listener.failure.get());
}

public void testNoLongerPrimaryShardException() throws InterruptedException {
Expand All @@ -324,9 +324,11 @@ public void testNoLongerPrimaryShardException() throws InterruptedException {
transport.handleRemoteError(capturedRequests[0].requestId, catastrophicError);

listener.await();
assertNotNull(listener.getFailure());
assertThat(listener.getFailure(), instanceOf(ShardStateAction.NoLongerPrimaryShardException.class));
assertThat(listener.getFailure().getMessage(), equalTo(catastrophicError.getMessage()));

final Exception failure = listener.failure.get();
assertNotNull(failure);
assertThat(failure, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class));
assertThat(failure.getMessage(), equalTo(catastrophicError.getMessage()));
}

public void testCacheRemoteShardFailed() throws Exception {
Expand Down Expand Up @@ -422,13 +424,15 @@ public void testShardStarted() throws InterruptedException {
shardStateAction.shardStarted(shardRouting, "testShardStarted", listener);

final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
listener.await();

assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class));

ShardStateAction.StartedShardEntry entry = (ShardStateAction.StartedShardEntry) capturedRequests[0].request;
assertThat(entry.shardId, equalTo(shardRouting.shardId()));
assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId()));

transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
listener.await();
assertNull(listener.failure.get());
}

private ShardRouting getRandomShardRouting(String index) {
Expand Down Expand Up @@ -584,14 +588,6 @@ public void onFailure(final Exception e) {
}
}

boolean isSuccessful() {
return getFailure() == null;
}

Exception getFailure() {
return failure.get();
}

void await() throws InterruptedException {
latch.await();
}
Expand Down

0 comments on commit 1093049

Please sign in to comment.