Skip to content

Commit

Permalink
Distinguish between operation and cluster state term
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Jul 30, 2018
1 parent 9396f53 commit a102ef9
Show file tree
Hide file tree
Showing 16 changed files with 116 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index
switch (indexResult.getResultType()) {
case SUCCESS:
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
indexResult.getSeqNo(), primary.getOperationPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
case FAILURE:
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
Expand All @@ -161,7 +161,7 @@ private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest del
switch (deleteResult.getResultType()) {
case SUCCESS:
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
deleteResult.getSeqNo(), primary.getOperationPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
case FAILURE:
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
Expand Down Expand Up @@ -300,7 +300,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
assert result instanceof Engine.IndexResult : result.getClass();
final IndexRequest updateIndexRequest = translate.action();
final IndexResponse indexResponse = new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(),
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
result.getSeqNo(), primary.getOperationPrimaryTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(),
indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(),
indexResponse.getResult());
Expand All @@ -320,7 +320,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
final DeleteRequest updateDeleteRequest = translate.action();

final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(),
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
result.getSeqNo(), primary.getOperationPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());

updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),
Expand Down Expand Up @@ -490,6 +490,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
switch (replicaItemExecutionMode(item, i)) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
assert replica.getOperationPrimaryTerm() == primaryResponse.getPrimaryTerm();
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ private void acquirePrimaryShardReference(ShardId shardId, String allocationId,
if (actualAllocationId.equals(allocationId) == false) {
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
}
final long actualTerm = indexShard.getPrimaryTerm();
final long actualTerm = indexShard.getClusterStatePrimaryTerm();
if (actualTerm != primaryTerm) {
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
primaryTerm, actualTerm);
Expand Down
Loading

0 comments on commit a102ef9

Please sign in to comment.