Skip to content

Commit

Permalink
Fix SegmentReplicationIT.testReplicaHasDiffFilesThanPrimary for node-…
Browse files Browse the repository at this point in the history
…node replication (#8912) (#8936)

* Fix SegmentReplicationIT.testReplicahasDiffFilesThanPrimary

This test is now failing for node-node replication. On the primary shard the prepareSegmentReplication method should cancel any ongoing replication if it is running and start a new sync.  Thisis incorrectly using Map.compute which will not replace the existing handler entry in the allocationIdToHandlers map. It will only cancel the existing source handler. As a result this can leave the copyState map with an entry and hold an index commit while the test is cleaning up.  The copyState is only cleared when a handler is cancelled directly or from a cluster state update.



* PR feedback.



---------


(cherry picked from commit 4ad4182)

Signed-off-by: Marc Handalian <handalm@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 212dba4 commit 5164fe2
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,25 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
if (segrepHandler != null) {
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
cancelHandlers(handler -> handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry");
}
return createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter);
});
final SegmentReplicationSourceHandler newHandler = createTargetHandler(
request.getTargetNode(),
copyState,
request.getTargetAllocationId(),
fileChunkWriter
);
final SegmentReplicationSourceHandler existingHandler = allocationIdToHandlers.putIfAbsent(
request.getTargetAllocationId(),
newHandler
);
// If we are already replicating to this allocation Id, cancel the old and replace with a new execution.
// This will clear the old handler & referenced copy state holding an incref'd indexCommit.
if (existingHandler != null) {
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
cancelHandlers(handler -> handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry");
assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()) == false;
allocationIdToHandlers.put(request.getTargetAllocationId(), newHandler);
}
assert allocationIdToHandlers.containsKey(request.getTargetAllocationId());
return copyState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,4 +403,38 @@ public void testCancelForMissingIds() throws IOException {
assertEquals(0, replications.cachedCopyStateSize());
closeShards(replica_2);
}

public void testPrepareForReplicationAlreadyReplicating() throws IOException {
OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
final String replicaAllocationId = replica.routingEntry().allocationId().getId();
final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint);

final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class));

final SegmentReplicationSourceHandler handler = replications.getHandlers().get(replicaAllocationId);
assertEquals(handler.getCopyState(), copyState);
assertEquals(1, copyState.refCount());

ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
testCheckpoint.getShardId(),
testCheckpoint.getPrimaryTerm(),
testCheckpoint.getSegmentsGen(),
testCheckpoint.getSegmentInfosVersion() + 1,
testCheckpoint.getCodec()
);

final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
1L,
replicaAllocationId,
primaryDiscoveryNode,
secondCheckpoint
);

final CopyState secondCopyState = replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class));
final SegmentReplicationSourceHandler secondHandler = replications.getHandlers().get(replicaAllocationId);
assertEquals(secondHandler.getCopyState(), secondCopyState);
assertEquals("New copy state is incref'd", 1, secondCopyState.refCount());
assertEquals("Old copy state is cleaned up", 0, copyState.refCount());

}
}

0 comments on commit 5164fe2

Please sign in to comment.