Skip to content

Commit

Permalink
Remove primary/replica separation.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Sep 21, 2023
1 parent a69ec0f commit e92aafe
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,41 +402,28 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception {
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats();
// primary node - should hold replication statistics
final ReplicationStats.ShardReplicationStats primaryStats = replicationStats.getPrimaryStats();
if (nodeStats.getNode().getName().equals(primaryNode)) {
assertTrue(primaryStats.getMaxBytes() > 0);
assertTrue(primaryStats.getTotalBytes() > 0);
assertTrue(primaryStats.getMaxReplicationLag() > 0);
// 2 replicas so total bytes should be double of max
assertEquals(primaryStats.getMaxBytes() * 2, primaryStats.getTotalBytes());
assertEquals(0, replicationStats.getMaxBytesBehind());
assertEquals(0, replicationStats.getTotalBytesBehind());
assertEquals(0, replicationStats.getMaxReplicationLag());
}
// replica nodes - should hold empty primary replication statistics
// replica nodes - should hold empty replication statistics
if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
assertEquals(0, primaryStats.getMaxBytes());
assertEquals(0, primaryStats.getTotalBytes());
assertEquals(0, primaryStats.getMaxReplicationLag());
}

final ReplicationStats.ShardReplicationStats replicaStats = replicationStats.getReplicaStats();
if (nodeStats.getNode().getName().equals(primaryNode)) {
assertTrue(replicaStats.getMaxBytes() > 0);
assertTrue(replicaStats.getTotalBytes() > 0);
assertTrue(replicaStats.getMaxReplicationLag() > 0);
// 2 replicas so total bytes should be double of max
assertEquals(replicaStats.getTotalBytes(), primaryStats.getTotalBytes());
assertTrue(replicationStats.getMaxBytesBehind() > 0);
assertTrue(replicationStats.getTotalBytesBehind() > 0);
assertTrue(replicationStats.getMaxReplicationLag() > 0);
}
}
// get replication statistics at index level
IndicesStatsResponse stats = client().admin().indices().prepareStats().execute().actionGet();

// stats should be of non-zero value when aggregated at index level
ReplicationStats indexReplicationStats = stats.getIndex(INDEX_NAME).getTotal().getSegments().getReplicationStats();
final ReplicationStats.ShardReplicationStats primaryStats = indexReplicationStats.getPrimaryStats();
assertNotNull(indexReplicationStats);
assertTrue(primaryStats.getMaxBytes() > 0);
assertTrue(primaryStats.getTotalBytes() > 0);
assertTrue(primaryStats.getMaxReplicationLag() > 0);
assertEquals(2 * primaryStats.getMaxBytes(), primaryStats.getTotalBytes());
assertTrue(indexReplicationStats.getMaxBytesBehind() > 0);
assertTrue(indexReplicationStats.getTotalBytesBehind() > 0);
assertTrue(indexReplicationStats.getMaxReplicationLag() > 0);
assertEquals(2 * indexReplicationStats.getMaxBytesBehind(), indexReplicationStats.getTotalBytesBehind());
}

}
Expand Down
3 changes: 1 addition & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -450,7 +449,7 @@ public synchronized IndexShard createShard(
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final Function<ReplicationCheckpoint, ReplicationStats.ShardReplicationStats> segmentReplicationShardStatsSupplier
final Function<ShardId, ReplicationStats> segmentReplicationShardStatsSupplier
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down
138 changes: 46 additions & 92 deletions server/src/main/java/org/opensearch/index/ReplicationStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,67 +26,79 @@
*/
public class ReplicationStats implements ToXContentFragment, Writeable {

private final ShardReplicationStats primaryStats;
private final ShardReplicationStats replicaStats;
public long maxBytesBehind;
public long maxReplicationLag;
public long totalBytesBehind;

public ReplicationStats(long maxBytesBehind, long totalBytesBehind, long maxReplicationLag) {
this.maxBytesBehind = maxBytesBehind;
this.totalBytesBehind = totalBytesBehind;
this.maxReplicationLag = maxReplicationLag;
}

public ReplicationStats(StreamInput in) throws IOException {
this.primaryStats = new ShardReplicationStats(in);
this.replicaStats = new ShardReplicationStats(in);
this.maxBytesBehind = in.readVLong();
this.totalBytesBehind = in.readVLong();
this.maxReplicationLag = in.readVLong();
}

public ReplicationStats() {
primaryStats = new ShardReplicationStats();
replicaStats = new ShardReplicationStats();

}

public ReplicationStats(long bytesBehind, long replicationLag) {
this.maxBytesBehind = bytesBehind;
this.totalBytesBehind = bytesBehind;
this.maxReplicationLag = replicationLag;
}

public void add(ReplicationStats other) {
if (other != null) {
primaryStats.add(other.primaryStats);
replicaStats.add(other.replicaStats);
maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind);
totalBytesBehind += other.totalBytesBehind;
maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag);
}
}

public ShardReplicationStats getPrimaryStats() {
return primaryStats;
public long getMaxBytesBehind() {
return this.maxBytesBehind;
}

public ShardReplicationStats getReplicaStats() {
return primaryStats;
public long getTotalBytesBehind() {
return this.totalBytesBehind;
}

public long getMaxReplicationLag() {
return this.maxReplicationLag;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
primaryStats.writeTo(out);
replicaStats.writeTo(out);
out.writeVLong(maxBytesBehind);
out.writeVLong(totalBytesBehind);
out.writeVLong(maxReplicationLag);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.SEGMENT_REPLICATION);
builder.startObject(Fields.PRIMARY_STATS);
builder.field(Fields.MAX_BYTES_AHEAD, new ByteSizeValue(primaryStats.maxBytes).toString());
builder.field(Fields.TOTAL_BYTES_AHEAD, new ByteSizeValue(primaryStats.totalBytes).toString());
builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(primaryStats.maxReplicationLag));
builder.endObject();
builder.startObject(Fields.REPLICA_STATS);
builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(replicaStats.maxBytes).toString());
builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(replicaStats.totalBytes).toString());
builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(replicaStats.maxReplicationLag));
builder.endObject();
builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(maxBytesBehind).toString());
builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(totalBytesBehind).toString());
builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(maxReplicationLag));
builder.endObject();
return builder;
}

