diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 7d3a2a8f69bc8..d03ff3baf9bbd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -18,7 +18,9 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexModule; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; @@ -27,6 +29,7 @@ import org.opensearch.transport.TransportService; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 2893758354352..2f4ca208b1c63 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -46,8 +46,8 @@ class OngoingSegmentReplications implements Closeable { private final RecoverySettings recoverySettings; private final IndicesService indicesService; private final Map allocationIdToHandlers; - private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class); private final Map copyStateMap; + private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class); /** * Constructor. @@ -101,7 +101,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener copyState) { final SegmentReplicationSourceHandler segmentReplicationSourceHandler = allocationIdToHandlers.putIfAbsent( @@ -120,26 +120,46 @@ void prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileCh } } + private SegmentReplicationSourceHandler createTargetHandler( + DiscoveryNode node, + GatedCloseable copyState, + String allocationId, + FileChunkWriter fileChunkWriter + ) { + return new SegmentReplicationSourceHandler( + node, + fileChunkWriter, + copyState, + allocationId, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks() + ); + } + /** * Cancel all Replication events for the given shard, intended to be called when a primary is shutting down. * * @param shard {@link IndexShard} * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(IndexShard shard, String reason) { - cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason); + void cancel(IndexShard shard, String reason) { if (shard.routingEntry().primary()) { - clearCopyStateForShard(shard.shardId()); + cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason); + final CopyState remove = copyStateMap.remove(shard.shardId()); + if (remove != null) { + remove.decRef(); + } } } + /** * Cancel all Replication events for the given allocation ID, intended to be called when a primary is shutting down. * * @param allocationId {@link String} - Allocation ID. * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(String allocationId, String reason) { + void cancel(String allocationId, String reason) { final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId); if (handler != null) { handler.cancel(reason); @@ -164,22 +184,6 @@ int size() { return allocationIdToHandlers.size(); } - private SegmentReplicationSourceHandler createTargetHandler( - DiscoveryNode node, - GatedCloseable copyState, - String allocationId, - FileChunkWriter fileChunkWriter - ) { - return new SegmentReplicationSourceHandler( - node, - fileChunkWriter, - copyState, - allocationId, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - recoverySettings.getMaxConcurrentFileChunks() - ); - } - /** * Remove handlers from allocationIdToHandlers map based on a filter predicate. * This will also decref the handler's CopyState reference. @@ -195,39 +199,34 @@ private void cancelHandlers(Predicate p } } - public synchronized void setCopyState(IndexShard indexShard) { - // We can only compute CopyState for shards that have started. - if (indexShard.state() == IndexShardState.STARTED) { - final CopyState state; + /** + * Build and store a new {@link CopyState} for the given {@link IndexShard}. + * + * @param indexShard - Primary shard. + */ + public void setCopyState(IndexShard indexShard) { + if (indexShard.state() == IndexShardState.STARTED && indexShard.verifyPrimaryMode()) { try { - state = new CopyState(indexShard); + final CopyState state = new CopyState(indexShard); + final CopyState oldCopyState = copyStateMap.remove(indexShard.shardId()); + if (oldCopyState != null) { + oldCopyState.decRef(); + } + copyStateMap.put(indexShard.shardId(), state); } catch (IOException e) { throw new UncheckedIOException(e); } - final CopyState oldCopyState = copyStateMap.remove(indexShard.shardId()); - if (oldCopyState != null) { - oldCopyState.decRef(); - } - // build the CopyState object and cache it before returning - copyStateMap.put(indexShard.shardId(), state); - } - } - - @Override - public synchronized void close() throws IOException { - for (CopyState value : copyStateMap.values()) { - value.decRef(); } } - public synchronized void clearCopyStateForShard(ShardId shardId) { - final CopyState remove = copyStateMap.remove(shardId); - if (remove != null) { - remove.decRef(); - } - } - - public synchronized GatedCloseable getLatestCopyState(ShardId shardId) { + /** + * Get the latest {@link CopyState} for the given shardId. This method returns an incref'd CopyState wrapped + * in a {@link GatedCloseable}, when released the copyState is decRef'd. + * + * @param shardId {@link ShardId} + * @return {@link GatedCloseable} Closeable containing the CopyState. + */ + public GatedCloseable getCopyState(ShardId shardId) { final CopyState copyState = copyStateMap.get(shardId); if (copyState != null) { copyState.incRef(); @@ -238,7 +237,18 @@ public synchronized GatedCloseable getLatestCopyState(ShardId shardId throw new IndexShardNotStartedException(shardId, indexShard.state()); } + // for tests. Map getCopyStateMap() { return copyStateMap; } + + @Override + public void close() throws IOException { + // Extra check to ensure all copyState has been cleaned up. + for (CopyState value : copyStateMap.values()) { + if (value.refCount() > 0) { + value.decRef(); + } + } + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 95a37ed177701..d4cc042aee1b2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -121,7 +121,7 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan final ReplicationTimer timer = new ReplicationTimer(); assert ongoingSegmentReplications != null; timer.start(); - final GatedCloseable copyStateGatedCloseable = ongoingSegmentReplications.getLatestCopyState( + final GatedCloseable copyStateGatedCloseable = ongoingSegmentReplications.getCopyState( request.getCheckpoint().getShardId() ); final CopyState copyState = copyStateGatedCloseable.get(); @@ -131,11 +131,8 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan } else { if (request.getCheckpoint().isAheadOf(copyState.getCheckpoint()) || copyState.getMetadataMap().isEmpty()) { // if there are no files to send, or the replica is already at this checkpoint, send the infos but do not hold - // snapshotted - // infos. - // During recovery of an empty cluster it is possible we have no files to send but the primary has flushed to set - // userData, - // in this case we still want to send over infos. + // snapshotted infos. During recovery of an empty cluster it is possible we have no files to send but the + // primary has flushed to set userData, in this case we still want to send over infos. channel.sendResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), Collections.emptyMap(), copyState.getInfosBytes()) ); @@ -221,25 +218,13 @@ protected void doClose() throws IOException { @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null) { - ongoingSegmentReplications.cancel(indexShard, "shard is closed"); - } - } - - /** - * Cancels any replications on this node to a replica that has been promoted as primary. - */ - @Override - public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) { - if (indexShard != null && oldRouting.primary() == false && newRouting.primary()) { - ongoingSegmentReplications.cancel(indexShard.routingEntry().allocationId().getId(), "Relocating primary shard."); - } - if (indexShard != null && oldRouting.primary() && newRouting.primary() == false) { - ongoingSegmentReplications.clearCopyStateForShard(indexShard.shardId()); + if (indexShard.routingEntry().primary()) { + ongoingSegmentReplications.cancel(indexShard, "shard is closed"); + } } } public void onPrimaryRefresh(IndexShard indexShard) { ongoingSegmentReplications.setCopyState(indexShard); } - } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 1ed8b0a993dea..4d01029d7a04e 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -21,6 +21,7 @@ import org.opensearch.index.IndexService; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardNotStartedException; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; @@ -111,7 +112,7 @@ public void testPrepareAndSendSegments() throws IOException { listener.onResponse(null); }; replications.setCopyState(primary); - final GatedCloseable copyStateGatedCloseable = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable copyStateGatedCloseable = replications.getCopyState(primary.shardId()); replications.prepareForReplication(request, segmentSegmentFileChunkWriter, copyStateGatedCloseable); final CopyState copyState = copyStateGatedCloseable.get(); assertEquals(1, replications.size()); @@ -131,11 +132,7 @@ public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { assertEquals(copyState.getMetadataMap().size(), getSegmentFilesResponse.files.size()); assertEquals(1, copyState.refCount()); assertEquals(0, replications.size()); - try { - replications.close(); - } catch (IOException e) { - fail(); - } + replications.cancel(copyState.getShard(), "Closing"); assertEquals(0, copyState.refCount()); } @@ -160,7 +157,7 @@ public void testCancelReplication() throws IOException { Assert.fail(); }; replications.setCopyState(primary); - final GatedCloseable copyStateGatedCloseable = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable copyStateGatedCloseable = replications.getCopyState(primary.shardId()); replications.prepareForReplication(request, segmentSegmentFileChunkWriter, copyStateGatedCloseable); final CopyState copyState = copyStateGatedCloseable.get(); assertEquals(1, replications.size()); @@ -168,7 +165,7 @@ public void testCancelReplication() throws IOException { replications.cancelReplication(primaryDiscoveryNode); assertEquals(1, copyState.refCount()); assertEquals(0, replications.size()); - replications.close(); + replications.cancel(copyState.getShard(), "Closing"); assertEquals(0, copyState.refCount()); } @@ -189,7 +186,7 @@ public void testCancelReplication_AfterSendFilesStarts() throws IOException, Int replications.cancel(replica.routingEntry().allocationId().getId(), "Test"); }; replications.setCopyState(primary); - final GatedCloseable copyStateGatedCloseable = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable copyStateGatedCloseable = replications.getCopyState(primary.shardId()); replications.prepareForReplication(request, segmentSegmentFileChunkWriter, copyStateGatedCloseable); final CopyState copyState = copyStateGatedCloseable.get(); assertEquals(1, replications.size()); @@ -216,7 +213,7 @@ public void onFailure(Exception e) { }); latch.await(2, TimeUnit.SECONDS); assertEquals("listener should have resolved with failure", 0, latch.getCount()); - replications.close(); + replications.cancel(primary, "Closing"); assertEquals(0, copyState.refCount()); } @@ -237,7 +234,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { }; replications.setCopyState(primary); - final GatedCloseable copyStateGatedCloseable = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable copyStateGatedCloseable = replications.getCopyState(primary.shardId()); replications.prepareForReplication(request, segmentSegmentFileChunkWriter, copyStateGatedCloseable); final CopyState copyState = copyStateGatedCloseable.get(); assertEquals(2, copyState.refCount()); @@ -248,7 +245,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { replicaDiscoveryNode, testCheckpoint ); - final GatedCloseable closeableTwo = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable closeableTwo = replications.getCopyState(primary.shardId()); replications.prepareForReplication(secondRequest, segmentSegmentFileChunkWriter, closeableTwo); assertEquals(3, copyState.refCount()); assertEquals(2, replications.size()); @@ -258,7 +255,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { assertEquals(1, copyState.refCount()); assertEquals(0, replications.size()); closeShards(secondReplica); - replications.close(); + replications.cancel(copyState.getShard(), "Closing"); assertEquals(0, copyState.refCount()); } @@ -288,7 +285,7 @@ public void onFailure(Exception e) { verify(listener, times(1)).onResponse(any()); } - public void testShardAlreadyReplicatingToNode() throws IOException { + public void testShardAlreadyReplicatingToNode() { OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); final CheckpointInfoRequest request = new CheckpointInfoRequest( 1L, @@ -300,14 +297,24 @@ public void testShardAlreadyReplicatingToNode() throws IOException { listener.onResponse(null); }; replications.setCopyState(primary); - final GatedCloseable copyStateGatedCloseable = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable copyStateGatedCloseable = replications.getCopyState(primary.shardId()); replications.prepareForReplication(request, segmentSegmentFileChunkWriter, copyStateGatedCloseable); assertThrows( OpenSearchException.class, - () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter, copyStateGatedCloseable); } + () -> replications.prepareForReplication(request, segmentSegmentFileChunkWriter, copyStateGatedCloseable) ); } + public void testThrowShardNotStartedExceptionWhenNoCopyState() throws IOException { + closeShards(primary); + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + primary = newShard(true, settings); + System.out.println(primary.state()); + replications.setCopyState(primary); + assertThrows(IndexShardNotStartedException.class, () -> replications.getCopyState(primary.shardId())); + closeShards(primary); + } + public void testStartReplicationWithNoFilesToFetch() throws IOException { // create a replications object and request a checkpoint. OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); @@ -321,7 +328,7 @@ public void testStartReplicationWithNoFilesToFetch() throws IOException { final FileChunkWriter segmentSegmentFileChunkWriter = mock(FileChunkWriter.class); // Prepare for replication step - and ensure copyState is added to cache. replications.setCopyState(primary); - final GatedCloseable copyStateGatedCloseable = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable copyStateGatedCloseable = replications.getCopyState(primary.shardId()); replications.prepareForReplication(request, segmentSegmentFileChunkWriter, copyStateGatedCloseable); final CopyState copyState = copyStateGatedCloseable.get(); assertEquals(1, replications.size()); @@ -342,11 +349,7 @@ public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { assertEquals(Collections.emptyList(), getSegmentFilesResponse.files); assertEquals(1, copyState.refCount()); verifyNoInteractions(segmentSegmentFileChunkWriter); - try { - replications.close(); - } catch (IOException e) { - fail(); - } + replications.cancel(copyState.getShard(), "Closing"); assertEquals(0, copyState.refCount()); } @@ -372,7 +375,7 @@ public void testCancelAllReplicationsForShard() throws IOException { ); replications.setCopyState(primary); - final GatedCloseable copyStateGatedCloseable = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable copyStateGatedCloseable = replications.getCopyState(primary.shardId()); replications.prepareForReplication(request, mock(FileChunkWriter.class), copyStateGatedCloseable); final CopyState copyState = copyStateGatedCloseable.get(); assertEquals(2, copyState.refCount()); @@ -383,7 +386,7 @@ public void testCancelAllReplicationsForShard() throws IOException { replicaDiscoveryNode, testCheckpoint ); - final GatedCloseable closeableTwo = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable closeableTwo = replications.getCopyState(primary.shardId()); replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class), closeableTwo); assertEquals(3, copyState.refCount()); @@ -403,9 +406,9 @@ public void testCopyStateRefCounts() throws IOException { assertEquals(1, copyStateMap.size()); final CopyState copyState = copyStateMap.get(primary.shardId()); assertEquals("CopyState is snapshotted until released", 1, copyState.refCount()); - final GatedCloseable closeable = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable closeable = replications.getCopyState(primary.shardId()); assertEquals("CopyState refCount is increased when accessed", 2, copyState.refCount()); - final GatedCloseable closeable_2 = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable closeable_2 = replications.getCopyState(primary.shardId()); assertEquals("CopyState refCount is increased when accessed", 3, copyState.refCount()); // index a random doc and refresh. @@ -416,7 +419,7 @@ public void testCopyStateRefCounts() throws IOException { final CopyState copyState_2 = copyStateMap.get(primary.shardId()); assertEquals(1, copyState_2.refCount()); assertEquals("CopyState not in map remains refCounted until closed", 2, copyState.refCount()); - final GatedCloseable closeable_3 = replications.getLatestCopyState(primary.shardId()); + final GatedCloseable closeable_3 = replications.getCopyState(primary.shardId()); assertEquals("CopyState refCount is increased when accessed", 2, copyState_2.refCount()); assertEquals(copyState_2, closeable_3.get()); @@ -427,7 +430,7 @@ public void testCopyStateRefCounts() throws IOException { closeable_3.close(); assertEquals(1, copyState_2.refCount()); - replications.close(); + replications.cancel(primary, "Closing"); assertEquals(0, copyState_2.refCount()); } } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 4418245734e8c..2acec995fc275 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -109,6 +109,7 @@ import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.node.MockNode; import org.opensearch.node.Node; import org.opensearch.node.Node.DiscoverySettings; @@ -1427,19 +1428,24 @@ private void assertNoSnapshottedIndexCommit() throws Exception { for (NodeAndClient nodeAndClient : nodes.values()) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { - if (indexService.getIndexSettings().isSegRepEnabled() == false) { - for (IndexShard indexShard : indexService) { - try { - Engine engine = IndexShardTestCase.getEngine(indexShard); - if (engine instanceof InternalEngine) { - assertFalse( + // if segrep is enabled on the index, release our CopyState map, + // this will happen on node shutdown, but allows us to assert no other open CopyStates are open. + final SegmentReplicationSourceService segmentReplicationSourceService = getInstance( + SegmentReplicationSourceService.class, + nodeAndClient.name + ); + for (IndexShard indexShard : indexService) { + segmentReplicationSourceService.beforeIndexShardClosed(indexShard.shardId(), indexShard, indexShard.indexSettings().getSettings()); + try { + Engine engine = IndexShardTestCase.getEngine(indexShard); + if (engine instanceof InternalEngine) { + assertFalse( indexShard.routingEntry().toString() + " has unreleased snapshotted index commits", EngineTestCase.hasSnapshottedCommits(engine) ); - } - } catch (AlreadyClosedException ignored) { - } + } catch (AlreadyClosedException ignored) { + } } }