Skip to content

Commit

Permalink
Added implementation for the stats calculation for search and regular…
Browse files Browse the repository at this point in the history
… replica in shards

Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
  • Loading branch information
vinaykpud committed Nov 18, 2024
1 parent f105e4e commit 0acfe37
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
Expand Down Expand Up @@ -433,4 +434,103 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception {

}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();

int numShards = 2;
assertAcked(
prepareCreate(
INDEX_NAME,
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index(INDEX_NAME, "doc", Integer.toString(i));
}
refresh(INDEX_NAME);
ensureSearchable(INDEX_NAME);

assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats()
.stream()
.findFirst()
.get()
.getCurrentReplicationState();
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), numShards * 3);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), numShards * 3);
assertNotNull(currentReplicationState);
assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0);
}, 1, TimeUnit.MINUTES);
}

public void testSegmentReplicationStatsResponseWithOnlySearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();

int numShards = 1;
assertAcked(
prepareCreate(
INDEX_NAME,
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 0)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index(INDEX_NAME, "doc", Integer.toString(i));
}
refresh(INDEX_NAME);
ensureSearchable(INDEX_NAME);

assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats()
.stream()
.findFirst()
.get()
.getCurrentReplicationState();
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), 2);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), 2);
assertNotNull(currentReplicationState);
assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0);
}, 1, TimeUnit.MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.replication.SegmentReplicationState;

import java.io.IOException;
Expand All @@ -31,19 +32,31 @@ public class SegmentReplicationShardStatsResponse implements Writeable {
@Nullable
private final SegmentReplicationState replicaStats;

@Nullable
private final SegmentReplicationShardStats segmentReplicationShardStats;

public SegmentReplicationShardStatsResponse(StreamInput in) throws IOException {
this.primaryStats = in.readOptionalWriteable(SegmentReplicationPerGroupStats::new);
this.replicaStats = in.readOptionalWriteable(SegmentReplicationState::new);
this.segmentReplicationShardStats = in.readOptionalWriteable(SegmentReplicationShardStats::new);
}

public SegmentReplicationShardStatsResponse(SegmentReplicationPerGroupStats primaryStats) {
this.primaryStats = primaryStats;
this.replicaStats = null;
this.segmentReplicationShardStats = null;
}

public SegmentReplicationShardStatsResponse(SegmentReplicationState replicaStats) {
this.replicaStats = replicaStats;
this.primaryStats = null;
this.segmentReplicationShardStats = null;
}

public SegmentReplicationShardStatsResponse(SegmentReplicationShardStats segmentReplicationShardStats) {
this.primaryStats = null;
this.replicaStats = null;
this.segmentReplicationShardStats = segmentReplicationShardStats;
}

public SegmentReplicationPerGroupStats getPrimaryStats() {
Expand All @@ -54,10 +67,15 @@ public SegmentReplicationState getReplicaStats() {
return replicaStats;
}

public SegmentReplicationShardStats getSegmentReplicationShardStats() {
return segmentReplicationShardStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(primaryStats);
out.writeOptionalWriteable(replicaStats);
out.writeOptionalWriteable(segmentReplicationShardStats);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -101,6 +106,9 @@ protected SegmentReplicationStatsResponse newResponse(
final Map<String, SegmentReplicationState> replicaStats = new HashMap<>();
// map of index name to list of replication group stats.
final Map<String, List<SegmentReplicationPerGroupStats>> primaryStats = new HashMap<>();
// search replica responses
final Set<SegmentReplicationShardStats> searchReplicaSegRepShardStats = new HashSet<>();

for (SegmentReplicationShardStatsResponse response : responses) {
if (response != null) {
if (response.getReplicaStats() != null) {
Expand All @@ -109,6 +117,11 @@ protected SegmentReplicationStatsResponse newResponse(
replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats());
}
}

if (response.getSegmentReplicationShardStats() != null) {
searchReplicaSegRepShardStats.add(response.getSegmentReplicationShardStats());
}

if (response.getPrimaryStats() != null) {
final ShardId shardId = response.getPrimaryStats().getShardId();
if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) {
Expand All @@ -134,6 +147,15 @@ protected SegmentReplicationStatsResponse newResponse(
}
}
}
// combine the search replica stats with the stats of other replicas
for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> entry : primaryStats.entrySet()) {
for (SegmentReplicationPerGroupStats group : entry.getValue()) {
Set<SegmentReplicationShardStats> updatedSet = new HashSet<>(group.getReplicaStats());
updatedSet.addAll(searchReplicaSegRepShardStats);
group.setReplicaStats(updatedSet);
}
}

return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures);
}

Expand All @@ -154,13 +176,17 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication

if (shardRouting.primary()) {
return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard));
} else if (shardRouting.isSearchOnly()) {
SegmentReplicationShardStats segmentReplicationShardStats = calcualteSegmentReplicationShardStats(
shardRouting,
indexShard,
shardId,
request.activeOnly()
);
return new SegmentReplicationShardStatsResponse(segmentReplicationShardStats);
} else {
return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly()));
}