public void addReplicaStats(ShardReplicationStats other) {
if (other != null) {
replicaStats.add(other);
}
}

public void addPrimaryStats(ShardReplicationStats other) {
if (other != null) {
primaryStats.add(other);
}
@Override
public String toString() {
return "ReplicationStats{"
+ "maxBytesBehind="
+ maxBytesBehind
+ ", maxReplicationLag="
+ maxReplicationLag
+ ", totalBytesBehind="
+ totalBytesBehind
+ '}';
}

/**
Expand All @@ -96,66 +108,8 @@ public void addPrimaryStats(ShardReplicationStats other) {
*/
static final class Fields {
static final String SEGMENT_REPLICATION = "segment_replication";
static final String REPLICA_STATS = "replica_stats";
static final String PRIMARY_STATS = "primary_stats";
static final String MAX_BYTES_BEHIND = "max_bytes_behind";
static final String TOTAL_BYTES_BEHIND = "total_bytes_behind";
static final String MAX_REPLICATION_LAG = "max_replication_lag";
static final String MAX_BYTES_AHEAD = "max_bytes_ahead";
static final String TOTAL_BYTES_AHEAD = "total_bytes_ahead";
}

/**
* Replication stats for a shard. This class is reused by primary and replicas
*/
public static class ShardReplicationStats implements Writeable {
public long maxBytes;
public long totalBytes;
public long maxReplicationLag;

public ShardReplicationStats() {}

public ShardReplicationStats(long bytesBehind, long replicationLag) {
this(bytesBehind, bytesBehind, replicationLag);
}

public ShardReplicationStats(long maxBytes, long totalBytes, long maxReplicationLag) {
this.maxBytes = maxBytes;
this.totalBytes = totalBytes;
this.maxReplicationLag = maxReplicationLag;
}

public ShardReplicationStats(StreamInput in) throws IOException {
this.maxBytes = in.readVLong();
this.totalBytes = in.readVLong();
this.maxReplicationLag = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(maxBytes);
out.writeVLong(totalBytes);
out.writeVLong(maxReplicationLag);
}

public long getMaxBytes() {
return this.maxBytes;
}

public long getTotalBytes() {
return this.totalBytes;
}

public long getMaxReplicationLag() {
return this.maxReplicationLag;
}

public void add(ShardReplicationStats other) {
if (other != null) {
maxBytes = Math.max(other.maxBytes, maxBytes);
totalBytes += other.totalBytes;
maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag);
}
}
}
}
29 changes: 4 additions & 25 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ Runnable getGlobalCheckpointSyncer() {
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final boolean isTimeSeriesIndex;
private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;
private final Function<ReplicationCheckpoint, ReplicationStats.ShardReplicationStats> segmentReplicationShardStatsSupplier;
private final Function<ShardId, ReplicationStats> segmentReplicationShardStatsSupplier;

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();

Expand Down Expand Up @@ -367,7 +367,7 @@ public IndexShard(
@Nullable final Store remoteStore,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
final Function<ReplicationCheckpoint, ReplicationStats.ShardReplicationStats> segmentReplicationShardStatsSupplier
final Function<ShardId, ReplicationStats> segmentReplicationShardStatsSupplier
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -1396,8 +1396,8 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu
new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats())
);
}
if (indexSettings.isSegRepEnabled()) {
segmentsStats.addReplicationStats(getReplicationStats());
if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) {
segmentsStats.addReplicationStats(segmentReplicationShardStatsSupplier.apply(shardId));
}
return segmentsStats;
}
Expand Down Expand Up @@ -2976,27 +2976,6 @@ public Set<SegmentReplicationShardStats> getReplicationStatsForTrackedReplicas()
return replicationTracker.getSegmentReplicationStats();
}

public ReplicationStats getReplicationStats() {
final ReplicationStats replicationStats = new ReplicationStats();
if (indexSettings.isSegRepEnabled()) {
if (routingEntry().primary()) {
final Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L);
long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum();
long maxReplicationLag = stats.stream()
.mapToLong(SegmentReplicationShardStats::getCurrentReplicationTimeMillis)
.max()
.orElse(0L);
replicationStats.addPrimaryStats(
new ReplicationStats.ShardReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag)
);
} else {
replicationStats.addReplicaStats(segmentReplicationShardStatsSupplier.apply(getLatestReplicationCheckpoint()));
}
}
return replicationStats;
}

/**
* Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
* then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.Node;
Expand Down Expand Up @@ -975,7 +974,7 @@ public IndexShard createShard(
final DiscoveryNode targetNode,
final DiscoveryNode sourceNode,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final Function<ReplicationCheckpoint, ReplicationStats.ShardReplicationStats> segmentReplicationStatsSupplier
final Function<ShardId, ReplicationStats> segmentReplicationStatsSupplier
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
ensureChangesAllowed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -1045,7 +1044,7 @@ T createShard(
DiscoveryNode targetNode,
@Nullable DiscoveryNode sourceNode,
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
Function<ReplicationCheckpoint, ReplicationStats.ShardReplicationStats> segmentReplicationShardStatsSupplier
Function<ShardId, ReplicationStats> segmentReplicationShardStatsSupplier
) throws IOException;

/**
Expand Down
Loading

0 comments on commit e92aafe

Please sign in to comment.