Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race between replica reset and primary promotion #32442

Merged
merged 19 commits into from
Aug 3, 2018
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index
switch (indexResult.getResultType()) {
case SUCCESS:
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
indexResult.getSeqNo(), indexResult.getTerm(), indexResult.getVersion(), indexResult.isCreated());
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
case FAILURE:
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
Expand All @@ -161,7 +161,7 @@ private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest del
switch (deleteResult.getResultType()) {
case SUCCESS:
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
deleteResult.getSeqNo(), deleteResult.getTerm(), deleteResult.getVersion(), deleteResult.isFound());
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
case FAILURE:
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
Expand Down Expand Up @@ -300,7 +300,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
assert result instanceof Engine.IndexResult : result.getClass();
final IndexRequest updateIndexRequest = translate.action();
final IndexResponse indexResponse = new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(),
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(),
indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(),
indexResponse.getResult());
Expand All @@ -320,7 +320,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
final DeleteRequest updateDeleteRequest = translate.action();

final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(),
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());

updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),
Expand Down Expand Up @@ -356,7 +356,7 @@ static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
final Engine.Result result = primary.getFailedIndexResult(failure, updateRequest.version());
return new BulkItemResultHolder(null, result, primaryItemRequest);
}

Expand Down Expand Up @@ -559,15 +559,15 @@ static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, Ind
() ->
primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
request.getAutoGeneratedTimestamp(), request.isRetry()),
e -> new Engine.IndexResult(e, request.version()),
e -> primary.getFailedIndexResult(e, request.version()),
mappingUpdater);
}

private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(),
() -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()),
e -> new Engine.DeleteResult(e, request.version()),
e -> primary.getFailedDeleteResult(e, request.version()),
mappingUpdater);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ private void acquirePrimaryShardReference(ShardId shardId, String allocationId,
if (actualAllocationId.equals(allocationId) == false) {
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
}
final long actualTerm = indexShard.getPrimaryTerm();
final long actualTerm = indexShard.getPendingPrimaryTerm();
if (actualTerm != primaryTerm) {
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
primaryTerm, actualTerm);
Expand Down Expand Up @@ -983,7 +983,7 @@ class PrimaryShardReference extends ShardReference
}

public boolean isRelocated() {
return indexShard.isPrimaryMode() == false;
return indexShard.isRelocatedPrimary();
}

