Commit c95112f

Fix OngoingSegmentReplications to key by allocation ID instead of DiscoveryNode.

This change fixes segment replication (segrep) to work with multiple shards per node by keying ongoing replications on allocation ID rather than on DiscoveryNode. It also updates the cancel methods to ensure replication state is properly cleared when a shard's replication is cancelled.

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Aug 10, 2022
1 parent d09c285 commit c95112f
Showing 4 changed files with 136 additions and 26 deletions.
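
The core of the change: ongoing replication handlers were keyed by the replica's DiscoveryNode, so two replica shards allocated to the same node collided on a single map entry; keying by allocation ID gives every shard copy its own entry. A minimal standalone sketch of the difference, using a hypothetical Handler record rather than the real OpenSearch types:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AllocationIdKeyingSketch {
    // Stand-in for SegmentReplicationSourceHandler; the real handler carries copy state.
    record Handler(String allocationId, String nodeId) {}

    public static void main(String[] args) {
        // Old scheme: keyed by node. Two replica shards on node-B collide, so the
        // second prepareForReplication call would find an existing entry and fail.
        Map<String, Handler> byNode = new ConcurrentHashMap<>();
        byNode.putIfAbsent("node-B", new Handler("alloc-1", "node-B"));
        boolean collision = byNode.putIfAbsent("node-B", new Handler("alloc-2", "node-B")) != null;
        System.out.println("collision under node keying: " + collision); // true

        // New scheme: keyed by allocation ID, which is unique per shard copy.
        Map<String, Handler> byAllocationId = new ConcurrentHashMap<>();
        byAllocationId.putIfAbsent("alloc-1", new Handler("alloc-1", "node-B"));
        collision = byAllocationId.putIfAbsent("alloc-2", new Handler("alloc-2", "node-B")) != null;
        System.out.println("collision under allocation-id keying: " + collision); // false
    }
}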
@@ -111,6 +111,54 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}
}

public void testMultipleShards() throws Exception {
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(1, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);
assertSegmentStats(REPLICA_COUNT);
}
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
@@ -262,15 +310,17 @@ private void waitForReplicaUpdate() throws Exception {
final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

// if we don't have any segments yet, proceed.
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
assertTrue(isReplicaCaughtUpToPrimary);
if (primaryShardSegments.getSegments().isEmpty() == false) {
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
assertTrue(isReplicaCaughtUpToPrimary);
}
}
}
});
@@ -24,7 +24,9 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Manages references to ongoing segrep events on a node.
@@ -38,7 +40,7 @@ class OngoingSegmentReplications {
private final RecoverySettings recoverySettings;
private final IndicesService indicesService;
private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
private final Map<DiscoveryNode, SegmentReplicationSourceHandler> nodesToHandlers;
private final Map<String, SegmentReplicationSourceHandler> allocationIdToHandlers;

/**
* Constructor.
@@ -50,7 +52,7 @@ class OngoingSegmentReplications {
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
this.copyStateMap = Collections.synchronizedMap(new HashMap<>());
this.nodesToHandlers = ConcurrentCollections.newConcurrentMap();
this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap();
}

/**
@@ -96,8 +98,7 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException {
* @param listener {@link ActionListener} that resolves when sending files is complete.
*/
void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
final DiscoveryNode node = request.getTargetNode();
final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node);
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.get(request.getTargetAllocationId());
if (handler != null) {
if (handler.isReplicating()) {
throw new OpenSearchException(
@@ -108,7 +109,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
}
// update the given listener to release the CopyState before it resolves.
final ActionListener<GetSegmentFilesResponse> wrappedListener = ActionListener.runBefore(listener, () -> {
final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node);
final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId());
if (sourceHandler != null) {
removeCopyState(sourceHandler.getCopyState());
}
@@ -129,10 +130,12 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
* @param node {@link DiscoveryNode} node for which to cancel replication events.
*/
void cancelReplication(DiscoveryNode node) {
final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node);
if (handler != null) {
handler.cancel("Cancel on node left");
removeCopyState(handler.getCopyState());
for (Map.Entry<String, SegmentReplicationSourceHandler> entry : allocationIdToHandlers.entrySet()) {
if (entry.getValue().getTargetNode().equals(node)) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(entry.getKey());
handler.cancel("Cancel on node left");
removeCopyState(handler.getCopyState());
}
}
}
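
Since handlers are no longer keyed by node, cancelling on node-left now scans the map for handlers targeting the departed node, which is why the handler gains a getTargetNode() accessor later in this commit. Removing entries while iterating is safe here because the map comes from ConcurrentCollections.newConcurrentMap(), which is backed by a ConcurrentHashMap with weakly consistent iterators. A toy sketch of the remove-while-iterating pattern, with placeholder IDs:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CancelByNodeSketch {
    public static void main(String[] args) {
        // allocation ID -> target node ID, standing in for the handler map
        Map<String, String> handlers = new ConcurrentHashMap<>(
            Map.of("alloc-1", "node-A", "alloc-2", "node-B", "alloc-3", "node-B")
        );
        String leftNode = "node-B";
        // ConcurrentHashMap iterators never throw ConcurrentModificationException,
        // so entries can be removed while walking the entry set.
        for (Map.Entry<String, String> entry : handlers.entrySet()) {
            if (entry.getValue().equals(leftNode)) {
                handlers.remove(entry.getKey()); // cancel the handler and release copy state here
            }
        }
        System.out.println(handlers); // {alloc-1=node-A}
    }
}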

@@ -149,8 +152,8 @@ void cancelReplication(DiscoveryNode node) {
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (nodesToHandlers.putIfAbsent(
request.getTargetNode(),
if (allocationIdToHandlers.putIfAbsent(
request.getTargetAllocationId(),
createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter)
) != null) {
throw new OpenSearchException(
@@ -169,12 +172,23 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(IndexShard shard, String reason) {
for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) {
if (entry.getCopyState().getShard().equals(shard)) {
entry.cancel(reason);
for (Map.Entry<String, SegmentReplicationSourceHandler> handlerEntry : allocationIdToHandlers.entrySet()) {
if (handlerEntry.getValue().getCopyState().getShard().shardId().equals(shard.shardId())) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(handlerEntry.getKey());
handler.cancel(reason);
}
}
final List<CopyState> list = copyStateMap.values()
.stream()
.filter(state -> state.getShard().shardId().equals(shard.shardId()))
.collect(Collectors.toList());
for (CopyState copyState : list) {
if (copyState.getShard().shardId().equals(shard.shardId())) {
while (copyStateMap.containsKey(copyState.getRequestedReplicationCheckpoint())) {
removeCopyState(copyState);
}
}
}
copyStateMap.clear();
}
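
One behavioral fix worth calling out in the hunk above: the old implementation ended with copyStateMap.clear(), wiping cached copy state for every shard on the node, while the new code releases only the copy states belonging to the cancelled shard. The while-loop is needed because a cached CopyState is reference-counted (one reference per ongoing replication) and is only evicted once fully released. A toy sketch of that release-until-evicted pattern, using a hypothetical counter cache rather than the real CopyState API:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class RefCountedReleaseSketch {
    // checkpoint key -> number of replications still referencing this state
    static final Map<String, AtomicInteger> copyStates = new HashMap<>();

    static void removeCopyState(String key) {
        AtomicInteger refs = copyStates.get(key);
        if (refs != null && refs.decrementAndGet() <= 0) {
            copyStates.remove(key); // evict once the last reference is released
        }
    }

    public static void main(String[] args) {
        copyStates.put("checkpoint-1", new AtomicInteger(2)); // two replicas syncing
        // Mirrors the while-loop in cancel(): keep releasing until the entry is gone.
        while (copyStates.containsKey("checkpoint-1")) {
            removeCopyState("checkpoint-1");
        }
        System.out.println(copyStates.isEmpty()); // true
    }
}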

/**
@@ -186,7 +200,7 @@ boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
}

int size() {
return nodesToHandlers.size();
return allocationIdToHandlers.size();
}

int cachedCopyStateSize() {
@@ -54,6 +54,7 @@ class SegmentReplicationSourceHandler {
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final Logger logger;
private final AtomicBoolean isReplicating = new AtomicBoolean();
private final DiscoveryNode targetNode;

/**
* Constructor.
@@ -73,6 +74,7 @@ class SegmentReplicationSourceHandler {
int fileChunkSizeInBytes,
int maxConcurrentFileChunks
) {
this.targetNode = targetNode;
this.shard = copyState.getShard();
this.logger = Loggers.getLogger(
SegmentReplicationSourceHandler.class,
@@ -118,7 +120,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
logger.debug(
"delaying replication of {} as it is not listed as assigned to target node {}",
shard.shardId(),
request.getTargetNode()
targetNode
);
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
@@ -175,4 +177,8 @@ CopyState getCopyState() {
public boolean isReplicating() {
return isReplicating.get();
}

public DiscoveryNode getTargetNode() {
return targetNode;
}
}
@@ -155,6 +155,9 @@ public void testCancelReplication() throws IOException {
}

public void testMultipleReplicasUseSameCheckpoint() throws IOException {
IndexShard secondReplica = newShard(primary.shardId(), false);
recoverReplica(secondReplica, primary, true);

OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
final CheckpointInfoRequest request = new CheckpointInfoRequest(
1L,
@@ -172,7 +175,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException {

final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
1L,
replica.routingEntry().allocationId().getId(),
secondReplica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
testCheckpoint
);
@@ -187,6 +190,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException {
assertEquals(0, copyState.refCount());
assertEquals(0, replications.size());
assertEquals(0, replications.cachedCopyStateSize());
closeShards(secondReplica);
}

public void testStartCopyWithoutPrepareStep() {
@@ -272,4 +276,40 @@ public void onFailure(Exception e) {
}
});
}

public void testCancelAllReplicationsForShard() throws IOException {
// This tests when primary has multiple ongoing replications.
IndexShard replica_2 = newShard(primary.shardId(), false);
recoverReplica(replica_2, primary, true);

OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
final CheckpointInfoRequest request = new CheckpointInfoRequest(
1L,
replica.routingEntry().allocationId().getId(),
primaryDiscoveryNode,
testCheckpoint
);

final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class));
assertEquals(1, copyState.refCount());

final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
1L,
replica_2.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
testCheckpoint
);
replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class));

assertEquals(2, copyState.refCount());
assertEquals(2, replications.size());
assertEquals(1, replications.cachedCopyStateSize());

// cancel the primary's ongoing replications.
replications.cancel(primary, "Test");
assertEquals(0, copyState.refCount());
assertEquals(0, replications.size());
assertEquals(0, replications.cachedCopyStateSize());
closeShards(replica_2);
}
}
