From f934b33d783226008922747abcb353ab77788b39 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 19 Oct 2023 12:13:31 -0500 Subject: [PATCH] Fix download reporting for segment replication (#10644) (#10705) (cherry picked from commit c00141770f1cde3a26ab3e514866ad136379c7f0) Signed-off-by: Kunal Kotwani Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../PrimaryShardReplicationSource.java | 6 ++ .../RemoteStoreReplicationSource.java | 9 ++- .../replication/SegmentReplicationSource.java | 73 +++++++++++++++++++ .../replication/SegmentReplicationTarget.java | 23 +++++- .../index/shard/RemoteIndexShardTests.java | 4 +- .../SegmentReplicationIndexShardTests.java | 2 + ...licationWithNodeToNodeIndexShardTests.java | 6 ++ .../PrimaryShardReplicationSourceTests.java | 3 + .../RemoteStoreReplicationSourceTests.java | 20 ++++- .../SegmentReplicationTargetServiceTests.java | 4 + .../SegmentReplicationTargetTests.java | 50 +++++++++++++ .../replication/TestReplicationSource.java | 2 + .../index/shard/IndexShardTestCase.java | 2 + 13 files changed, 198 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index 9dcd16c53e6f3..02fc8feefd698 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -22,6 +22,7 @@ import org.opensearch.transport.TransportService; import java.util.List; +import java.util.function.BiConsumer; import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO; import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES; @@ -80,8 +81,13 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { + // fileProgressTracker is a no-op for node to node recovery + // MultiFileWriter takes care of progress tracking for downloads in this scenario + // TODO: Move state management and tracking into replication methods and use chunking and data + // copy mechanisms only from MultiFileWriter final Writeable.Reader reader = GetSegmentFilesResponse::new; final ActionListener responseListener = ActionListener.map(listener, r -> r); final GetSegmentFilesRequest request = new GetSegmentFilesRequest( diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index d2000a56401f5..12eabf1e6554f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import java.util.stream.Collectors; /** @@ -95,6 +96,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { try { @@ -117,7 +119,12 @@ public void getSegmentFiles( assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; toDownloadSegmentNames.add(file); } - indexShard.getFileDownloader().download(remoteDirectory, storeDirectory, toDownloadSegmentNames); + indexShard.getFileDownloader() + .download( + remoteDirectory, + new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker), + toDownloadSegmentNames + ); logger.debug("Downloaded segment files from remote store {}", filesToFetch); } finally { indexShard.store().decRef(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java index 6676b5b667e42..24f0cb15ddb25 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java @@ -8,13 +8,19 @@ package org.opensearch.indices.replication; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.opensearch.common.util.CancellableThreads.ExecutionCancelledException; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.io.IOException; import java.util.List; +import java.util.function.BiConsumer; /** * Represents the source of a replication event. @@ -39,6 +45,7 @@ public interface SegmentReplicationSource { * @param checkpoint {@link ReplicationCheckpoint} Checkpoint to fetch metadata for. * @param filesToFetch {@link List} List of files to fetch. * @param indexShard {@link IndexShard} Reference to the IndexShard. + * @param fileProgressTracker {@link BiConsumer} A consumer that updates the replication progress for shard files. * @param listener {@link ActionListener} Listener that completes with the list of files copied. */ void getSegmentFiles( @@ -46,6 +53,7 @@ void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ); @@ -58,4 +66,69 @@ void getSegmentFiles( * Cancel any ongoing requests, should resolve any ongoing listeners with onFailure with a {@link ExecutionCancelledException}. */ default void cancel() {} + + /** + * Directory wrapper that records copy process for replication statistics + * + * @opensearch.internal + */ + final class ReplicationStatsDirectoryWrapper extends FilterDirectory { + private final BiConsumer fileProgressTracker; + + ReplicationStatsDirectoryWrapper(Directory in, BiConsumer fileProgressTracker) { + super(in); + this.fileProgressTracker = fileProgressTracker; + } + + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { + // here we wrap the index input form the source directory to report progress of file copy for the recovery stats. + // we increment the num bytes recovered in the readBytes method below, if users pull statistics they can see immediately + // how much has been recovered. + in.copyFrom(new FilterDirectory(from) { + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + final IndexInput input = in.openInput(name, context); + return new IndexInput("StatsDirectoryWrapper(" + input.toString() + ")") { + @Override + public void close() throws IOException { + input.close(); + } + + @Override + public long getFilePointer() { + throw new UnsupportedOperationException("only straight copies are supported"); + } + + @Override + public void seek(long pos) throws IOException { + throw new UnsupportedOperationException("seeks are not supported"); + } + + @Override + public long length() { + return input.length(); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + throw new UnsupportedOperationException("slices are not supported"); + } + + @Override + public byte readByte() throws IOException { + throw new UnsupportedOperationException("use a buffer if you wanna perform well"); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + // we rely on the fact that copyFrom uses a buffer + input.readBytes(b, offset, len); + fileProgressTracker.accept(dest, (long) len); + } + }; + } + }, src, dest, context); + } + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 0eb6ce36fa63d..cd6dbe8af90d9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -170,7 +170,14 @@ public void startReplication(ActionListener listener) { final List filesToFetch = getFiles(checkpointInfo); state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); - source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener); + source.getSegmentFiles( + getId(), + checkpointInfo.getCheckpoint(), + filesToFetch, + indexShard, + this::updateFileRecoveryBytes, + getFilesListener + ); }, listener::onFailure); getFilesListener.whenComplete(response -> { @@ -240,6 +247,20 @@ private boolean validateLocalChecksum(StoreFileMetadata file) { } } + /** + * Updates the state to reflect recovery progress for the given file and + * updates the last access time for the target. + * @param fileName Name of the file being downloaded + * @param bytesRecovered Number of bytes recovered + */ + private void updateFileRecoveryBytes(String fileName, long bytesRecovered) { + ReplicationLuceneIndex index = state.getIndex(); + if (index != null) { + index.addRecoveredBytesToFile(fileName, bytesRecovered); + } + setLastAccessTime(); + } + private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 6a99063d11353..fe389e3b3fcb4 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -43,6 +43,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; @@ -388,9 +389,10 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { - super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener); + super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener); runAfterGetFiles[index.getAndIncrement()].run(); } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 528402d48658a..52f28aead533d 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -91,6 +91,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Function; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; @@ -725,6 +726,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { // set the listener, we will only fail it once we cancel the source. diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java index c394101697b47..f0950fe5392de 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -87,6 +88,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { // randomly resolve the listener, indicating the source has resolved. @@ -131,6 +133,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { Assert.fail("Should not be reached"); @@ -176,6 +179,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { Assert.fail("Unreachable"); @@ -223,6 +227,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) {} }; @@ -269,6 +274,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index bcacef83d190a..e4dd32e5c6f70 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -125,6 +125,7 @@ public void testGetSegmentFiles() { checkpoint, Arrays.asList(testMetadata), mock(IndexShard.class), + (fileName, bytesRecovered) -> {}, mock(ActionListener.class) ); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); @@ -153,6 +154,7 @@ public void testTransportTimeoutForGetSegmentFilesAction() { checkpoint, Arrays.asList(testMetadata), mock(IndexShard.class), + (fileName, bytesRecovered) -> {}, mock(ActionListener.class) ); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); @@ -178,6 +180,7 @@ public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedExcep checkpoint, Arrays.asList(testMetadata), mock(IndexShard.class), + (fileName, bytesRecovered) -> {}, new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java index 9204f48ba5bdd..287962b158c79 100644 --- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java @@ -90,7 +90,7 @@ public void testGetSegmentFiles() throws ExecutionException, InterruptedExceptio List filesToFetch = primaryShard.getSegmentMetadataMap().values().stream().collect(Collectors.toList()); final PlainActionFuture res = PlainActionFuture.newFuture(); replicationSource = new RemoteStoreReplicationSource(primaryShard); - replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, filesToFetch, replicaShard, res); + replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, filesToFetch, replicaShard, (fileName, bytesRecovered) -> {}, res); GetSegmentFilesResponse response = res.get(); assertEquals(response.files.size(), filesToFetch.size()); assertTrue(response.files.containsAll(filesToFetch)); @@ -104,7 +104,14 @@ public void testGetSegmentFilesAlreadyExists() throws IOException, InterruptedEx try { final PlainActionFuture res = PlainActionFuture.newFuture(); replicationSource = new RemoteStoreReplicationSource(primaryShard); - replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, filesToFetch, primaryShard, res); + replicationSource.getSegmentFiles( + REPLICATION_ID, + checkpoint, + filesToFetch, + primaryShard, + (fileName, bytesRecovered) -> {}, + res + ); res.get(); } catch (AssertionError | ExecutionException ex) { latch.countDown(); @@ -118,7 +125,14 @@ public void testGetSegmentFilesReturnEmptyResponse() throws ExecutionException, final ReplicationCheckpoint checkpoint = primaryShard.getLatestReplicationCheckpoint(); final PlainActionFuture res = PlainActionFuture.newFuture(); replicationSource = new RemoteStoreReplicationSource(primaryShard); - replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), primaryShard, res); + replicationSource.getSegmentFiles( + REPLICATION_ID, + checkpoint, + Collections.emptyList(), + primaryShard, + (fileName, bytesRecovered) -> {}, + res + ); GetSegmentFilesResponse response = res.get(); assert (response.files.isEmpty()); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 6fe8163f53fd5..826c5fcfb6219 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; @@ -215,6 +216,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { Assert.fail("Should not be called"); @@ -280,6 +282,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); @@ -337,6 +340,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { Assert.fail("Unreachable"); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 0fae8443c5ce5..3804230942430 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -40,6 +40,7 @@ import org.opensearch.index.store.StoreTests; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationFailedException; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.DummyShardLock; import org.opensearch.test.IndexSettingsModule; @@ -53,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.function.BiConsumer; import org.mockito.Mockito; @@ -131,10 +133,12 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { assertEquals(1, filesToFetch.size()); assert (filesToFetch.contains(SEGMENT_FILE)); + filesToFetch.forEach(storeFileMetadata -> fileProgressTracker.accept(storeFileMetadata.name(), storeFileMetadata.length())); listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } }; @@ -149,6 +153,19 @@ public void getSegmentFiles( public void onResponse(Void replicationResponse) { try { verify(spyIndexShard, times(1)).finalizeReplication(any()); + assertEquals( + 1, + segrepTarget.state() + .getIndex() + .fileDetails() + .stream() + .filter(ReplicationLuceneIndex.FileMetadata::fullyRecovered) + .count() + ); + assertEquals( + 0, + segrepTarget.state().getIndex().fileDetails().stream().filter(file -> file.fullyRecovered() == false).count() + ); segrepTarget.markAsDone(); } catch (IOException ex) { Assert.fail(); @@ -182,6 +199,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -200,6 +218,15 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { + assertEquals( + 0, + segrepTarget.state() + .getIndex() + .fileDetails() + .stream() + .filter(ReplicationLuceneIndex.FileMetadata::fullyRecovered) + .count() + ); assertEquals(exception, e.getCause().getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } @@ -225,6 +252,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { listener.onFailure(exception); @@ -243,6 +271,15 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { + assertEquals( + 0, + segrepTarget.state() + .getIndex() + .fileDetails() + .stream() + .filter(ReplicationLuceneIndex.FileMetadata::fullyRecovered) + .count() + ); assertEquals(exception, e.getCause().getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } @@ -268,6 +305,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -314,6 +352,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -358,6 +397,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -376,6 +416,15 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { + assertEquals( + 0, + segrepTarget.state() + .getIndex() + .fileDetails() + .stream() + .filter(ReplicationLuceneIndex.FileMetadata::fullyRecovered) + .count() + ); assertTrue(e instanceof OpenSearchCorruptionException); assertTrue(e.getMessage().contains("has local copies of segments that differ from the primary")); segrepTarget.fail(new ReplicationFailedException(e), false); @@ -410,6 +459,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java index b29e25a0bff2c..bcd47e3d578ee 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java @@ -17,6 +17,7 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.List; +import java.util.function.BiConsumer; /** * This class is used by unit tests implementing SegmentReplicationSource @@ -36,6 +37,7 @@ public abstract void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 9dc230474482f..412d5235fe462 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -174,6 +174,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -1620,6 +1621,7 @@ public void getSegmentFiles( ReplicationCheckpoint checkpoint, List filesToFetch, IndexShard indexShard, + BiConsumer fileProgressTracker, ActionListener listener ) { try (