From 6944ce6a01bac67238776c71ef8968b78a5464d8 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Tue, 10 Dec 2024 14:27:23 -0800 Subject: [PATCH] Fixed PR comments and moved the integration tests to separate module Signed-off-by: Vinay Krishna Pudyodu --- .../SearchReplicaReplicationIT.java | 97 +++++++ .../SegmentReplicationStatsIT.java | 94 ------ .../SegmentReplicationShardStatsResponse.java | 27 +- ...ransportSegmentReplicationStatsAction.java | 88 +++--- .../SegmentReplicationPerGroupStats.java | 11 +- .../index/seqno/ReplicationTracker.java | 2 +- ...ortSegmentReplicationStatsActionTests.java | 273 +++++++++--------- 7 files changed, 290 insertions(+), 302 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java index a1b512c326ac5..a3a8e7d1db0ae 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java @@ -8,14 +8,20 @@ package org.opensearch.indices.replication; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.After; import org.junit.Before; import java.nio.file.Path; +import java.util.List; +import java.util.Set; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT { @@ -82,4 +88,95 @@ public void testReplication() throws Exception { waitForSearchableDocs(docCount, primary, replica); } + public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final List nodes = internalCluster().startDataOnlyNodes(3); + int numOfPrimaryShards = 2; + createIndex( + INDEX_NAME, + Settings.builder() + .put("number_of_shards", numOfPrimaryShards) + .put("number_of_replicas", 1) + .put("number_of_search_only_replicas", 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + ensureGreen(INDEX_NAME); + + final int docCount = 5; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(docCount, nodes); + + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) + .execute() + .actionGet(); + + // Verify the number of indices + assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size()); + // Verify total shards + assertEquals(numOfPrimaryShards * 3, segmentReplicationStatsResponse.getTotalShards()); + // Verify the number of primary shards + assertEquals(numOfPrimaryShards, segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).size()); + + List segmentReplicationPerGroupStats = segmentReplicationStatsResponse.getReplicationStats() + .get(INDEX_NAME); + for (SegmentReplicationPerGroupStats perGroupStats : segmentReplicationPerGroupStats) { + Set replicaStats = perGroupStats.getReplicaStats(); + // Verify the number of replica stats + assertEquals(2, replicaStats.size()); + for (SegmentReplicationShardStats replicaStat : replicaStats) { + assertNotNull(replicaStat.getCurrentReplicationState()); + } + } + } + + public void testSegmentReplicationStatsResponseWithOnlySearchReplica() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final List nodes = internalCluster().startDataOnlyNodes(2); + createIndex( + INDEX_NAME, + Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .put("number_of_search_only_replicas", 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + ensureGreen(INDEX_NAME); + + final int docCount = 5; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(docCount, nodes); + + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) + .execute() + .actionGet(); + + // Verify the number of indices + assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size()); + // Verify total shards + assertEquals(2, segmentReplicationStatsResponse.getTotalShards()); + // Verify the number of primary shards + assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).size()); + + SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0); + Set replicaStats = perGroupStats.getReplicaStats(); + // Verify the number of replica stats + assertEquals(1, replicaStats.size()); + for (SegmentReplicationShardStats replicaStat : replicaStats) { + assertNotNull(replicaStat.getCurrentReplicationState()); + } + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index 14be9b1cb3980..b2161577f7040 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -16,7 +16,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.ReplicationStats; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationShardStats; @@ -433,97 +432,4 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception { } } - - @Override - protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build(); - } - - public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(3); - - int numShards = 2; - assertAcked( - prepareCreate( - INDEX_NAME, - 0, - Settings.builder() - .put("number_of_shards", numShards) - .put("number_of_replicas", 1) - .put("number_of_search_only_replicas", 1) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - ) - ); - ensureGreen(); - final long numDocs = scaledRandomIntBetween(50, 100); - for (int i = 0; i < numDocs; i++) { - index(INDEX_NAME, "doc", Integer.toString(i)); - } - refresh(INDEX_NAME); - ensureSearchable(INDEX_NAME); - - assertBusy(() -> { - SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() - .indices() - .prepareSegmentReplicationStats(INDEX_NAME) - .setDetailed(true) - .execute() - .actionGet(); - assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size()); - assertEquals(numShards * 3, segmentReplicationStatsResponse.getTotalShards()); - assertEquals(numShards * 3, segmentReplicationStatsResponse.getSuccessfulShards()); - - SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0); - Set replicaStats = perGroupStats.getReplicaStats(); - for (SegmentReplicationShardStats replica : replicaStats) { - assertNotNull(replica.getCurrentReplicationState()); - } - assertEquals(3, replicaStats.size()); - }, 1, TimeUnit.MINUTES); - } - - public void testSegmentReplicationStatsResponseWithOnlySearchReplica() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNodes(2); - - int numShards = 1; - assertAcked( - prepareCreate( - INDEX_NAME, - 0, - Settings.builder() - .put("number_of_shards", numShards) - .put("number_of_replicas", 0) - .put("number_of_search_only_replicas", 1) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - ) - ); - ensureGreen(); - final long numDocs = scaledRandomIntBetween(50, 100); - for (int i = 0; i < numDocs; i++) { - index(INDEX_NAME, "doc", Integer.toString(i)); - } - refresh(INDEX_NAME); - ensureSearchable(INDEX_NAME); - - assertBusy(() -> { - SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() - .indices() - .prepareSegmentReplicationStats(INDEX_NAME) - .setDetailed(true) - .execute() - .actionGet(); - assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1); - assertEquals(segmentReplicationStatsResponse.getTotalShards(), 2); - assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), 2); - - SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0); - Set replicaStats = perGroupStats.getReplicaStats(); - // for (SegmentReplicationShardStats replica : replicaStats) { - // assertNotNull(replica.getCurrentReplicationState()); - // } - // assertEquals(1, replicaStats.size()); - }, 1, TimeUnit.MINUTES); - } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java index cf51431731cdf..ce17176a220ae 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java @@ -13,7 +13,6 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.index.SegmentReplicationPerGroupStats; -import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.indices.replication.SegmentReplicationState; import java.io.IOException; @@ -32,31 +31,19 @@ public class SegmentReplicationShardStatsResponse implements Writeable { @Nullable private final SegmentReplicationState replicaStats; - @Nullable - private final SegmentReplicationShardStats searchReplicaReplicationStats; - public SegmentReplicationShardStatsResponse(StreamInput in) throws IOException { this.primaryStats = in.readOptionalWriteable(SegmentReplicationPerGroupStats::new); this.replicaStats = in.readOptionalWriteable(SegmentReplicationState::new); - this.searchReplicaReplicationStats = in.readOptionalWriteable(SegmentReplicationShardStats::new); } public SegmentReplicationShardStatsResponse(SegmentReplicationPerGroupStats primaryStats) { this.primaryStats = primaryStats; this.replicaStats = null; - this.searchReplicaReplicationStats = null; } public SegmentReplicationShardStatsResponse(SegmentReplicationState replicaStats) { this.replicaStats = replicaStats; this.primaryStats = null; - this.searchReplicaReplicationStats = null; - } - - public SegmentReplicationShardStatsResponse(SegmentReplicationShardStats segmentReplicationShardStats) { - this.primaryStats = null; - this.replicaStats = null; - this.searchReplicaReplicationStats = segmentReplicationShardStats; } public SegmentReplicationPerGroupStats getPrimaryStats() { @@ -67,26 +54,14 @@ public SegmentReplicationState getReplicaStats() { return replicaStats; } - public SegmentReplicationShardStats getSearchReplicaReplicationStats() { - return searchReplicaReplicationStats; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(primaryStats); out.writeOptionalWriteable(replicaStats); - out.writeOptionalWriteable(searchReplicaReplicationStats); } @Override public String toString() { - return "SegmentReplicationShardStatsResponse{" - + "primaryStats=" - + primaryStats - + ", replicaStats=" - + replicaStats - + ", searchReplicaReplicationStats=" - + searchReplicaReplicationStats - + '}'; + return "SegmentReplicationShardStatsResponse{" + "primaryStats=" + primaryStats + ", replicaStats=" + replicaStats + '}'; } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java index 218d32098eae5..1a4f073941494 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java @@ -35,11 +35,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Transport action for shard segment replication operation. This transport action does not actually @@ -97,13 +97,10 @@ protected SegmentReplicationStatsResponse newResponse( ) { String[] shards = request.shards(); final List shardsToFetch = Arrays.stream(shards).map(Integer::valueOf).collect(Collectors.toList()); - // organize replica responses by allocationId. final Map replicaStats = new HashMap<>(); // map of index name to list of replication group stats. final Map> primaryStats = new HashMap<>(); - // search replica responses - final Set searchReplicaSegRepShardStats = new HashSet<>(); for (SegmentReplicationShardStatsResponse response : responses) { if (response != null) { @@ -114,10 +111,6 @@ protected SegmentReplicationStatsResponse newResponse( } } - if (response.getSearchReplicaReplicationStats() != null) { - searchReplicaSegRepShardStats.add(response.getSearchReplicaReplicationStats()); - } - if (response.getPrimaryStats() != null) { final ShardId shardId = response.getPrimaryStats().getShardId(); if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) { @@ -135,22 +128,20 @@ protected SegmentReplicationStatsResponse newResponse( } } } - // combine the replica stats to the shard stat entry in each group. - for (Map.Entry> entry : primaryStats.entrySet()) { - for (SegmentReplicationPerGroupStats group : entry.getValue()) { - for (SegmentReplicationShardStats replicaStat : group.getReplicaStats()) { - replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null)); - } - } - } - // combine the search replica stats with the stats of other replicas - for (Map.Entry> entry : primaryStats.entrySet()) { - for (SegmentReplicationPerGroupStats group : entry.getValue()) { - group.addReplicaStats(searchReplicaSegRepShardStats); - } - } - return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures); + Map> replicationStats = primaryStats.entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue() + .stream() + .map(groupStats -> updateGroupStats(groupStats, replicaStats)) + .collect(Collectors.toList()) + ) + ); + + return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, replicationStats, shardFailures); } @Override @@ -169,12 +160,9 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication if (shardRouting.primary()) { return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard)); - } else if (shardRouting.isSearchOnly()) { - SegmentReplicationShardStats segmentReplicationShardStats = calculateSegmentReplicationShardStats(shardRouting); - return new SegmentReplicationShardStatsResponse(segmentReplicationShardStats); - } else { - return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly())); } + + return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly())); } @Override @@ -196,12 +184,49 @@ protected ClusterBlockException checkRequestBlock( return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices); } - private SegmentReplicationShardStats calculateSegmentReplicationShardStats(ShardRouting shardRouting) { + private SegmentReplicationPerGroupStats updateGroupStats( + SegmentReplicationPerGroupStats groupStats, + Map replicaStats + ) { + // Update the SegmentReplicationState for each of the replicas + Set updatedReplicaStats = groupStats.getReplicaStats() + .stream() + .peek(replicaStat -> replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null))) + .collect(Collectors.toSet()); + + // Compute search replica stats + Set searchReplicaStats = computeSearchReplicaStats(groupStats.getShardId(), replicaStats); + + // Combine ReplicaStats and SearchReplicaStats + Set combinedStats = Stream.concat(updatedReplicaStats.stream(), searchReplicaStats.stream()) + .collect(Collectors.toSet()); + + return new SegmentReplicationPerGroupStats(groupStats.getShardId(), combinedStats, 0); + } + + private Set computeSearchReplicaStats( + ShardId shardId, + Map replicaStats + ) { + return replicaStats.values() + .stream() + .filter(segmentReplicationState -> segmentReplicationState.getShardRouting().shardId().equals(shardId)) + .filter(segmentReplicationState -> segmentReplicationState.getShardRouting().isSearchOnly()) + .map(segmentReplicationState -> { + ShardRouting shardRouting = segmentReplicationState.getShardRouting(); + SegmentReplicationShardStats segmentReplicationStats = computeSegmentReplicationShardStats(shardRouting); + segmentReplicationStats.setCurrentReplicationState(segmentReplicationState); + return segmentReplicationStats; + }) + .collect(Collectors.toSet()); + } + + SegmentReplicationShardStats computeSegmentReplicationShardStats(ShardRouting shardRouting) { ShardId shardId = shardRouting.shardId(); SegmentReplicationState completedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(shardId); SegmentReplicationState ongoingSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId); - SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats( + return new SegmentReplicationShardStats( shardRouting.allocationId().getId(), 0, calculateBytesRemainingToReplicate(ongoingSegmentReplicationState), @@ -209,9 +234,6 @@ private SegmentReplicationShardStats calculateSegmentReplicationShardStats(Shard getCurrentReplicationLag(ongoingSegmentReplicationState), getLastCompletedReplicationLag(completedSegmentReplicationState) ); - - segmentReplicationShardStats.setCurrentReplicationState(targetService.getSegmentReplicationState(shardId)); - return segmentReplicationShardStats; } private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) { diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java index 6b12211bfaa09..884686ee48fa1 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java @@ -28,7 +28,7 @@ public class SegmentReplicationPerGroupStats implements Writeable, ToXContentFragment { private final ShardId shardId; - private Set replicaStats; + private final Set replicaStats; private final long rejectedRequestCount; public SegmentReplicationPerGroupStats(ShardId shardId, Set replicaStats, long rejectedRequestCount) { @@ -55,15 +55,6 @@ public ShardId getShardId() { return shardId; } - public void addReplicaStats(Set replicaStats) { - if (this.replicaStats.isEmpty()) { - // When there is only search replica, replicaStats is empty. EmptySet is immutable and doesn't support adding item - this.replicaStats = replicaStats; - } else { - this.replicaStats.addAll(replicaStats); - } - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("rejected_requests", rejectedRequestCount); diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index abb52ff1a5169..1e43827afeb47 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1345,7 +1345,7 @@ && shouldSkipReplicationTimer(entry.getKey()) == false && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(entry.getKey()).currentNodeId()))) ) .map(entry -> buildShardStats(entry.getKey(), entry.getValue())) - .collect(Collectors.toSet()); + .collect(Collectors.toUnmodifiableSet()); } return Collections.emptySet(); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsActionTests.java index 6d4527a9007c3..ea455d607f058 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsActionTests.java @@ -142,7 +142,6 @@ public void testShardOperationWithReplicaShard() { assertNotNull(response); assertNull(response.getPrimaryStats()); assertNotNull(response.getReplicaStats()); - assertNull(response.getSearchReplicaReplicationStats()); verify(targetService).getSegmentReplicationState(shardId); } @@ -165,133 +164,112 @@ public void testShardOperationWithReplicaShardActiveOnly() { assertNotNull(response); assertNull(response.getPrimaryStats()); assertNotNull(response.getReplicaStats()); - assertNull(response.getSearchReplicaReplicationStats()); verify(targetService).getOngoingEventSegmentReplicationState(shardId); } - public void testShardOperationWithSearchOnlyReplicaWhenCompletedAndOngoingStateNotNull() { + public void testComputeBytesRemainingToReplicateWhenCompletedAndOngoingStateNotNull() { ShardRouting shardRouting = mock(ShardRouting.class); SegmentReplicationState completedSegmentReplicationState = mock(SegmentReplicationState.class); SegmentReplicationState onGoingSegmentReplicationState = mock(SegmentReplicationState.class); ShardId shardId = new ShardId(new Index("test-index", "test-uuid"), 0); AllocationId allocationId = AllocationId.newInitializing(); - SegmentReplicationStatsRequest request = new SegmentReplicationStatsRequest(); ReplicationTimer replicationTimerCompleted = mock(ReplicationTimer.class); ReplicationTimer replicationTimerOngoing = mock(ReplicationTimer.class); long time1 = 10; long time2 = 15; ReplicationLuceneIndex replicationLuceneIndex = new ReplicationLuceneIndex(); replicationLuceneIndex.addFileDetail("name1", 10, false); - replicationLuceneIndex.addFileDetail("name2", 5, false); + replicationLuceneIndex.addFileDetail("name2", 15, false); when(shardRouting.shardId()).thenReturn(shardId); - when(shardRouting.primary()).thenReturn(false); - when(shardRouting.isSearchOnly()).thenReturn(true); when(shardRouting.allocationId()).thenReturn(allocationId); - when(indicesService.indexServiceSafe(shardId.getIndex())).thenReturn(indexService); - when(indexService.getShard(shardId.id())).thenReturn(indexShard); - when(indexShard.indexSettings()).thenReturn(createIndexSettingsWithSegRepEnabled()); when(targetService.getlatestCompletedEventSegmentReplicationState(shardId)).thenReturn(completedSegmentReplicationState); when(targetService.getOngoingEventSegmentReplicationState(shardId)).thenReturn(onGoingSegmentReplicationState); - when(targetService.getSegmentReplicationState(shardId)).thenReturn(onGoingSegmentReplicationState); when(completedSegmentReplicationState.getTimer()).thenReturn(replicationTimerCompleted); when(onGoingSegmentReplicationState.getTimer()).thenReturn(replicationTimerOngoing); when(replicationTimerOngoing.time()).thenReturn(time1); when(replicationTimerCompleted.time()).thenReturn(time2); when(onGoingSegmentReplicationState.getIndex()).thenReturn(replicationLuceneIndex); - SegmentReplicationShardStatsResponse response = action.shardOperation(request, shardRouting); + SegmentReplicationShardStats segmentReplicationShardStats = action.computeSegmentReplicationShardStats(shardRouting); + + assertNotNull(segmentReplicationShardStats); + assertEquals(25, segmentReplicationShardStats.getBytesBehindCount()); + assertEquals(10, segmentReplicationShardStats.getCurrentReplicationLagMillis()); + assertEquals(15, segmentReplicationShardStats.getLastCompletedReplicationTimeMillis()); - assertNotNull(response); verify(targetService).getlatestCompletedEventSegmentReplicationState(shardId); verify(targetService).getOngoingEventSegmentReplicationState(shardId); } - public void testShardOperationWithSearchOnlyReplicaWhenNoCompletedState() { + public void testCalculateBytesRemainingToReplicateWhenNoCompletedState() { ShardRouting shardRouting = mock(ShardRouting.class); SegmentReplicationState onGoingSegmentReplicationState = mock(SegmentReplicationState.class); ShardId shardId = new ShardId(new Index("test-index", "test-uuid"), 0); AllocationId allocationId = AllocationId.newInitializing(); - SegmentReplicationStatsRequest request = new SegmentReplicationStatsRequest(); ReplicationTimer replicationTimerOngoing = mock(ReplicationTimer.class); long time1 = 10; ReplicationLuceneIndex replicationLuceneIndex = new ReplicationLuceneIndex(); replicationLuceneIndex.addFileDetail("name1", 10, false); - replicationLuceneIndex.addFileDetail("name2", 5, false); + replicationLuceneIndex.addFileDetail("name2", 15, false); when(shardRouting.shardId()).thenReturn(shardId); - when(shardRouting.primary()).thenReturn(false); - when(shardRouting.isSearchOnly()).thenReturn(true); when(shardRouting.allocationId()).thenReturn(allocationId); - when(indicesService.indexServiceSafe(shardId.getIndex())).thenReturn(indexService); - when(indexService.getShard(shardId.id())).thenReturn(indexShard); - when(indexShard.indexSettings()).thenReturn(createIndexSettingsWithSegRepEnabled()); when(targetService.getOngoingEventSegmentReplicationState(shardId)).thenReturn(onGoingSegmentReplicationState); - when(targetService.getSegmentReplicationState(shardId)).thenReturn(onGoingSegmentReplicationState); when(onGoingSegmentReplicationState.getTimer()).thenReturn(replicationTimerOngoing); when(replicationTimerOngoing.time()).thenReturn(time1); when(onGoingSegmentReplicationState.getIndex()).thenReturn(replicationLuceneIndex); - SegmentReplicationShardStatsResponse response = action.shardOperation(request, shardRouting); + SegmentReplicationShardStats segmentReplicationShardStats = action.computeSegmentReplicationShardStats(shardRouting); + + assertNotNull(segmentReplicationShardStats); + assertEquals(25, segmentReplicationShardStats.getBytesBehindCount()); + assertEquals(10, segmentReplicationShardStats.getCurrentReplicationLagMillis()); + assertEquals(0, segmentReplicationShardStats.getLastCompletedReplicationTimeMillis()); - assertNotNull(response); - assertNull(response.getPrimaryStats()); - assertNull(response.getReplicaStats()); - assertNotNull(response.getSearchReplicaReplicationStats()); verify(targetService).getlatestCompletedEventSegmentReplicationState(shardId); verify(targetService).getOngoingEventSegmentReplicationState(shardId); } - public void testShardOperationWithSearchOnlyReplicaWhenNoOngoingState() { + public void testCalculateBytesRemainingToReplicateWhenNoOnGoingState() { ShardRouting shardRouting = mock(ShardRouting.class); SegmentReplicationState completedSegmentReplicationState = mock(SegmentReplicationState.class); ShardId shardId = new ShardId(new Index("test-index", "test-uuid"), 0); AllocationId allocationId = AllocationId.newInitializing(); - SegmentReplicationStatsRequest request = new SegmentReplicationStatsRequest(); ReplicationTimer replicationTimerCompleted = mock(ReplicationTimer.class); long time2 = 15; when(shardRouting.shardId()).thenReturn(shardId); - when(shardRouting.primary()).thenReturn(false); - when(shardRouting.isSearchOnly()).thenReturn(true); when(shardRouting.allocationId()).thenReturn(allocationId); - when(indicesService.indexServiceSafe(shardId.getIndex())).thenReturn(indexService); - when(indexService.getShard(shardId.id())).thenReturn(indexShard); - when(indexShard.indexSettings()).thenReturn(createIndexSettingsWithSegRepEnabled()); when(targetService.getlatestCompletedEventSegmentReplicationState(shardId)).thenReturn(completedSegmentReplicationState); when(completedSegmentReplicationState.getTimer()).thenReturn(replicationTimerCompleted); when(replicationTimerCompleted.time()).thenReturn(time2); - SegmentReplicationShardStatsResponse response = action.shardOperation(request, shardRouting); + SegmentReplicationShardStats segmentReplicationShardStats = action.computeSegmentReplicationShardStats(shardRouting); + + assertNotNull(segmentReplicationShardStats); + assertEquals(0, segmentReplicationShardStats.getBytesBehindCount()); + assertEquals(0, segmentReplicationShardStats.getCurrentReplicationLagMillis()); + assertEquals(15, segmentReplicationShardStats.getLastCompletedReplicationTimeMillis()); - assertNotNull(response); - assertNull(response.getPrimaryStats()); - assertNull(response.getReplicaStats()); - assertNotNull(response.getSearchReplicaReplicationStats()); verify(targetService).getlatestCompletedEventSegmentReplicationState(shardId); verify(targetService).getOngoingEventSegmentReplicationState(shardId); } - public void testShardOperationWithSearchOnlyReplicaWhenNoCompletedAndOngoingState() { + public void testCalculateBytesRemainingToReplicateWhenNoCompletedAndOngoingState() { ShardRouting shardRouting = mock(ShardRouting.class); ShardId shardId = new ShardId(new Index("test-index", "test-uuid"), 0); AllocationId allocationId = AllocationId.newInitializing(); - SegmentReplicationStatsRequest request = new SegmentReplicationStatsRequest(); - when(shardRouting.shardId()).thenReturn(shardId); - when(shardRouting.primary()).thenReturn(false); - when(shardRouting.isSearchOnly()).thenReturn(true); when(shardRouting.allocationId()).thenReturn(allocationId); - when(indicesService.indexServiceSafe(shardId.getIndex())).thenReturn(indexService); - when(indexService.getShard(shardId.id())).thenReturn(indexShard); - when(indexShard.indexSettings()).thenReturn(createIndexSettingsWithSegRepEnabled()); - SegmentReplicationShardStatsResponse response = action.shardOperation(request, shardRouting); + SegmentReplicationShardStats segmentReplicationShardStats = action.computeSegmentReplicationShardStats(shardRouting); + + assertNotNull(segmentReplicationShardStats); + assertEquals(0, segmentReplicationShardStats.getBytesBehindCount()); + assertEquals(0, segmentReplicationShardStats.getCurrentReplicationLagMillis()); + assertEquals(0, segmentReplicationShardStats.getLastCompletedReplicationTimeMillis()); - assertNotNull(response); - assertNull(response.getPrimaryStats()); - assertNull(response.getReplicaStats()); - assertNotNull(response.getSearchReplicaReplicationStats()); verify(targetService).getlatestCompletedEventSegmentReplicationState(shardId); verify(targetService).getOngoingEventSegmentReplicationState(shardId); } @@ -299,49 +277,105 @@ public void testShardOperationWithSearchOnlyReplicaWhenNoCompletedAndOngoingStat public void testNewResponseWhenAllReplicasReturnResponseCombinesTheResults() { SegmentReplicationStatsRequest request = new SegmentReplicationStatsRequest(); List shardFailures = new ArrayList<>(); - String[] shards = { "1", "2", "3" }; + String[] shards = { "0", "1" }; request.shards(shards); - int totalShards = 3; - int successfulShards = 3; + int totalShards = 6; + int successfulShards = 6; int failedShard = 0; String allocIdOne = "allocIdOne"; String allocIdTwo = "allocIdTwo"; - ShardId shardIdOne = mock(ShardId.class); - ShardId shardIdTwo = mock(ShardId.class); - ShardId shardIdThree = mock(ShardId.class); - ShardRouting shardRoutingOne = mock(ShardRouting.class); - ShardRouting shardRoutingTwo = mock(ShardRouting.class); - ShardRouting shardRoutingThree = mock(ShardRouting.class); - when(shardIdOne.getId()).thenReturn(1); - when(shardIdTwo.getId()).thenReturn(2); - when(shardIdThree.getId()).thenReturn(3); - when(shardRoutingOne.shardId()).thenReturn(shardIdOne); - when(shardRoutingTwo.shardId()).thenReturn(shardIdTwo); - when(shardRoutingThree.shardId()).thenReturn(shardIdThree); - AllocationId allocationId = mock(AllocationId.class); - when(allocationId.getId()).thenReturn(allocIdOne); - when(shardRoutingTwo.allocationId()).thenReturn(allocationId); - when(shardIdOne.getIndexName()).thenReturn("test-index"); + String allocIdThree = "allocIdThree"; + String allocIdFour = "allocIdFour"; + String allocIdFive = "allocIdFive"; + String allocIdSix = "allocIdSix"; + + ShardId shardId0 = mock(ShardId.class); + ShardRouting primary0 = mock(ShardRouting.class); + ShardRouting replica0 = mock(ShardRouting.class); + ShardRouting searchReplica0 = mock(ShardRouting.class); + + ShardId shardId1 = mock(ShardId.class); + ShardRouting primary1 = mock(ShardRouting.class); + ShardRouting replica1 = mock(ShardRouting.class); + ShardRouting searchReplica1 = mock(ShardRouting.class); + + when(shardId0.getId()).thenReturn(0); + when(shardId0.getIndexName()).thenReturn("test-index-1"); + when(primary0.shardId()).thenReturn(shardId0); + when(replica0.shardId()).thenReturn(shardId0); + when(searchReplica0.shardId()).thenReturn(shardId0); + + when(shardId1.getId()).thenReturn(1); + when(shardId1.getIndexName()).thenReturn("test-index-1"); + when(primary1.shardId()).thenReturn(shardId1); + when(replica1.shardId()).thenReturn(shardId1); + when(searchReplica1.shardId()).thenReturn(shardId1); + + AllocationId allocationIdOne = mock(AllocationId.class); + AllocationId allocationIdTwo = mock(AllocationId.class); + AllocationId allocationIdThree = mock(AllocationId.class); + AllocationId allocationIdFour = mock(AllocationId.class); + AllocationId allocationIdFive = mock(AllocationId.class); + AllocationId allocationIdSix = mock(AllocationId.class); + + when(allocationIdOne.getId()).thenReturn(allocIdOne); + when(allocationIdTwo.getId()).thenReturn(allocIdTwo); + when(allocationIdThree.getId()).thenReturn(allocIdThree); + when(allocationIdFour.getId()).thenReturn(allocIdFour); + when(allocationIdFive.getId()).thenReturn(allocIdFive); + when(allocationIdSix.getId()).thenReturn(allocIdSix); + when(primary0.allocationId()).thenReturn(allocationIdOne); + when(replica0.allocationId()).thenReturn(allocationIdTwo); + when(searchReplica0.allocationId()).thenReturn(allocationIdThree); + when(primary1.allocationId()).thenReturn(allocationIdFour); + when(replica1.allocationId()).thenReturn(allocationIdFive); + when(searchReplica1.allocationId()).thenReturn(allocationIdSix); + + when(primary0.isSearchOnly()).thenReturn(false); + when(replica0.isSearchOnly()).thenReturn(false); + when(searchReplica0.isSearchOnly()).thenReturn(true); + when(primary1.isSearchOnly()).thenReturn(false); + when(replica1.isSearchOnly()).thenReturn(false); + when(searchReplica1.isSearchOnly()).thenReturn(true); + + Set segmentReplicationShardStats0 = new HashSet<>(); + SegmentReplicationShardStats segmentReplicationShardStatsOfReplica0 = new SegmentReplicationShardStats(allocIdTwo, 0, 0, 0, 0, 0); + segmentReplicationShardStats0.add(segmentReplicationShardStatsOfReplica0); + + Set segmentReplicationShardStats1 = new HashSet<>(); + SegmentReplicationShardStats segmentReplicationShardStatsOfReplica1 = new SegmentReplicationShardStats(allocIdFive, 0, 0, 0, 0, 0); + segmentReplicationShardStats1.add(segmentReplicationShardStatsOfReplica1); + + SegmentReplicationPerGroupStats segmentReplicationPerGroupStats0 = new SegmentReplicationPerGroupStats( + shardId0, + segmentReplicationShardStats0, + 0 + ); - Set segmentReplicationShardStats = new HashSet<>(); - SegmentReplicationShardStats segmentReplicationShardStatsOfReplica = new SegmentReplicationShardStats(allocIdOne, 0, 0, 0, 0, 0); - segmentReplicationShardStats.add(segmentReplicationShardStatsOfReplica); - SegmentReplicationPerGroupStats segmentReplicationPerGroupStats = new SegmentReplicationPerGroupStats( - shardIdOne, - segmentReplicationShardStats, + SegmentReplicationPerGroupStats segmentReplicationPerGroupStats1 = new SegmentReplicationPerGroupStats( + shardId1, + segmentReplicationShardStats1, 0 ); - SegmentReplicationState segmentReplicationState = mock(SegmentReplicationState.class); - SegmentReplicationShardStats segmentReplicationShardStatsFromSearchReplica = mock(SegmentReplicationShardStats.class); - when(segmentReplicationShardStatsFromSearchReplica.getAllocationId()).thenReturn("alloc2"); - when(segmentReplicationState.getShardRouting()).thenReturn(shardRoutingTwo); + SegmentReplicationState segmentReplicationState0 = mock(SegmentReplicationState.class); + SegmentReplicationState searchReplicaSegmentReplicationState0 = mock(SegmentReplicationState.class); + SegmentReplicationState segmentReplicationState1 = mock(SegmentReplicationState.class); + SegmentReplicationState searchReplicaSegmentReplicationState1 = mock(SegmentReplicationState.class); + + when(segmentReplicationState0.getShardRouting()).thenReturn(replica0); + when(searchReplicaSegmentReplicationState0.getShardRouting()).thenReturn(searchReplica0); + when(segmentReplicationState1.getShardRouting()).thenReturn(replica1); + when(searchReplicaSegmentReplicationState1.getShardRouting()).thenReturn(searchReplica1); List responses = List.of( - new SegmentReplicationShardStatsResponse(segmentReplicationPerGroupStats), - new SegmentReplicationShardStatsResponse(segmentReplicationState), - new SegmentReplicationShardStatsResponse(segmentReplicationShardStatsFromSearchReplica) + new SegmentReplicationShardStatsResponse(segmentReplicationPerGroupStats0), + new SegmentReplicationShardStatsResponse(segmentReplicationState0), + new SegmentReplicationShardStatsResponse(searchReplicaSegmentReplicationState0), + new SegmentReplicationShardStatsResponse(segmentReplicationPerGroupStats1), + new SegmentReplicationShardStatsResponse(segmentReplicationState1), + new SegmentReplicationShardStatsResponse(searchReplicaSegmentReplicationState1) ); SegmentReplicationStatsResponse response = action.newResponse( @@ -354,66 +388,30 @@ public void testNewResponseWhenAllReplicasReturnResponseCombinesTheResults() { ClusterState.EMPTY_STATE ); - List responseStats = response.getReplicationStats().get("test-index"); - SegmentReplicationPerGroupStats primStats = responseStats.get(0); - Set segRpShardStatsSet = primStats.getReplicaStats(); - - for (SegmentReplicationShardStats segRpShardStats : segRpShardStatsSet) { - if (segRpShardStats.getAllocationId().equals(allocIdOne)) { - assertEquals(segmentReplicationState, segRpShardStats.getCurrentReplicationState()); + List responseStats = response.getReplicationStats().get("test-index-1"); + SegmentReplicationPerGroupStats primStats0 = responseStats.get(0); + Set replicaStats0 = primStats0.getReplicaStats(); + assertEquals(2, replicaStats0.size()); + for (SegmentReplicationShardStats replicaStat : replicaStats0) { + if (replicaStat.getAllocationId().equals(allocIdTwo)) { + assertEquals(segmentReplicationState0, replicaStat.getCurrentReplicationState()); } - if (segRpShardStats.getAllocationId().equals(allocIdTwo)) { - assertEquals(segmentReplicationShardStatsFromSearchReplica, segRpShardStats); + if (replicaStat.getAllocationId().equals(allocIdThree)) { + assertEquals(searchReplicaSegmentReplicationState0, replicaStat.getCurrentReplicationState()); } } - } - - public void testNewResponseWhenTwoPrimaryShardsForSameIndex() { - SegmentReplicationStatsRequest request = new SegmentReplicationStatsRequest(); - List shardFailures = new ArrayList<>(); - String[] shards = { "1", "2" }; - request.shards(shards); - int totalShards = 3; - int successfulShards = 3; - int failedShard = 0; - - SegmentReplicationPerGroupStats segmentReplicationPerGroupStatsOne = mock(SegmentReplicationPerGroupStats.class); - SegmentReplicationPerGroupStats segmentReplicationPerGroupStatsTwo = mock(SegmentReplicationPerGroupStats.class); - - ShardId shardIdOne = mock(ShardId.class); - ShardId shardIdTwo = mock(ShardId.class); - when(segmentReplicationPerGroupStatsOne.getShardId()).thenReturn(shardIdOne); - when(segmentReplicationPerGroupStatsTwo.getShardId()).thenReturn(shardIdTwo); - when(shardIdOne.getIndexName()).thenReturn("test-index"); - when(shardIdTwo.getIndexName()).thenReturn("test-index"); - when(shardIdOne.getId()).thenReturn(1); - when(shardIdTwo.getId()).thenReturn(2); - - List responses = List.of( - new SegmentReplicationShardStatsResponse(segmentReplicationPerGroupStatsOne), - new SegmentReplicationShardStatsResponse(segmentReplicationPerGroupStatsTwo) - ); - - SegmentReplicationStatsResponse response = action.newResponse( - request, - totalShards, - successfulShards, - failedShard, - responses, - shardFailures, - ClusterState.EMPTY_STATE - ); - - List responseStats = response.getReplicationStats().get("test-index"); - for (SegmentReplicationPerGroupStats primStat : responseStats) { - if (primStat.getShardId().equals(shardIdOne)) { - assertEquals(segmentReplicationPerGroupStatsOne, primStat); + SegmentReplicationPerGroupStats primStats1 = responseStats.get(1); + Set replicaStats1 = primStats1.getReplicaStats(); + assertEquals(2, replicaStats1.size()); + for (SegmentReplicationShardStats replicaStat : replicaStats1) { + if (replicaStat.getAllocationId().equals(allocIdFive)) { + assertEquals(segmentReplicationState1, replicaStat.getCurrentReplicationState()); } - if (primStat.getShardId().equals(shardIdTwo)) { - assertEquals(segmentReplicationPerGroupStatsTwo, primStat); + if (replicaStat.getAllocationId().equals(allocIdSix)) { + assertEquals(searchReplicaSegmentReplicationState1, replicaStat.getCurrentReplicationState()); } } } @@ -464,7 +462,6 @@ public void testNewResponseWhenShardsToFetchEmptyAndResponsesContainsNull() { responses.add(null); responses.add(new SegmentReplicationShardStatsResponse(segmentReplicationPerGroupStats)); responses.add(new SegmentReplicationShardStatsResponse(segmentReplicationState)); - responses.add(new SegmentReplicationShardStatsResponse(segmentReplicationShardStatsFromSearchReplica)); SegmentReplicationStatsResponse response = action.newResponse( request,