Skip to content

Commit

Permalink
PR feedback.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Jul 11, 2023
1 parent f42be80 commit 50a1c37
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
Expand Down Expand Up @@ -159,13 +160,16 @@ protected void verifyStoreContent() throws Exception {
final String indexName = primaryRouting.getIndexName();
final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
final int primaryDocCount = getDocCountFromShard(primaryShard);
final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
for (ShardRouting replica : replicaRouting) {
IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
primarySegmentMetadata,
replicaShard.getSegmentMetadataMap()
);
final int replicaDocCount = getDocCountFromShard(replicaShard);
assertEquals("Doc counts should match", primaryDocCount, replicaDocCount);
if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
fail(
"Expected no missing or different segments between primary and replica but diff was missing: "
Expand All @@ -186,6 +190,12 @@ protected void verifyStoreContent() throws Exception {
}, 1, TimeUnit.MINUTES);
}

private int getDocCountFromShard(IndexShard shard) {
try (final Engine.Searcher searcher = shard.acquireSearcher("test")) {
return searcher.getDirectoryReader().numDocs();
}
}

private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
if (replicaShard.state().equals(IndexShardState.STARTED) == true) {
// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());

if (ongoingReplicationTarget != null) {
if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ public void getCheckpointMetadata(
ActionListener<CheckpointInfoResponse> listener
) {
// set the listener, we will only fail it once we cancel the source.
logger.info("In test source");
this.listener = listener;
latch.countDown();
// do not resolve this listener yet, wait for cancel to hit.
Expand Down

0 comments on commit 50a1c37

Please sign in to comment.