diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index 89aef6f0be1a6..11f00fbbb97ab 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -16,6 +16,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.ReplicationStats; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationShardStats; @@ -433,4 +434,103 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception { } + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build(); + } + + public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + + int numShards = 2; + assertAcked( + prepareCreate( + INDEX_NAME, + 0, + Settings.builder() + .put("number_of_shards", numShards) + .put("number_of_replicas", 1) + .put("number_of_search_only_replicas", 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + ) + ); + ensureGreen(); + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME); + ensureSearchable(INDEX_NAME); + + assertBusy(() -> { + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) + .execute() + .actionGet(); + SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0); + final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats() + .stream() + .findFirst() + .get() + .getCurrentReplicationState(); + assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1); + assertEquals(segmentReplicationStatsResponse.getTotalShards(), numShards * 3); + assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), numShards * 3); + assertNotNull(currentReplicationState); + assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE); + assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0); + }, 1, TimeUnit.MINUTES); + } + + public void testSegmentReplicationStatsResponseWithOnlySearchReplica() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + + int numShards = 1; + assertAcked( + prepareCreate( + INDEX_NAME, + 0, + Settings.builder() + .put("number_of_shards", numShards) + .put("number_of_replicas", 0) + .put("number_of_search_only_replicas", 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + ) + ); + ensureGreen(); + final long numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index(INDEX_NAME, "doc", Integer.toString(i)); + } + refresh(INDEX_NAME); + ensureSearchable(INDEX_NAME); + + assertBusy(() -> { + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats(INDEX_NAME) + .setDetailed(true) + .execute() + .actionGet(); + SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0); + final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats() + .stream() + .findFirst() + .get() + .getCurrentReplicationState(); + assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1); + assertEquals(segmentReplicationStatsResponse.getTotalShards(), 2); + assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), 2); + assertNotNull(currentReplicationState); + assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE); + assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0); + }, 1, TimeUnit.MINUTES); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java index ce17176a220ae..d5a05a60e8849 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/SegmentReplicationShardStatsResponse.java @@ -13,6 +13,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.indices.replication.SegmentReplicationState; import java.io.IOException; @@ -31,19 +32,31 @@ public class SegmentReplicationShardStatsResponse implements Writeable { @Nullable private final SegmentReplicationState replicaStats; + @Nullable + private final SegmentReplicationShardStats segmentReplicationShardStats; + public SegmentReplicationShardStatsResponse(StreamInput in) throws IOException { this.primaryStats = in.readOptionalWriteable(SegmentReplicationPerGroupStats::new); this.replicaStats = in.readOptionalWriteable(SegmentReplicationState::new); + this.segmentReplicationShardStats = in.readOptionalWriteable(SegmentReplicationShardStats::new); } public SegmentReplicationShardStatsResponse(SegmentReplicationPerGroupStats primaryStats) { this.primaryStats = primaryStats; this.replicaStats = null; + this.segmentReplicationShardStats = null; } public SegmentReplicationShardStatsResponse(SegmentReplicationState replicaStats) { this.replicaStats = replicaStats; this.primaryStats = null; + this.segmentReplicationShardStats = null; + } + + public SegmentReplicationShardStatsResponse(SegmentReplicationShardStats segmentReplicationShardStats) { + this.primaryStats = null; + this.replicaStats = null; + this.segmentReplicationShardStats = segmentReplicationShardStats; } public SegmentReplicationPerGroupStats getPrimaryStats() { @@ -54,10 +67,15 @@ public SegmentReplicationState getReplicaStats() { return replicaStats; } + public SegmentReplicationShardStats getSegmentReplicationShardStats() { + return segmentReplicationShardStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(primaryStats); out.writeOptionalWriteable(replicaStats); + out.writeOptionalWriteable(segmentReplicationShardStats); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java index fc97d67c6c3af..b15d83de4b3b7 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java @@ -26,9 +26,12 @@ import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -36,8 +39,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -101,6 +106,9 @@ protected SegmentReplicationStatsResponse newResponse( final Map replicaStats = new HashMap<>(); // map of index name to list of replication group stats. final Map> primaryStats = new HashMap<>(); + // search replica responses + final Set searchReplicaSegRepShardStats = new HashSet<>(); + for (SegmentReplicationShardStatsResponse response : responses) { if (response != null) { if (response.getReplicaStats() != null) { @@ -109,6 +117,11 @@ protected SegmentReplicationStatsResponse newResponse( replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats()); } } + + if (response.getSegmentReplicationShardStats() != null) { + searchReplicaSegRepShardStats.add(response.getSegmentReplicationShardStats()); + } + if (response.getPrimaryStats() != null) { final ShardId shardId = response.getPrimaryStats().getShardId(); if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) { @@ -134,6 +147,15 @@ protected SegmentReplicationStatsResponse newResponse( } } } + // combine the search replica stats with the stats of other replicas + for (Map.Entry> entry : primaryStats.entrySet()) { + for (SegmentReplicationPerGroupStats group : entry.getValue()) { + Set updatedSet = new HashSet<>(group.getReplicaStats()); + updatedSet.addAll(searchReplicaSegRepShardStats); + group.setReplicaStats(updatedSet); + } + } + return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures); } @@ -154,13 +176,17 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication if (shardRouting.primary()) { return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard)); + } else if (shardRouting.isSearchOnly()) { + SegmentReplicationShardStats segmentReplicationShardStats = calcualteSegmentReplicationShardStats( + shardRouting, + indexShard, + shardId, + request.activeOnly() + ); + return new SegmentReplicationShardStatsResponse(segmentReplicationShardStats); + } else { + return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly())); } - - // return information about only on-going segment replication events. - if (request.activeOnly()) { - return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId)); - } - return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId)); } @Override @@ -181,4 +207,75 @@ protected ClusterBlockException checkRequestBlock( ) { return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices); } + + private SegmentReplicationShardStats calcualteSegmentReplicationShardStats( + ShardRouting shardRouting, + IndexShard indexShard, + ShardId shardId, + boolean isActiveOnly + ) { + ReplicationCheckpoint indexReplicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); + SegmentReplicationState segmentReplicationState = getSegmentReplicationState(shardId, isActiveOnly); + if (segmentReplicationState != null) { + ReplicationCheckpoint latestReplicationCheckpointReceived = segmentReplicationState.getLatestReplicationCheckpoint(); + + SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats( + shardRouting.allocationId().getId(), + calculateCheckpointsBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived), + calculateBytesBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived), + 0, + calculateCurrentReplicationLag(shardId), + getLastCompletedReplicationLag(shardId) + ); + + segmentReplicationShardStats.setCurrentReplicationState(segmentReplicationState); + return segmentReplicationShardStats; + } else { + return new SegmentReplicationShardStats(shardRouting.allocationId().getId(), 0, 0, 0, 0, 0); + } + } + + private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) { + if (isActiveOnly) { + return targetService.getOngoingEventSegmentReplicationState(shardId); + } else { + return targetService.getSegmentReplicationState(shardId); + } + } + + private long calculateCheckpointsBehind( + ReplicationCheckpoint indexReplicationCheckpoint, + ReplicationCheckpoint latestReplicationCheckpointReceived + ) { + if (latestReplicationCheckpointReceived != null) { + return latestReplicationCheckpointReceived.getSegmentInfosVersion() - indexReplicationCheckpoint.getSegmentInfosVersion(); + } + return 0; + } + + private long calculateBytesBehind( + ReplicationCheckpoint indexReplicationCheckpoint, + ReplicationCheckpoint latestReplicationCheckpointReceived + ) { + if (latestReplicationCheckpointReceived != null) { + Store.RecoveryDiff diff = Store.segmentReplicationDiff( + latestReplicationCheckpointReceived.getMetadataMap(), + indexReplicationCheckpoint.getMetadataMap() + ); + return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum(); + } + return 0; + } + + private long calculateCurrentReplicationLag(ShardId shardId) { + SegmentReplicationState ongoingEventSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId); + return ongoingEventSegmentReplicationState != null ? ongoingEventSegmentReplicationState.getTimer().time() : 0; + } + + private long getLastCompletedReplicationLag(ShardId shardId) { + SegmentReplicationState lastCompletedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState( + shardId + ); + return lastCompletedSegmentReplicationState != null ? lastCompletedSegmentReplicationState.getTimer().time() : 0; + } } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java index 884686ee48fa1..98d8bfc7d66d8 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java @@ -28,7 +28,7 @@ public class SegmentReplicationPerGroupStats implements Writeable, ToXContentFragment { private final ShardId shardId; - private final Set replicaStats; + private Set replicaStats; private final long rejectedRequestCount; public SegmentReplicationPerGroupStats(ShardId shardId, Set replicaStats, long rejectedRequestCount) { @@ -55,6 +55,10 @@ public ShardId getShardId() { return shardId; } + public void setReplicaStats(Set replicaStats) { + this.replicaStats = replicaStats; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("rejected_requests", rejectedRequestCount); diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java index e381ade253422..40fac40f3ce54 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java @@ -64,6 +64,7 @@ public SegmentReplicationShardStats(StreamInput in) throws IOException { this.currentReplicationTimeMillis = in.readVLong(); this.lastCompletedReplicationTimeMillis = in.readVLong(); this.currentReplicationLagMillis = in.readVLong(); + this.currentReplicationState = in.readOptionalWriteable(SegmentReplicationState::new); } public String getAllocationId() { @@ -118,7 +119,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("current_replication_lag", new TimeValue(currentReplicationLagMillis)); builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis)); if (currentReplicationState != null) { - builder.startObject(); + builder.startObject("current_replication_state"); currentReplicationState.toXContent(builder, params); builder.endObject(); } @@ -134,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(currentReplicationTimeMillis); out.writeVLong(lastCompletedReplicationTimeMillis); out.writeVLong(currentReplicationLagMillis); + out.writeOptionalWriteable(currentReplicationState); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index 5fa123948c5ac..9e712b981d30a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -18,6 +18,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationTimer; @@ -88,6 +89,8 @@ public static Stage fromId(byte id) { private String sourceDescription; private DiscoveryNode targetNode; + private ReplicationCheckpoint latestReplicationCheckpoint; + public ShardRouting getShardRouting() { return shardRouting; } @@ -148,6 +151,10 @@ public TimeValue getFinalizeReplicationStageTime() { return new TimeValue(time); } + public ReplicationCheckpoint getLatestReplicationCheckpoint() { + return this.latestReplicationCheckpoint; + } + public SegmentReplicationState( ShardRouting shardRouting, ReplicationLuceneIndex index, @@ -252,6 +259,10 @@ public void setStage(Stage stage) { } } + public void setLatestReplicationCheckpoint(ReplicationCheckpoint latestReplicationCheckpoint) { + this.latestReplicationCheckpoint = latestReplicationCheckpoint; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7131b49a41834..bf86e316db8ec 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -177,6 +177,7 @@ public void startReplication(ActionListener listener) { source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); checkpointInfoListener.whenComplete(checkpointInfo -> { + state.setLatestReplicationCheckpoint(checkpointInfo.getCheckpoint()); final List filesToFetch = getFiles(checkpointInfo); state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel();