-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable acked indexing #17038
Enable acked indexing #17038
Changes from all commits
74194b8
57501ce
63ada98
25fae03
55465dd
51f2c3c
746ca07
4e39359
673a73d
285c3bf
7bb85e4
3353790
76465ec
4a7524f
2bd09c4
563304d
bd9e908
4793630
14ba0c3
37d739a
97be383
4e1f62e
2a93889
5576526
4de57fc
85d3d51
0e5b22a
c4324f9
649bcdc
27448dc
1f12bee
8b970d9
c7c8b1d
e201f5c
cffc315
7cdd647
3abf817
c2ed5a1
66cc202
95feb40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,6 +79,7 @@ | |
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
@@ -679,7 +680,7 @@ protected void doRun() throws Exception { | |
return; | ||
} | ||
// closed in finishAsFailed(e) in the case of error | ||
indexShardReference = getIndexShardReferenceOnPrimary(shardId); | ||
indexShardReference = getIndexShardReferenceOnPrimary(shardId, request); | ||
if (indexShardReference.isRelocated() == false) { | ||
executeLocally(); | ||
} else { | ||
|
@@ -797,7 +798,7 @@ void finishBecauseUnavailable(ShardId shardId, String message) { | |
* returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally | ||
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}). | ||
*/ | ||
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { | ||
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) { | ||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); | ||
IndexShard indexShard = indexService.getShard(shardId.id()); | ||
// we may end up here if the cluster state used to route the primary is so stale that the underlying | ||
|
@@ -816,7 +817,8 @@ protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { | |
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) { | ||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); | ||
IndexShard indexShard = indexService.getShard(shardId.id()); | ||
return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm); | ||
IndexShardReference ref = IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm); | ||
return ref; | ||
} | ||
|
||
/** | ||
|
@@ -997,30 +999,38 @@ public void handleException(TransportException exp) { | |
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node); | ||
logger.warn("[{}] {}", exp, shardId, message); | ||
shardStateAction.shardFailed( | ||
shard, | ||
indexShardReference.routingEntry(), | ||
message, | ||
exp, | ||
new ShardStateAction.Listener() { | ||
@Override | ||
public void onSuccess() { | ||
onReplicaFailure(nodeId, exp); | ||
} | ||
|
||
@Override | ||
public void onFailure(Throwable shardFailedError) { | ||
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { | ||
ShardRouting primaryShard = indexShardReference.routingEntry(); | ||
String message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp); | ||
// we are no longer the primary, fail ourselves and start over | ||
indexShardReference.failShard(message, shardFailedError); | ||
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError)); | ||
} else { | ||
assert false : shardFailedError; | ||
shard, | ||
indexShardReference.routingEntry(), | ||
message, | ||
exp, | ||
new ShardStateAction.Listener() { | ||
@Override | ||
public void onSuccess() { | ||
onReplicaFailure(nodeId, exp); | ||
} | ||
|
||
@Override | ||
public void onFailure(Throwable shardFailedError) { | ||
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { | ||
String message = "unknown"; | ||
try { | ||
ShardRouting primaryShard = indexShardReference.routingEntry(); | ||
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp); | ||
// we are no longer the primary, fail ourselves and start over | ||
indexShardReference.failShard(message, shardFailedError); | ||
} catch (Throwable t) { | ||
shardFailedError.addSuppressed(t); | ||
} | ||
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError)); | ||
} else { | ||
// these can occur if the node is shutting down and are okay | ||
// any other exception here is not expected and merits investigation | ||
assert shardFailedError instanceof TransportException || | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a comment about where these exceptions can come from? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pushed 4e1f62e. |
||
shardFailedError instanceof NodeClosedException : shardFailedError; | ||
onReplicaFailure(nodeId, exp); | ||
} | ||
} | ||
} | ||
} | ||
); | ||
} | ||
} | ||
|
@@ -1108,7 +1118,9 @@ protected boolean shouldExecuteReplication(Settings settings) { | |
|
||
interface IndexShardReference extends Releasable { | ||
boolean isRelocated(); | ||
|
||
void failShard(String reason, @Nullable Throwable e); | ||
|
||
ShardRouting routingEntry(); | ||
|
||
/** returns the primary term of the current operation */ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,6 @@ | |
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; | ||
import org.elasticsearch.discovery.Discovery; | ||
import org.elasticsearch.discovery.DiscoverySettings; | ||
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; | ||
import org.elasticsearch.discovery.zen.ZenDiscovery; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.BytesTransportRequest; | ||
|
@@ -58,11 +57,13 @@ | |
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* | ||
|
@@ -81,17 +82,22 @@ public interface NewPendingClusterStateListener { | |
} | ||
|
||
private final TransportService transportService; | ||
private final DiscoveryNodesProvider nodesProvider; | ||
private final Supplier<ClusterState> clusterStateSupplier; | ||
private final NewPendingClusterStateListener newPendingClusterStatelistener; | ||
private final DiscoverySettings discoverySettings; | ||
private final ClusterName clusterName; | ||
private final PendingClusterStatesQueue pendingStatesQueue; | ||
|
||
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, | ||
NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { | ||
public PublishClusterStateAction( | ||
Settings settings, | ||
TransportService transportService, | ||
Supplier<ClusterState> clusterStateSupplier, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that node provider interface can go away (now that's were on java8 - function refs FTW) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought the same but can we keep it separate from this pull request? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for SURE! |
||
NewPendingClusterStateListener listener, | ||
DiscoverySettings discoverySettings, | ||
ClusterName clusterName) { | ||
super(settings); | ||
this.transportService = transportService; | ||
this.nodesProvider = nodesProvider; | ||
this.clusterStateSupplier = clusterStateSupplier; | ||
this.newPendingClusterStatelistener = listener; | ||
this.discoverySettings = discoverySettings; | ||
this.clusterName = clusterName; | ||
|
@@ -363,7 +369,7 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request, | |
final ClusterState incomingState; | ||
// If true we received full cluster state - otherwise diffs | ||
if (in.readBoolean()) { | ||
incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode()); | ||
incomingState = ClusterState.Builder.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode()); | ||
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); | ||
} else if (lastSeenClusterState != null) { | ||
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in); | ||
|
@@ -394,14 +400,25 @@ void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClus | |
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName); | ||
throw new IllegalStateException("received state from a node that is not part of the cluster"); | ||
} | ||
final DiscoveryNodes currentNodes = nodesProvider.nodes(); | ||
final ClusterState clusterState = clusterStateSupplier.get(); | ||
|
||
if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { | ||
if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) { | ||
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode()); | ||
throw new IllegalStateException("received state from a node that is not part of the cluster"); | ||
throw new IllegalStateException("received state with a local node that does not match the current local node"); | ||
} | ||
|
||
if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) { | ||
String message = String.format( | ||
Locale.ROOT, | ||
"rejecting cluster state version [%d] uuid [%s] received from [%s]", | ||
incomingState.version(), | ||
incomingState.stateUUID(), | ||
incomingState.nodes().getMasterNodeId() | ||
); | ||
logger.warn(message); | ||
throw new IllegalStateException(message); | ||
} | ||
|
||
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); | ||
} | ||
|
||
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { | ||
|
@@ -518,7 +535,7 @@ public void waitForCommit(TimeValue commitTimeout) { | |
} | ||
|
||
if (timedout) { | ||
markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]"); | ||
markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "])"); | ||
} | ||
if (isCommitted() == false) { | ||
throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we open a follow up issue that failShard should throw an already closed exception? (this is really what the code protects against). To be clear - I think the code can stay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opened #17366.