@Override
Expand Down
44 changes: 26 additions & 18 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,26 +304,29 @@ public abstract static class Result {
private final Operation.TYPE operationType;
private final Result.Type resultType;
private final long version;
private final long term;
private final long seqNo;
private final Exception failure;
private final SetOnce<Boolean> freeze = new SetOnce<>();
private final Mapping requiredMappingUpdate;
private Translog.Location translogLocation;
private long took;

protected Result(Operation.TYPE operationType, Exception failure, long version, long seqNo) {
protected Result(Operation.TYPE operationType, Exception failure, long version, long term, long seqNo) {
this.operationType = operationType;
this.failure = Objects.requireNonNull(failure);
this.version = version;
this.term = term;
this.seqNo = seqNo;
this.requiredMappingUpdate = null;
this.resultType = Type.FAILURE;
}

protected Result(Operation.TYPE operationType, long version, long seqNo) {
protected Result(Operation.TYPE operationType, long version, long term, long seqNo) {
this.operationType = operationType;
this.version = version;
this.seqNo = seqNo;
this.term = term;
this.failure = null;
this.requiredMappingUpdate = null;
this.resultType = Type.SUCCESS;
Expand All @@ -333,6 +336,7 @@ protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate) {
this.operationType = operationType;
this.version = Versions.NOT_FOUND;
this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
this.term = 0L;
this.failure = null;
this.requiredMappingUpdate = requiredMappingUpdate;
this.resultType = Type.MAPPING_UPDATE_REQUIRED;
Expand All @@ -357,6 +361,10 @@ public long getSeqNo() {
return seqNo;
}

public long getTerm() {
return term;
}

/**
* If the operation was aborted due to missing mappings, this method will return the mappings
* that are required to complete the operation.
Expand Down Expand Up @@ -415,20 +423,20 @@ public static class IndexResult extends Result {

private final boolean created;

public IndexResult(long version, long seqNo, boolean created) {
super(Operation.TYPE.INDEX, version, seqNo);
public IndexResult(long version, long term, long seqNo, boolean created) {
super(Operation.TYPE.INDEX, version, term, seqNo);
this.created = created;
}

/**
* use in case of the index operation failed before getting to internal engine
**/
public IndexResult(Exception failure, long version) {
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO);
public IndexResult(Exception failure, long version, long term) {
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO);
}

public IndexResult(Exception failure, long version, long seqNo) {
super(Operation.TYPE.INDEX, failure, version, seqNo);
public IndexResult(Exception failure, long version, long term, long seqNo) {
super(Operation.TYPE.INDEX, failure, version, term, seqNo);
this.created = false;
}

Expand All @@ -447,20 +455,20 @@ public static class DeleteResult extends Result {

private final boolean found;

public DeleteResult(long version, long seqNo, boolean found) {
super(Operation.TYPE.DELETE, version, seqNo);
public DeleteResult(long version, long term, long seqNo, boolean found) {
super(Operation.TYPE.DELETE, version, term, seqNo);
this.found = found;
}

/**
* use in case of the delete operation failed before getting to internal engine
**/
public DeleteResult(Exception failure, long version) {
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
public DeleteResult(Exception failure, long version, long term) {
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
}

public DeleteResult(Exception failure, long version, long seqNo, boolean found) {
super(Operation.TYPE.DELETE, failure, version, seqNo);
public DeleteResult(Exception failure, long version, long term, long seqNo, boolean found) {
super(Operation.TYPE.DELETE, failure, version, term, seqNo);
this.found = found;
}

Expand All @@ -477,12 +485,12 @@ public boolean isFound() {

public static class NoOpResult extends Result {

NoOpResult(long seqNo) {
super(Operation.TYPE.NO_OP, 0, seqNo);
NoOpResult(long term, long seqNo) {
super(Operation.TYPE.NO_OP, term, 0, seqNo);
}

NoOpResult(long seqNo, Exception failure) {
super(Operation.TYPE.NO_OP, failure, 0, seqNo);
NoOpResult(long term, long seqNo, Exception failure) {
super(Operation.TYPE.NO_OP, failure, term, 0, seqNo);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,10 @@ protected long doGenerateSeqNoForOperation(final Operation operation) {
return localCheckpointTracker.generateSeqNo();
}

private long getPrimaryTerm() {
return engineConfig.getPrimaryTermSupplier().getAsLong();
}

@Override
public IndexResult index(Index index) throws IOException {
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
Expand Down Expand Up @@ -788,7 +792,7 @@ public IndexResult index(Index index) throws IOException {
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
Expand Down Expand Up @@ -900,7 +904,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
currentVersion, index.version(), currentNotFoundOrDeleted)) {
final VersionConflictEngineException e =
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
generateSeqNoForOperation(index),
Expand Down Expand Up @@ -930,7 +934,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
addDocs(index.docs(), indexWriter);
}
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
return new IndexResult(plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
/* There is no tragic event recorded so this must be a document failure.
Expand All @@ -946,7 +950,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
* we return a `MATCH_ANY` version to indicate no document was index. The value is
* not used anyway
*/
return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing);
return new IndexResult(ex, Versions.MATCH_ANY, getPrimaryTerm(), plan.seqNoForIndexing);
} else {
throw ex;
}
Expand Down Expand Up @@ -1019,8 +1023,8 @@ static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
}

static IndexingStrategy skipDueToVersionConflict(
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
final IndexResult result = new IndexResult(e, currentVersion);
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion, long term) {
final IndexResult result = new IndexResult(e, currentVersion, term);
return new IndexingStrategy(
currentNotFoundOrDeleted, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result);
}
Expand Down Expand Up @@ -1097,7 +1101,7 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult = deleteInLucene(delete, plan);
} else {
deleteResult = new DeleteResult(
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
}
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
Expand Down Expand Up @@ -1178,7 +1182,7 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException
final DeletionStrategy plan;
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
} else {
plan = DeletionStrategy.processNormally(
currentlyDeleted,
Expand All @@ -1201,12 +1205,12 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis()));
return new DeleteResult(
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
// there is no tragic event and such it must be a document level failure
return new DeleteResult(
ex, plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
ex, plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
} else {
throw ex;
}
Expand Down Expand Up @@ -1237,9 +1241,9 @@ private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted,
}

static DeletionStrategy skipDueToVersionConflict(
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) {
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false);
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false);
return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult);
}

Expand Down Expand Up @@ -1268,7 +1272,7 @@ public NoOpResult noOp(final NoOp noOp) {
try (ReleasableLock ignored = readLock.acquire()) {
noOpResult = innerNoOp(noOp);
} catch (final Exception e) {
noOpResult = new NoOpResult(noOp.seqNo(), e);
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
}
return noOpResult;
}
Expand All @@ -1278,7 +1282,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
final long seqNo = noOp.seqNo();
try {
final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
final NoOpResult noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
Expand Down
Loading