// return information about only on-going segment replication events.
if (request.activeOnly()) {
return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId));
}
return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId));
}

@Override
Expand All @@ -181,4 +207,75 @@ protected ClusterBlockException checkRequestBlock(
) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

private SegmentReplicationShardStats calcualteSegmentReplicationShardStats(
ShardRouting shardRouting,
IndexShard indexShard,
ShardId shardId,
boolean isActiveOnly
) {
ReplicationCheckpoint indexReplicationCheckpoint = indexShard.getLatestReplicationCheckpoint();
SegmentReplicationState segmentReplicationState = getSegmentReplicationState(shardId, isActiveOnly);
if (segmentReplicationState != null) {
ReplicationCheckpoint latestReplicationCheckpointReceived = segmentReplicationState.getLatestReplicationCheckpoint();

SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats(
shardRouting.allocationId().getId(),
calculateCheckpointsBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived),
calculateBytesBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived),
0,
calculateCurrentReplicationLag(shardId),
getLastCompletedReplicationLag(shardId)
);

segmentReplicationShardStats.setCurrentReplicationState(segmentReplicationState);
return segmentReplicationShardStats;
} else {
return new SegmentReplicationShardStats(shardRouting.allocationId().getId(), 0, 0, 0, 0, 0);
}
}

private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) {
if (isActiveOnly) {
return targetService.getOngoingEventSegmentReplicationState(shardId);
} else {
return targetService.getSegmentReplicationState(shardId);
}
}

private long calculateCheckpointsBehind(
ReplicationCheckpoint indexReplicationCheckpoint,
ReplicationCheckpoint latestReplicationCheckpointReceived
) {
if (latestReplicationCheckpointReceived != null) {
return latestReplicationCheckpointReceived.getSegmentInfosVersion() - indexReplicationCheckpoint.getSegmentInfosVersion();
}
return 0;
}

private long calculateBytesBehind(
ReplicationCheckpoint indexReplicationCheckpoint,
ReplicationCheckpoint latestReplicationCheckpointReceived
) {
if (latestReplicationCheckpointReceived != null) {
Store.RecoveryDiff diff = Store.segmentReplicationDiff(
latestReplicationCheckpointReceived.getMetadataMap(),
indexReplicationCheckpoint.getMetadataMap()
);
return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
}
return 0;
}

private long calculateCurrentReplicationLag(ShardId shardId) {
SegmentReplicationState ongoingEventSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId);
return ongoingEventSegmentReplicationState != null ? ongoingEventSegmentReplicationState.getTimer().time() : 0;
}

private long getLastCompletedReplicationLag(ShardId shardId) {
SegmentReplicationState lastCompletedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(
shardId
);
return lastCompletedSegmentReplicationState != null ? lastCompletedSegmentReplicationState.getTimer().time() : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class SegmentReplicationPerGroupStats implements Writeable, ToXContentFragment {

private final ShardId shardId;
private final Set<SegmentReplicationShardStats> replicaStats;
private Set<SegmentReplicationShardStats> replicaStats;
private final long rejectedRequestCount;

public SegmentReplicationPerGroupStats(ShardId shardId, Set<SegmentReplicationShardStats> replicaStats, long rejectedRequestCount) {
Expand All @@ -55,6 +55,10 @@ public ShardId getShardId() {
return shardId;
}

public void setReplicaStats(Set<SegmentReplicationShardStats> replicaStats) {
this.replicaStats = replicaStats;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("rejected_requests", rejectedRequestCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public SegmentReplicationShardStats(StreamInput in) throws IOException {
this.currentReplicationTimeMillis = in.readVLong();
this.lastCompletedReplicationTimeMillis = in.readVLong();
this.currentReplicationLagMillis = in.readVLong();
this.currentReplicationState = in.readOptionalWriteable(SegmentReplicationState::new);
}

public String getAllocationId() {
Expand Down Expand Up @@ -118,7 +119,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("current_replication_lag", new TimeValue(currentReplicationLagMillis));
builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis));
if (currentReplicationState != null) {
builder.startObject();
builder.startObject("current_replication_state");
currentReplicationState.toXContent(builder, params);
builder.endObject();
}
Expand All @@ -134,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(currentReplicationTimeMillis);
out.writeVLong(lastCompletedReplicationTimeMillis);
out.writeVLong(currentReplicationLagMillis);
out.writeOptionalWriteable(currentReplicationState);
}

@Override
Expand Down
Loading

0 comments on commit 0acfe37

Please sign in to comment.