Skip to content

Commit

Permalink
[Segment Replication] Extend FileChunkWriter to allow cancel on trans…
Browse files Browse the repository at this point in the history
…port client (opensearch-project#4386)

* [Segment Replication] Extend FileChunkWriter to allow cancel on retryable transport client

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Add changelog entry

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Integration test

Signed-off-by: Suraj Singh <surajrider@gmail.com>

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Sep 6, 2022
1 parent 3d90d35 commit e4ea9d9
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))
- [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365))
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,23 @@
import org.opensearch.index.engine.Segment;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -61,6 +67,11 @@ public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Override
public Settings indexSettings() {
return Settings.builder()
Expand Down Expand Up @@ -314,6 +325,65 @@ public void testReplicationAfterForceMerge() throws Exception {
}
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
primaryNode
);
final IndexShard primaryShard = getIndexShard(primaryNode);

CountDownLatch latch = new CountDownLatch(1);

MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
FileChunkRequest req = (FileChunkRequest) request;
logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk());
if (req.name().endsWith("cfs") && req.lastChunk()) {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
connection.sendRequest(requestId, action, request, options);
}
);

final int docCount = scaledRandomIntBetween(0, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(docCount);
waitForDocs(docCount, indexer);

flush(INDEX_NAME);
}
segmentReplicationSourceService.beforeIndexShardClosed(primaryShard.shardId(), primaryShard, indexSettings());
latch.countDown();
assertDocCounts(docCount, primaryNode);
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ void writeFileChunk(
int totalTranslogOps,
ActionListener<Void> listener
);

default void cancel() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF

/**
* Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current
* nodes's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
* node's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
* local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas
* with the list of required files.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,9 @@ public void writeFileChunk(
reader
);
}

@Override
public void cancel() {
retryableTransportClient.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class SegmentReplicationSourceHandler {
private final DiscoveryNode targetNode;
private final String allocationId;

private final FileChunkWriter writer;

/**
* Constructor.
*
Expand Down Expand Up @@ -96,6 +98,7 @@ class SegmentReplicationSourceHandler {
);
this.allocationId = allocationId;
this.copyState = copyState;
this.writer = writer;
}

/**
Expand Down Expand Up @@ -186,9 +189,10 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
}

/**
* Cancels the recovery and interrupts all eligible threads.
* Cancels the replication and interrupts all eligible threads.
*/
public void cancel(String reason) {
writer.cancel();
cancellableThreads.cancel(reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;

public class SegmentReplicationSourceHandlerTests extends IndexShardTestCase {

Expand Down Expand Up @@ -241,6 +243,7 @@ public void onFailure(Exception e) {
}
});
latch.await(2, TimeUnit.SECONDS);
verify(chunkWriter, times(1)).cancel();
assertEquals("listener should have resolved with failure", 0, latch.getCount());
}
}

0 comments on commit e4ea9d9

Please sign in to comment.