Skip to content

Commit

Permalink
Merge pull request #15281 from ywelsch/feature/alloc-ids-primary
Browse files Browse the repository at this point in the history
Allocate primary shards based on allocation IDs
  • Loading branch information
Yannick Welsch committed Dec 17, 2015
2 parents b71845b + 3a442db commit 8f14b10
Show file tree
Hide file tree
Showing 26 changed files with 643 additions and 348 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
public static class StoreStatus implements Streamable, ToXContent, Comparable<StoreStatus> {
private DiscoveryNode node;
private long version;
private String allocationId;
private Throwable storeException;
private Allocation allocation;
private AllocationStatus allocationStatus;

/**
* The status of the shard store with respect to the cluster
*/
public enum Allocation {
public enum AllocationStatus {

/**
* Allocated as primary
Expand All @@ -81,16 +82,16 @@ public enum Allocation {

private final byte id;

Allocation(byte id) {
AllocationStatus(byte id) {
this.id = id;
}

private static Allocation fromId(byte id) {
private static AllocationStatus fromId(byte id) {
switch (id) {
case 0: return PRIMARY;
case 1: return REPLICA;
case 2: return UNUSED;
default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]");
default: throw new IllegalArgumentException("unknown id for allocation status [" + id + "]");
}
}

Expand All @@ -99,11 +100,11 @@ public String value() {
case 0: return "primary";
case 1: return "replica";
case 2: return "unused";
default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]");
default: throw new IllegalArgumentException("unknown id for allocation status [" + id + "]");
}
}

private static Allocation readFrom(StreamInput in) throws IOException {
private static AllocationStatus readFrom(StreamInput in) throws IOException {
return fromId(in.readByte());
}

Expand All @@ -115,10 +116,11 @@ private void writeTo(StreamOutput out) throws IOException {
private StoreStatus() {
}

public StoreStatus(DiscoveryNode node, long version, Allocation allocation, Throwable storeException) {
public StoreStatus(DiscoveryNode node, long version, String allocationId, AllocationStatus allocationStatus, Throwable storeException) {
this.node = node;
this.version = version;
this.allocation = allocation;
this.allocationId = allocationId;
this.allocationStatus = allocationStatus;
this.storeException = storeException;
}

Expand All @@ -130,13 +132,20 @@ public DiscoveryNode getNode() {
}

/**
* Version of the store, used to select the store that will be
* used as a primary.
* Version of the store
*/
public long getVersion() {
return version;
}

/**
* Allocation id of the store, used to select the store that will be
* used as a primary.
*/
public String getAllocationId() {
return allocationId;
}

/**
* Exception while trying to open the
* shard index or from when the shard failed
Expand All @@ -146,13 +155,13 @@ public Throwable getStoreException() {
}

/**
* The allocation status of the store.
* {@link Allocation#PRIMARY} indicates a primary shard copy
* {@link Allocation#REPLICA} indicates a replica shard copy
* {@link Allocation#UNUSED} indicates an unused shard copy
* The allocation status of the store.
* {@link AllocationStatus#PRIMARY} indicates a primary shard copy
* {@link AllocationStatus#REPLICA} indicates a replica shard copy
* {@link AllocationStatus#UNUSED} indicates an unused shard copy
*/
public Allocation getAllocation() {
return allocation;
public AllocationStatus getAllocationStatus() {
return allocationStatus;
}

static StoreStatus readStoreStatus(StreamInput in) throws IOException {
Expand All @@ -165,7 +174,8 @@ static StoreStatus readStoreStatus(StreamInput in) throws IOException {
public void readFrom(StreamInput in) throws IOException {
node = DiscoveryNode.readNode(in);
version = in.readLong();
allocation = Allocation.readFrom(in);
allocationId = in.readOptionalString();
allocationStatus = AllocationStatus.readFrom(in);
if (in.readBoolean()) {
storeException = in.readThrowable();
}
Expand All @@ -175,7 +185,8 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeLong(version);
allocation.writeTo(out);
out.writeOptionalString(allocationId);
allocationStatus.writeTo(out);
if (storeException != null) {
out.writeBoolean(true);
out.writeThrowable(storeException);
Expand All @@ -188,7 +199,8 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
node.toXContent(builder, params);
builder.field(Fields.VERSION, version);
builder.field(Fields.ALLOCATED, allocation.value());
builder.field(Fields.ALLOCATION_ID, allocationId);
builder.field(Fields.ALLOCATED, allocationStatus.value());
if (storeException != null) {
builder.startObject(Fields.STORE_EXCEPTION);
ElasticsearchException.toXContent(builder, params, storeException);
Expand All @@ -206,7 +218,7 @@ public int compareTo(StoreStatus other) {
} else {
int compare = Long.compare(other.version, version);
if (compare == 0) {
return Integer.compare(allocation.id, other.allocation.id);
return Integer.compare(allocationStatus.id, other.allocationStatus.id);
}
return compare;
}
Expand Down Expand Up @@ -379,6 +391,7 @@ static final class Fields {
static final XContentBuilderString STORES = new XContentBuilderString("stores");
// StoreStatus fields
static final XContentBuilderString VERSION = new XContentBuilderString("version");
static final XContentBuilderString ALLOCATION_ID = new XContentBuilderString("allocation_id");
static final XContentBuilderString STORE_EXCEPTION = new XContentBuilderString("store_exception");
static final XContentBuilderString ALLOCATED = new XContentBuilderString("allocation");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ void finish() {
}
for (NodeGatewayStartedShards response : fetchResponse.responses) {
if (shardExistsInNode(response)) {
IndicesShardStoresResponse.StoreStatus.Allocation allocation = getAllocation(fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), response.getNode());
storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.version(), allocation, response.storeException()));
IndicesShardStoresResponse.StoreStatus.AllocationStatus allocationStatus = getAllocationStatus(fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), response.getNode());
storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.version(), response.allocationId(), allocationStatus, response.storeException()));
}
}
CollectionUtil.timSort(storeStatuses);
Expand All @@ -193,27 +193,27 @@ void finish() {
listener.onResponse(new IndicesShardStoresResponse(indicesStoreStatusesBuilder.build(), Collections.unmodifiableList(failureBuilder)));
}

private IndicesShardStoresResponse.StoreStatus.Allocation getAllocation(String index, int shardID, DiscoveryNode node) {
private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationStatus(String index, int shardID, DiscoveryNode node) {
for (ShardRouting shardRouting : routingNodes.node(node.id())) {
ShardId shardId = shardRouting.shardId();
if (shardId.id() == shardID && shardId.getIndex().equals(index)) {
if (shardRouting.primary()) {
return IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY;
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY;
} else if (shardRouting.assignedToNode()) {
return IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA;
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA;
} else {
return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
}
}
}
return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
}

/**
* A shard exists/existed in a node only if shard state file exists in the node
*/
private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
return response.storeException() != null || response.version() != -1;
return response.storeException() != null || response.version() != -1 || response.allocationId() != null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ public Builder numberOfReplicas(int numberOfReplicas) {
public int numberOfReplicas() {
return settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1);
}

public Builder creationDate(long creationDate) {
settings = settingsBuilder().put(settings).put(SETTING_CREATION_DATE, creationDate).build();
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/**
* Service responsible for submitting open/close index requests
Expand Down Expand Up @@ -92,14 +91,6 @@ public ClusterState execute(ClusterState currentState) {
}

if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index);
for (IndexShardRoutingTable shard : indexRoutingTable) {
for (ShardRouting shardRouting : shard) {
if (shardRouting.primary() == true && shardRouting.allocatedPostIndexCreate() == false) {
throw new IndexPrimaryShardNotAllocatedException(new Index(index));
}
}
}
indicesToClose.add(index);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -267,7 +269,7 @@ public ShardId shardId() {
return shardIdentifier;
}

public boolean allocatedPostIndexCreate() {
public boolean allocatedPostIndexCreate(IndexMetaData indexMetaData) {
if (active()) {
return true;
}
Expand All @@ -279,6 +281,11 @@ public boolean allocatedPostIndexCreate() {
return false;
}

if (indexMetaData.activeAllocationIds(id()).isEmpty() && indexMetaData.getCreationVersion().onOrAfter(Version.V_3_0_0)) {
// when no shards with this id have ever been active for this index
return false;
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
Expand Down Expand Up @@ -360,7 +360,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

// a flag for whether the primary shard has been previously allocated
boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate();
IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex());
boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate(indexMetaData);

// checks for exact byte comparisons
if (freeBytes < freeBytesThresholdLow.bytes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
Expand Down Expand Up @@ -82,8 +83,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
}

Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).getSettings();
String enableIndexValue = indexSettings.get(INDEX_ROUTING_ALLOCATION_ENABLE);
IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex());
String enableIndexValue = indexMetaData.getSettings().get(INDEX_ROUTING_ALLOCATION_ENABLE);
final Allocation enable;
if (enableIndexValue != null) {
enable = Allocation.parse(enableIndexValue);
Expand All @@ -96,7 +97,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
case NONE:
return allocation.decision(Decision.NO, NAME, "no allocations are allowed");
case NEW_PRIMARIES:
if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate() == false) {
if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate(indexMetaData) == false) {
return allocation.decision(Decision.YES, NAME, "new primary allocations are allowed");
} else {
return allocation.decision(Decision.NO, NAME, "non-new primary allocations are forbidden");
Expand Down
Loading

0 comments on commit 8f14b10

Please sign in to comment.