Primary relocation handoff #15900
Conversation
Thanks @ywelsch. Can we open a PR with just the operation counter, extracted to its own Java file? We can work on the primary relocation afterwards. Also, I would love it if we can keep the counter naming out of IndexShard (for now). Let's try to keep the change small.
Force-pushed from d5a98ce to 9fa9742
@@ -995,18 +995,17 @@ protected boolean shouldExecuteReplication(Settings settings) {

     static class IndexShardReference implements Releasable {

-        final private IndexShard counter;
+        private final Releasable operationLock;
Can we keep the shard reference? I know it's not needed here, but it serves as a proxy to the shard, allowing us to do more things - see https://github.com/elastic/elasticsearch/pull/15485/files#diff-a8aefbf42f29dc0fcc7c0a144863948eR1104
Force-pushed from 3bd3e8f to 22d57eb
@bleskes I've updated the PR according to our discussion and considered the following 4 scenarios:
    Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
    if (logger.isTraceEnabled()) {
        logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
    indexShardReference = getIndexShardOperationsCounter(shardId, true);
Can we call the method getIndexShardReference? Also, I like explicit naming here better than a primary boolean. See the seq_no branch for an example.
@@ -995,22 +1028,43 @@ protected boolean shouldExecuteReplication(Settings settings) {
         return IndexMetaData.isIndexUsingShadowReplicas(settings) == false;
     }

-    static class IndexShardReference implements Releasable {
+    interface IndexShardReference extends Releasable {
Since we only have one impl of this, can we make it a concrete class and drop the interface?
The interface helps us to mock the implementation in the unit tests.
wait how can you not override a concrete class?
Technically it is possible, but not nice to do in this particular case. The implementation class acquires the lock in its constructor (yes, one can argue that this can be done in a separate method, but then the lock cannot be a final variable in the class anymore). Also the constructor takes an IndexShard, which is irrelevant for the class that we mock. I still prefer having a small interface as the current implementation class does not share any code / behavior with the mocked version in the tests.
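For illustration, the pattern under discussion might look roughly like the following self-contained sketch; every type and method name here is an invented stand-in, not the actual Elasticsearch code. The concrete implementation acquires the operation lock in its constructor and keeps it in a final field, which is exactly why a small interface is what lets the tests substitute a lock-free mock.

    // Hypothetical sketch; names are illustrative stand-ins, not the real Elasticsearch types.
    interface Releasable extends AutoCloseable {
        @Override
        void close();
    }

    // Stand-in for the shard, exposing only what this sketch needs.
    interface IndexShardStandIn {
        Releasable acquireOperationLock();
    }

    // The small surface shared by production code and tests.
    interface IndexShardReference extends Releasable {
    }

    final class IndexShardReferenceImpl implements IndexShardReference {
        // The field can be final only because the lock is acquired in the constructor,
        // which is what makes overriding the concrete class in tests awkward.
        private final Releasable operationLock;

        IndexShardReferenceImpl(IndexShardStandIn indexShard) {
            this.operationLock = indexShard.acquireOperationLock();
        }

        @Override
        public void close() {
            operationLock.close();
        }
    }

    // A unit test can implement the interface without a shard or a real lock.
    final class MockIndexShardReference implements IndexShardReference {
        @Override
        public void close() {
            // nothing to release
        }
    }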
@ywelsch this looks awesome. I left some comments
Force-pushed from badcd06 to 8e98d32
-    IndexShardReference(IndexShard counter) {
-        counter.incrementOperationCounter();
-        this.counter = counter;
+    IndexShardReferenceImpl(IndexShard indexShard, boolean primaryAction) {
I am curious: isn't the boolean implicit from the IndexShard? I mean, its shard routing should say primary if the boolean is true, no?
Yes, but the converse does not hold. If we replicate an index request to a primary (e.g. a relocation target that is still recovering), then we take the replica operation lock. Currently, taking the primary or replica operation lock is pretty much the same thing. In the seq_no branch, this is not the case anymore. @bleskes suggested separating the methods in this PR already as this will make it easier to merge subsequent changes into the seq_no branch.
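A rough sketch of why the boolean cannot be derived from the shard itself (again with invented, self-contained names rather than the real classes): the caller has to state explicitly which lock it wants, because a recovering relocation target already carries a primary routing yet must take the replica lock for incoming replicated writes.

    // Hypothetical, self-contained sketch; not the actual Elasticsearch code.
    interface OperationLock extends AutoCloseable {
        @Override
        void close();
    }

    // Stand-in shard offering the two (currently similar, later diverging) lock types.
    interface ShardLocks {
        OperationLock acquirePrimaryOperationLock();
        OperationLock acquireReplicaOperationLock();
    }

    final class ShardOperationReference implements AutoCloseable {
        private final OperationLock operationLock;

        // The transport action supplies the flag: a replicated write that arrives at a
        // recovering relocation target must take the replica lock, even though that
        // shard's routing already says "primary".
        ShardOperationReference(ShardLocks shard, boolean primaryAction) {
            this.operationLock = primaryAction
                    ? shard.acquirePrimaryOperationLock()
                    : shard.acquireReplicaOperationLock();
        }

        @Override
        public void close() {
            operationLock.close();
        }
    }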
I did another round with minor comments. LGTM otherwise.
Force-pushed from 8e98d32 to 6960158
Adds a container that represents a resource with reference counting capabilities. Provides operations to suspend acquisition of new references. Useful for resource management when resources are intermittently unavailable. Closes elastic#15956
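As a rough illustration of the kind of container this commit message describes, here is a simplified, hypothetical sketch; the class and method names are invented for this example, and the real implementation is more careful about edge cases such as double release.

    import java.io.Closeable;

    // Hypothetical sketch of a reference-counted resource whose acquisition can be suspended.
    final class SuspendableRefsSketch {
        private int refs = 0;
        private boolean suspended = false;

        // Returns a handle that must be closed, or null if acquisition is currently suspended.
        synchronized Closeable tryAcquire() {
            if (suspended) {
                return null;
            }
            refs++;
            return this::release;
        }

        private synchronized void release() {
            refs--;
            if (refs == 0) {
                notifyAll();
            }
        }

        // Suspends new acquisitions and blocks until all outstanding references are released.
        synchronized void suspendAndWaitForActiveRefs() throws InterruptedException {
            suspended = true;
            while (refs > 0) {
                wait();
            }
        }

        // Allows acquisition of new references again.
        synchronized void resume() {
            suspended = false;
        }
    }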
When primary relocation completes, a cluster state is propagated that deactivates the old primary and marks the new primary as active. As cluster state changes are not applied synchronously on all nodes, there can be a time interval where the relocation target has processed the cluster state and believes itself to be the active primary while the relocation source has not yet processed the cluster state update and still believes itself to be the active primary. This commit ensures that, before completing the relocation, the relocation source deactivates writing to its store and delegates requests to the relocation target. Closes elastic#15900
Force-pushed from 6960158 to 10b5ffc
Reverts 7d3da91 after the fix in elastic#15900. Closes elastic#8706
There is a bug (document loss) with this which should be fixed by #15900 but it will not be backported so we should not test this.
When primary relocation completes, a cluster state is propagated that deactivates the old primary and marks the new primary as active. As cluster state changes are not applied synchronously on all nodes, there can be a time interval where the relocation target has processed the cluster state and believes itself to be the active primary while the relocation source has not yet processed the cluster state update and still believes itself to be the active primary. This PR ensures that, before completing the relocation, the relocation source deactivates writing to its store and delegates requests to the relocation target.
The change is motivated as follows:
We need to ensure that we only start writing data into the new primary once all the writes into the old primary have been completely replicated (among others, to the new primary). This ensures that the new primary operates on the proper document version numbers. Document versions are increased when writing to the primary and then used on the replica to make sure that newer documents are not overridden by older documents (in the presence of concurrent replication). A scenario for this would be: write a document with id "K" and value "X" to the old primary (it gets version 1) and replicate it to the new primary as well as the replica. Assume that another document with id "K" but value "Y" is written on the new primary before the new primary gets the replicated write of "K" with value "X". Unaware of the other write, it will then assign the same version number (namely 1) to the document with value "Y" and replicate it to the replica. Depending on the order in which the replicated writes from the old and new primary arrive at the replica, it will then either store "X" or "Y", which means that the new primary and the replica can become out of sync.
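To make this concrete, here is a toy, hedged simulation (plain Java, not Elasticsearch code) of a replica that keeps only the highest version seen per document id; with two shards both acting as primary and both handing out version 1, the result depends purely on which replicated write arrives first.

    import java.util.HashMap;
    import java.util.Map;

    // Toy simulation of the version check sketched above; not Elasticsearch code.
    final class ReplicaVersionDemo {
        record Doc(long version, String value) {}

        static final Map<String, Doc> replica = new HashMap<>();

        // The replica only applies a replicated write that is strictly newer than what it holds.
        static void applyReplicatedWrite(String id, long version, String value) {
            Doc current = replica.get(id);
            if (current == null || version > current.version()) {
                replica.put(id, new Doc(version, value));
            }
            // otherwise the write is treated as stale and dropped
        }

        public static void main(String[] args) {
            // The old primary indexes {K=X} as version 1; the new primary, unaware of that write,
            // also assigns version 1 to {K=Y}. Whichever replicated write reaches the replica
            // first wins, so the replica can end up disagreeing with the new primary.
            applyReplicatedWrite("K", 1, "X");
            applyReplicatedWrite("K", 1, "Y"); // dropped: same version, so not considered newer
            System.out.println(replica.get("K").value()); // prints X
        }
    }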
We have to ensure that no new writes are done on the old primary once we start writing into the new primary. This helps with the following scenario. Assume primary relocation completes and the master broadcasts a cluster state which now only contains the new primary. Due to the distributed nature of Elasticsearch, cluster states are not applied in full synchrony on all nodes. For a brief moment, nodes in the cluster have a different view of which node is the primary. In particular, it's possible that the node holding the old primary (node A) still believes itself to be the primary whereas the node holding the new primary (node B) believes itself to be the primary as well. If we send a document to node B, it will get indexed into the new primary and acknowledged (but won't exist on the old primary). If we then issue a delete request for the same document to node A (which can happen if we send requests round-robin to nodes), then that node will not find the document in its old primary and will fail the request.
This PR (in combination with #19013) implements the following solution:
Before completing the relocation, node A (holding the primary relocation source) deactivates writing to its shard copy (and temporarily puts all new incoming requests for that shard into a queue), then waits for all ongoing operations to be fully replicated. Once that is done, it delegates all new incoming requests to node B (holding the new primary) and also sends all the elements in the queue there. It uses a special action to delegate requests to node B, which bypasses the standard reroute phase when accepting requests, as standard rerouting is based on the current cluster state on the node; at that moment, indexing requests that go directly to node B will still be rerouted back to node A with the old primary. This means that node A is still in charge of indexing, but will use the physical shard copy on node B to do so. Node B finally asks the master to activate the new primary (finish the relocation). The master then broadcasts a new cluster state where the old primary on node A is removed and the new primary on node B is active. It doesn't matter now in which order the cluster state is applied on nodes A and B.
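A heavily simplified, hypothetical outline of the relocation-source side of this handoff is sketched below; every type and method name is invented for illustration (the real implementation is spread across several Elasticsearch classes and transport actions), and operations are executed under a single lock only to keep the sketch short.

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical outline of the old primary's side of the handoff; not the actual code.
    final class PrimaryHandoffSketch {

        interface Request {
            void executeLocally(); // normal primary indexing path (details elided)
        }

        interface RelocationTarget {
            void forward(Request request);        // delegation action that bypasses rerouting
            void askMasterToActivateNewPrimary();
        }

        private final Queue<Request> queued = new ArrayDeque<>();
        private int activeOperations = 0;
        private boolean handoffStarted = false;
        private RelocationTarget delegate = null;

        // Incoming write while this node still believes it is the primary.
        synchronized void handleRequest(Request request) {
            if (delegate != null) {
                delegate.forward(request);        // after handoff: the old primary only forwards
            } else if (handoffStarted) {
                queued.add(request);              // handoff in progress: park the request
            } else {
                activeOperations++;
                try {
                    request.executeLocally();     // normal path; replication to copies elided
                } finally {
                    activeOperations--;
                    notifyAll();
                }
            }
        }

        // Called when relocation of the primary to the target is about to complete.
        synchronized void handOff(RelocationTarget target) throws InterruptedException {
            handoffStarted = true;                  // 1. stop starting new local operations
            while (activeOperations > 0) {
                wait();                             //    and wait for in-flight ones to drain
            }
            delegate = target;                      // 2. delegate all further requests...
            for (Request r : queued) {
                target.forward(r);                  //    ...and flush everything that was queued
            }
            queued.clear();
            target.askMasterToActivateNewPrimary(); // 3. the master then broadcasts the final state
        }
    }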
Supersedes #15532