[Segment Replication] Fix OngoingSegmentReplications to key by allocation ID instead of DiscoveryNode. (opensearch-project#4182)

* Fix OngoingSegmentReplications to key by allocation ID instead of DiscoveryNode.

This change fixes segrep to work with multiple shards per node by keying ongoing replications on
allocation ID.  This also updates cancel methods to ensure state is properly cleared on shard cancel.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Clean up cancel methods.

Signed-off-by: Marc Handalian <handalm@amazon.com>

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 authored and Rishikesh1159 committed Aug 17, 2022
1 parent ee6b0b2 commit be17fd2
Showing 5 changed files with 166 additions and 40 deletions.
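The crux of the change is stated in the commit message: handlers were previously keyed by DiscoveryNode, so a node hosting more than one replica shard could have only one ongoing replication at a time. Below is a minimal standalone sketch of that collision, using plain strings as stand-ins for DiscoveryNode and the handler type (all names hypothetical, for illustration only, not from the patch):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class KeyingSketch {
        public static void main(String[] args) {
            // Before: keyed by node. A second shard on the same node collides, so its
            // prepareForReplication would be rejected as "already replicating".
            Map<String, String> byNode = new ConcurrentHashMap<>();
            System.out.println(byNode.putIfAbsent("nodeB", "handler-shard0")); // null -> inserted
            System.out.println(byNode.putIfAbsent("nodeB", "handler-shard1")); // "handler-shard0" -> collision

            // After: keyed by allocation ID, which is unique per shard copy, so a
            // node can host any number of concurrently replicating shards.
            Map<String, String> byAllocationId = new ConcurrentHashMap<>();
            System.out.println(byAllocationId.putIfAbsent("alloc-shard0", "handler-shard0")); // null -> inserted
            System.out.println(byAllocationId.putIfAbsent("alloc-shard1", "handler-shard1")); // null -> inserted
        }
    }

The new testMultipleShards integration test in the first file exercises exactly this: a 3-shard, 1-replica index on two nodes, which forces multiple shard copies per node.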
Segment replication integration tests (SegmentReplicationIT.java)
@@ -111,6 +111,54 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}
}

public void testMultipleShards() throws Exception {
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(1, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);
assertSegmentStats(REPLICA_COUNT);
}
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
@@ -262,15 +310,17 @@ private void waitForReplicaUpdate() throws Exception {
final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

// if we don't have any segments yet, proceed.
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
assertTrue(isReplicaCaughtUpToPrimary);
if (primaryShardSegments.getSegments().isEmpty() == false) {
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
assertTrue(isReplicaCaughtUpToPrimary);
}
}
}
});
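The guard added to waitForReplicaUpdate matters because, as the new comment says, a freshly created shard may have no segments yet, and the previously unguarded Optional.get() on the first segment's generation would then throw. A standalone restatement of the wait condition (hypothetical helper using raw generation numbers, not part of the patch):

    import java.util.List;

    public class CaughtUpCheck {
        // If the primary has no segments yet there is nothing to replicate, so the
        // check passes vacuously; otherwise every replica must hold a segment at
        // the primary's latest generation.
        static boolean caughtUp(List<Long> primaryGens, List<List<Long>> replicaGens) {
            if (primaryGens.isEmpty()) {
                return true;
            }
            long latest = primaryGens.get(primaryGens.size() - 1);
            return replicaGens.stream().allMatch(gens -> gens.stream().anyMatch(g -> g == latest));
        }

        public static void main(String[] args) {
            System.out.println(caughtUp(List.of(), List.of(List.of())));         // true: no segments yet
            System.out.println(caughtUp(List.of(1L, 2L), List.of(List.of(2L)))); // true: replica caught up
            System.out.println(caughtUp(List.of(1L, 2L), List.of(List.of(1L)))); // false: replica behind
        }
    }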
OngoingSegmentReplications.java
@@ -24,7 +24,10 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Manages references to ongoing segrep events on a node.
@@ -38,7 +41,7 @@ class OngoingSegmentReplications {
private final RecoverySettings recoverySettings;
private final IndicesService indicesService;
private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
private final Map<DiscoveryNode, SegmentReplicationSourceHandler> nodesToHandlers;
private final Map<String, SegmentReplicationSourceHandler> allocationIdToHandlers;

/**
* Constructor.
@@ -50,7 +53,7 @@ class OngoingSegmentReplications {
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
this.copyStateMap = Collections.synchronizedMap(new HashMap<>());
this.nodesToHandlers = ConcurrentCollections.newConcurrentMap();
this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap();
}

/**
@@ -96,8 +99,7 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro
* @param listener {@link ActionListener} that resolves when sending files is complete.
*/
void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
final DiscoveryNode node = request.getTargetNode();
final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node);
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.get(request.getTargetAllocationId());
if (handler != null) {
if (handler.isReplicating()) {
throw new OpenSearchException(
@@ -108,7 +110,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
}
// update the given listener to release the CopyState before it resolves.
final ActionListener<GetSegmentFilesResponse> wrappedListener = ActionListener.runBefore(listener, () -> {
final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node);
final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId());
if (sourceHandler != null) {
removeCopyState(sourceHandler.getCopyState());
}
@@ -123,19 +125,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
}
}

/**
* Cancel any ongoing replications for a given {@link DiscoveryNode}
*
* @param node {@link DiscoveryNode} node for which to cancel replication events.
*/
void cancelReplication(DiscoveryNode node) {
final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node);
if (handler != null) {
handler.cancel("Cancel on node left");
removeCopyState(handler.getCopyState());
}
}

/**
* Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current
* node's store. This state is intended to be sent back to replicas before copy is initiated so the replica can perform a diff against its
@@ -149,9 +138,9 @@ void cancelReplication(DiscoveryNode node) {
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (nodesToHandlers.putIfAbsent(
request.getTargetNode(),
createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter)
if (allocationIdToHandlers.putIfAbsent(
request.getTargetAllocationId(),
createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter)
) != null) {
throw new OpenSearchException(
"Shard copy {} on node {} already replicating",
@@ -163,18 +152,23 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f
}

/**
* Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down.
* Cancel all Replication events for the given shard, intended to be called when a primary is shutting down.
*
* @param shard {@link IndexShard}
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(IndexShard shard, String reason) {
for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) {
if (entry.getCopyState().getShard().equals(shard)) {
entry.cancel(reason);
}
}
copyStateMap.clear();
cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
}

/**
* Cancel any ongoing replications for a given {@link DiscoveryNode}
*
* @param node {@link DiscoveryNode} node for which to cancel replication events.
*/
void cancelReplication(DiscoveryNode node) {
cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left");
}

/**
@@ -186,19 +180,25 @@ boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
}

int size() {
return nodesToHandlers.size();
return allocationIdToHandlers.size();
}

int cachedCopyStateSize() {
return copyStateMap.size();
}

private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) {
private SegmentReplicationSourceHandler createTargetHandler(
DiscoveryNode node,
CopyState copyState,
String allocationId,
FileChunkWriter fileChunkWriter
) {
return new SegmentReplicationSourceHandler(
node,
fileChunkWriter,
copyState.getShard().getThreadPool(),
copyState,
allocationId,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks()
);
@@ -231,4 +231,23 @@ private synchronized void removeCopyState(CopyState copyState) {
copyStateMap.remove(copyState.getRequestedReplicationCheckpoint());
}
}

/**
* Remove handlers from allocationIdToHandlers map based on a filter predicate.
* This will also decref the handler's CopyState reference.
*/
private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> predicate, String reason) {
final List<String> allocationIds = allocationIdToHandlers.values()
.stream()
.filter(predicate)
.map(SegmentReplicationSourceHandler::getAllocationId)
.collect(Collectors.toList());
for (String allocationId : allocationIds) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
if (handler != null) {
handler.cancel(reason);
removeCopyState(handler.getCopyState());
}
}
}
}
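Both public cancel paths now funnel into the private cancelHandlers helper above: cancel(shard, reason) matches handlers by shard ID, while cancelReplication(node) matches by target node, so removal from the map and the CopyState decref happen in exactly one place. A self-contained sketch of that collect-then-remove pattern (hypothetical Handler record, for illustration only):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    public class CancelSketch {
        record Handler(String allocationId, String targetNode) {}

        static final Map<String, Handler> handlers = new ConcurrentHashMap<>();

        // Collect matching allocation IDs first, then remove entry by entry so each
        // removed handler is cancelled (and would decref its CopyState) exactly once.
        static void cancelHandlers(Predicate<Handler> predicate, String reason) {
            List<String> ids = handlers.values().stream()
                .filter(predicate)
                .map(Handler::allocationId)
                .collect(Collectors.toList());
            for (String id : ids) {
                Handler handler = handlers.remove(id);
                if (handler != null) {
                    System.out.println("cancelled " + handler.allocationId() + ": " + reason);
                }
            }
        }

        public static void main(String[] args) {
            handlers.put("alloc-1", new Handler("alloc-1", "nodeA"));
            handlers.put("alloc-2", new Handler("alloc-2", "nodeB"));
            cancelHandlers(h -> h.targetNode().equals("nodeB"), "Node left"); // cancels alloc-2 only
        }
    }

Collecting the IDs before removing avoids mutating the map while streaming over its values.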
SegmentReplicationSourceHandler.java
@@ -54,6 +54,8 @@ class SegmentReplicationSourceHandler {
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final Logger logger;
private final AtomicBoolean isReplicating = new AtomicBoolean();
private final DiscoveryNode targetNode;
private final String allocationId;

/**
* Constructor.
@@ -70,9 +72,11 @@ class SegmentReplicationSourceHandler {
FileChunkWriter writer,
ThreadPool threadPool,
CopyState copyState,
String allocationId,
int fileChunkSizeInBytes,
int maxConcurrentFileChunks
) {
this.targetNode = targetNode;
this.shard = copyState.getShard();
this.logger = Loggers.getLogger(
SegmentReplicationSourceHandler.class,
@@ -89,6 +93,7 @@ class SegmentReplicationSourceHandler {
fileChunkSizeInBytes,
maxConcurrentFileChunks
);
this.allocationId = allocationId;
this.copyState = copyState;
}

@@ -118,7 +123,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
logger.debug(
"delaying replication of {} as it is not listed as assigned to target node {}",
shard.shardId(),
request.getTargetNode()
targetNode
);
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
@@ -175,4 +180,12 @@ CopyState getCopyState() {
public boolean isReplicating() {
return isReplicating.get();
}

public DiscoveryNode getTargetNode() {
return targetNode;
}

public String getAllocationId() {
return allocationId;
}
}
OngoingSegmentReplicationsTests.java
@@ -155,6 +155,9 @@ public void testCancelReplication() throws IOException {
}

public void testMultipleReplicasUseSameCheckpoint() throws IOException {
IndexShard secondReplica = newShard(primary.shardId(), false);
recoverReplica(secondReplica, primary, true);

OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
final CheckpointInfoRequest request = new CheckpointInfoRequest(
1L,
@@ -172,7 +175,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException {

final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
1L,
replica.routingEntry().allocationId().getId(),
secondReplica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
testCheckpoint
);
@@ -187,6 +190,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException {
assertEquals(0, copyState.refCount());
assertEquals(0, replications.size());
assertEquals(0, replications.cachedCopyStateSize());
closeShards(secondReplica);
}

public void testStartCopyWithoutPrepareStep() {
@@ -272,4 +276,40 @@ public void onFailure(Exception e) {
}
});
}

public void testCancelAllReplicationsForShard() throws IOException {
// This tests when primary has multiple ongoing replications.
IndexShard replica_2 = newShard(primary.shardId(), false);
recoverReplica(replica_2, primary, true);

OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
final CheckpointInfoRequest request = new CheckpointInfoRequest(
1L,
replica.routingEntry().allocationId().getId(),
primaryDiscoveryNode,
testCheckpoint
);

final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class));
assertEquals(1, copyState.refCount());

final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
1L,
replica_2.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
testCheckpoint
);
replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class));

assertEquals(2, copyState.refCount());
assertEquals(2, replications.size());
assertEquals(1, replications.cachedCopyStateSize());

// cancel the primary's ongoing replications.
replications.cancel(primary, "Test");
assertEquals(0, copyState.refCount());
assertEquals(0, replications.size());
assertEquals(0, replications.cachedCopyStateSize());
closeShards(replica_2);
}
}
SegmentReplicationSourceHandlerTests.java
@@ -66,6 +66,7 @@ public void testSendFiles() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
@@ -103,6 +104,7 @@ public void testSendFiles_emptyRequest() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
@@ -141,6 +143,7 @@ public void testSendFileFails() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
@@ -178,6 +181,7 @@ public void testReplicationAlreadyRunning() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
