From 21cdb724ab4dd535b3159e04064e52ab234d21bf Mon Sep 17 00:00:00 2001
From: Marc Handalian
Date: Tue, 14 Mar 2023 17:13:29 -0700
Subject: [PATCH] Implement Segment replication Backpressure (#6563)

* Add Segment Replication backpressure.

This PR introduces new mechanisms to keep track of the replicas within a replication group and to apply backpressure if they fall too far behind. Writes will be rejected under the following conditions:
1. More than half of the replication group is 'stale' (the allowed fraction is defined by the setting MAX_ALLOWED_STALE_SHARDS, default 0.5).
2. A replica is considered stale if it is more than MAX_INDEXING_CHECKPOINTS (default 4) checkpoints behind the primary AND its current replication lag exceeds MAX_REPLICATION_TIME_SETTING (default 5 minutes).

This PR intentionally implements rejections only for index operations; other TransportWriteActions, namely TransportResyncReplicationAction and RetentionLeaseSyncAction, are allowed to succeed, because blocking those requests would fail recoveries as new nodes are added.

Signed-off-by: Marc Handalian

* Add changelog

Signed-off-by: Marc Handalian

* Fix test class to match naming conventions.

Signed-off-by: Marc Handalian

* PR feedback.

Signed-off-by: Marc Handalian

* Change setting keys to remove index scope.

Signed-off-by: Marc Handalian

---------

Signed-off-by: Marc Handalian
---
 CHANGELOG.md | 1 +
 .../index/SegmentReplicationPressureIT.java | 271 ++++++++++++++++++
 .../replication/SegmentReplicationBaseIT.java | 30 ++
 .../replication/SegmentReplicationIT.java | 92 ++++++
 .../action/bulk/TransportShardBulkAction.java | 12 +
 .../common/settings/ClusterSettings.java | 7 +-
 .../SegmentReplicationPerGroupStats.java | 69 +++++
 .../SegmentReplicationPressureService.java | 154 ++++++++++
 .../index/SegmentReplicationShardStats.java | 112 ++++++++
 .../index/SegmentReplicationStats.java | 73 +++++
 .../index/SegmentReplicationStatsTracker.java | 64 +++++
 .../index/seqno/ReplicationTracker.java | 136 +++++++++
 .../shard/CheckpointRefreshListener.java | 2 +-
 .../opensearch/index/shard/IndexShard.java | 47 ++-
 .../OngoingSegmentReplications.java | 4 +
 .../SegmentReplicationSourceHandler.java | 1 +
 .../SegmentReplicationTargetService.java | 6 +-
 .../checkpoint/PublishCheckpointAction.java | 5 +-
 .../checkpoint/ReplicationCheckpoint.java | 18 ++
 ...SegmentReplicationCheckpointPublisher.java | 11 +-
 .../bulk/TransportShardBulkActionTests.java | 5 +
 ...egmentReplicationPressureServiceTests.java | 207 +++++++++++++
 .../index/seqno/ReplicationTrackerTests.java | 76 +++++
 .../SegmentReplicationIndexShardTests.java | 4 +-
 .../SegmentReplicationSourceHandlerTests.java | 6 +-
 .../snapshots/SnapshotResiliencyTests.java | 2 +
 .../index/shard/IndexShardTestCase.java | 4 +
 27 files changed, 1396 insertions(+), 23 deletions(-)
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java
 create mode 100644 server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java
 create mode 100644 server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java
 create mode 100644 server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java
 create mode 100644 server/src/main/java/org/opensearch/index/SegmentReplicationStats.java
 create mode 100644 server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java
 create mode 100644 server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 24a085db36f3d..4ea3c37f0316d 100644
--- a/CHANGELOG.md
+++
b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558)) - Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544)) - Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517)) +- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java new file mode 100644 index 0000000000000..1ecc1fd2c1955 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -0,0 +1,271 @@ +/* + * Copyright OpenSearch Contributors. + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.index; + +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.replication.SegmentReplicationBaseIT; +import org.opensearch.plugins.Plugin; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS; +import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING; +import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationPressureIT extends SegmentReplicationBaseIT { + + private static final int MAX_CHECKPOINTS_BEHIND = 2; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(1)) + .put(MAX_INDEXING_CHECKPOINTS.getKey(), MAX_CHECKPOINTS_BEHIND) + .build(); + } + + @Override + protected Collection> nodePlugins() { + return 
asList(MockTransportService.TestPlugin.class); + } + + public void testWritesRejected() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + final List replicaNodes = asList(replicaNode); + assertEqualSegmentInfosVersion(replicaNodes, primaryShard); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger totalDocs = new AtomicInteger(0); + try (final Releasable ignored = blockReplication(replicaNodes, latch)) { + Thread indexingThread = new Thread(() -> { totalDocs.getAndSet(indexUntilCheckpointCount()); }); + indexingThread.start(); + indexingThread.join(); + latch.await(); + // index again while we are stale. + assertBusy(() -> { + expectThrows(OpenSearchRejectedExecutionException.class, () -> { + indexDoc(); + totalDocs.incrementAndGet(); + }); + }); + } + refresh(INDEX_NAME); + // wait for the replicas to catch up after block is released. + waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {})); + + // index another doc showing there is no pressure enforced. + indexDoc(); + waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); + verifyStoreContent(); + } + + /** + * This test ensures that a replica can be added while the index is under write block. + * Ensuring that only write requests are blocked. + */ + public void testAddReplicaWhileWritesBlocked() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + final List replicaNodes = new ArrayList<>(); + replicaNodes.add(replicaNode); + assertEqualSegmentInfosVersion(replicaNodes, primaryShard); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger totalDocs = new AtomicInteger(0); + try (final Releasable ignored = blockReplication(replicaNodes, latch)) { + Thread indexingThread = new Thread(() -> { totalDocs.getAndSet(indexUntilCheckpointCount()); }); + indexingThread.start(); + indexingThread.join(); + latch.await(); + // index again while we are stale. + assertBusy(() -> { + expectThrows(OpenSearchRejectedExecutionException.class, () -> { + indexDoc(); + totalDocs.incrementAndGet(); + }); + }); + final String replica_2 = internalCluster().startNode(); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 2)) + ); + ensureGreen(INDEX_NAME); + replicaNodes.add(replica_2); + waitForSearchableDocs(totalDocs.get(), replica_2); + } + refresh(INDEX_NAME); + // wait for the replicas to catch up after block is released. + waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {})); + + // index another doc showing there is no pressure enforced. 
+ indexDoc(); + waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); + verifyStoreContent(); + } + + public void testBelowReplicaLimit() throws Exception { + final Settings settings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3).build(); + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME, settings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + List replicaNodes = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + replicaNodes.add(internalCluster().startNode()); + } + ensureGreen(INDEX_NAME); + + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + assertEqualSegmentInfosVersion(replicaNodes, primaryShard); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger totalDocs = new AtomicInteger(0); + // only block a single replica, pressure should not get applied. + try (final Releasable ignored = blockReplication(replicaNodes.subList(0, 1), latch)) { + Thread indexingThread = new Thread(() -> totalDocs.getAndSet(indexUntilCheckpointCount())); + indexingThread.start(); + indexingThread.join(); + latch.await(); + indexDoc(); + totalDocs.incrementAndGet(); + refresh(INDEX_NAME); + } + // index another doc showing there is no pressure enforced. + indexDoc(); + refresh(INDEX_NAME); + waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); + verifyStoreContent(); + } + + public void testBulkWritesRejected() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + final String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + ensureGreen(INDEX_NAME); + + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + final List replicaNodes = asList(replicaNode); + assertEqualSegmentInfosVersion(replicaNodes, primaryShard); + + final CountDownLatch latch = new CountDownLatch(1); + List nodes = List.of(primaryNode, replicaNode, coordinator); + + int docsPerBatch = randomIntBetween(1, 200); + int totalDocs = docsPerBatch * MAX_CHECKPOINTS_BEHIND; + try (final Releasable ignored = blockReplication(replicaNodes, latch)) { + Thread indexingThread = new Thread(() -> { + for (int i = 0; i < MAX_CHECKPOINTS_BEHIND + 1; i++) { + executeBulkRequest(nodes, docsPerBatch); + refresh(INDEX_NAME); + } + }); + indexingThread.start(); + indexingThread.join(); + latch.await(); + // try and index again while we are stale. + assertBusy(() -> { assertFailedRequests(executeBulkRequest(nodes, randomIntBetween(1, 200))); }); + } + refresh(INDEX_NAME); + // wait for the replicas to catch up after block is released. + waitForSearchableDocs(totalDocs, replicaNodes.toArray(new String[] {})); + + // index another doc showing there is no pressure enforced. 
+ executeBulkRequest(nodes, totalDocs); + waitForSearchableDocs(totalDocs * 2L, replicaNodes.toArray(new String[] {})); + verifyStoreContent(); + } + + private BulkResponse executeBulkRequest(List nodes, int docsPerBatch) { + final BulkRequest bulkRequest = new BulkRequest(); + for (int j = 0; j < docsPerBatch; ++j) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + final BulkResponse bulkItemResponses = client(randomFrom(nodes)).bulk(bulkRequest).actionGet(); + refresh(INDEX_NAME); + return bulkItemResponses; + } + + /** + * Index and Refresh in batches to force checkpoints behind. + * Asserts that there are no stale replicas according to the primary until cp count is reached. + */ + private int indexUntilCheckpointCount() { + int total = 0; + for (int i = 0; i < MAX_CHECKPOINTS_BEHIND; i++) { + final int numDocs = randomIntBetween(1, 100); + for (int j = 0; j < numDocs; ++j) { + indexDoc(); + } + total += numDocs; + refresh(INDEX_NAME); + } + return total; + } + + private void assertFailedRequests(BulkResponse response) { + assertTrue(Arrays.stream(response.getItems()).allMatch(BulkItemResponse::isFailed)); + assertTrue( + Arrays.stream(response.getItems()) + .map(BulkItemResponse::getFailure) + .allMatch((failure) -> failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) + ); + } + + private void indexDoc() { + client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").get(); + } + + private void assertEqualSegmentInfosVersion(List replicaNames, IndexShard primaryShard) { + for (String replicaName : replicaNames) { + final IndexShard replicaShard = getIndexShard(replicaName, INDEX_NAME); + assertEquals( + primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(), + replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index d03ba6a177a30..36569d10c50f6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -17,6 +17,7 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Nullable; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.Index; @@ -30,12 +31,14 @@ import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -197,4 +200,31 @@ protected IndexShard getIndexShard(String node, String indexName) { return indexService.getShard(shardId.get()); } + protected Releasable blockReplication(List nodes, CountDownLatch latch) { + CountDownLatch pauseReplicationLatch = new CountDownLatch(nodes.size()); + for (String node : nodes) { 
+ + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + node + )); + mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) { + try { + latch.countDown(); + pauseReplicationLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + return () -> { + while (pauseReplicationLatch.getCount() > 0) { + pauseReplicationLatch.countDown(); + } + }; + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 74d0f32ef1c65..a7506f6205409 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -31,7 +31,11 @@ import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexModule; +import org.opensearch.index.SegmentReplicationPerGroupStats; +import org.opensearch.index.SegmentReplicationPressureService; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; @@ -43,6 +47,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -744,4 +749,91 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME); assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); } + + public void testSegmentReplicationStats() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + int initialDocCount = scaledRandomIntBetween(100, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + + SegmentReplicationPressureService pressureService = internalCluster().getInstance( + SegmentReplicationPressureService.class, + primaryNode + ); + + final Map shardStats = pressureService.nodeStats().getShardStats(); + assertEquals(1, shardStats.size()); + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + IndexShard replica = getIndexShard(replicaNode, INDEX_NAME); + SegmentReplicationPerGroupStats groupStats = shardStats.get(primaryShard.shardId()); + Set replicaStats = groupStats.getReplicaStats(); + assertEquals(1, replicaStats.size()); + assertEquals(replica.routingEntry().currentNodeId(), replicaStats.stream().findFirst().get().getNodeId()); + + // assert replica node returns nothing. 
+ SegmentReplicationPressureService replicaNode_service = internalCluster().getInstance( + SegmentReplicationPressureService.class, + replicaNode + ); + assertTrue(replicaNode_service.nodeStats().getShardStats().isEmpty()); + + // drop the primary, this won't hand off SR state. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + replicaNode_service = internalCluster().getInstance(SegmentReplicationPressureService.class, replicaNode); + replica = getIndexShard(replicaNode, INDEX_NAME); + assertTrue("replica should be promoted as a primary", replica.routingEntry().primary()); + assertEquals(1, replicaNode_service.nodeStats().getShardStats().size()); + // we don't have a replica assigned yet, so this should be 0. + assertEquals(0, replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats().size()); + + // start another replica. + String replicaNode_2 = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + String docId = String.valueOf(initialDocCount + 1); + client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get(); + refresh(INDEX_NAME); + waitForSearchableDocs(initialDocCount + 1, replicaNode_2); + + replicaNode_service = internalCluster().getInstance(SegmentReplicationPressureService.class, replicaNode); + replica = getIndexShard(replicaNode_2, INDEX_NAME); + assertEquals(1, replicaNode_service.nodeStats().getShardStats().size()); + replicaStats = replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats(); + assertEquals(1, replicaStats.size()); + assertEquals(replica.routingEntry().currentNodeId(), replicaStats.stream().findFirst().get().getNodeId()); + + // test a checkpoint without any new segments + flush(INDEX_NAME); + assertBusy(() -> { + final SegmentReplicationPressureService service = internalCluster().getInstance( + SegmentReplicationPressureService.class, + replicaNode + ); + assertEquals(1, service.nodeStats().getShardStats().size()); + final Set shardStatsSet = service.nodeStats() + .getShardStats() + .get(primaryShard.shardId()) + .getReplicaStats(); + assertEquals(1, shardStatsSet.size()); + final SegmentReplicationShardStats stats = shardStatsSet.stream().findFirst().get(); + assertEquals(0, stats.getCheckpointsBehindCount()); + }); + } + } } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 0657fab55b220..2552d43688f00 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -81,6 +81,7 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexingPressureService; +import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.index.get.GetResult; @@ -133,6 +134,7 @@ public class TransportShardBulkAction extends TransportWriteAction replicaStats; + private final long rejectedRequestCount; + + public SegmentReplicationPerGroupStats(Set replicaStats, long rejectedRequestCount) { + this.replicaStats = replicaStats; + this.rejectedRequestCount = rejectedRequestCount; + } + + public SegmentReplicationPerGroupStats(StreamInput in) throws IOException { 
+ this.replicaStats = in.readSet(SegmentReplicationShardStats::new); + this.rejectedRequestCount = in.readVLong(); + } + + public Set getReplicaStats() { + return replicaStats; + } + + public long getRejectedRequestCount() { + return rejectedRequestCount; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("rejected_requests", rejectedRequestCount); + builder.startArray("replicas"); + for (SegmentReplicationShardStats stats : replicaStats) { + stats.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(replicaStats); + out.writeVLong(rejectedRequestCount); + } + + @Override + public String toString() { + return "SegmentReplicationPerGroupStats{" + "replicaStats=" + replicaStats + ", rejectedRequestCount=" + rejectedRequestCount + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java new file mode 100644 index 0000000000000..874f35daf158f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -0,0 +1,154 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Service responsible for applying backpressure for lagging behind replicas when Segment Replication is enabled. 
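+ * Writes are rejected on a primary once the configured fraction of its replication group is stale; a replica is
+ * considered stale when it is more than the allowed number of checkpoints behind and its current replication lag
+ * exceeds the configured time limit. The limits below are dynamic cluster settings; for example (illustrative
+ * values, assuming the cluster update settings API):
+ * <pre>
+ * PUT _cluster/settings
+ * { "persistent": { "segrep.pressure.enabled": true, "segrep.pressure.time.limit": "5m" } }
+ * </pre>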
+ * + * @opensearch.internal + */ +public class SegmentReplicationPressureService { + + private volatile boolean isSegmentReplicationBackpressureEnabled; + private volatile int maxCheckpointsBehind; + private volatile double maxAllowedStaleReplicas; + private volatile TimeValue maxReplicationTime; + + private static final Logger logger = LogManager.getLogger(SegmentReplicationPressureService.class); + + public static final Setting SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED = Setting.boolSetting( + "segrep.pressure.enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting MAX_INDEXING_CHECKPOINTS = Setting.intSetting( + "segrep.pressure.checkpoint.limit", + 4, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting MAX_REPLICATION_TIME_SETTING = Setting.positiveTimeSetting( + "segrep.pressure.time.limit", + TimeValue.timeValueMinutes(5), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting MAX_ALLOWED_STALE_SHARDS = Setting.doubleSetting( + "segrep.pressure.replica.stale.limit", + .5, + 0, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final IndicesService indicesService; + private final SegmentReplicationStatsTracker tracker; + + @Inject + public SegmentReplicationPressureService(Settings settings, ClusterService clusterService, IndicesService indicesService) { + this.indicesService = indicesService; + this.tracker = new SegmentReplicationStatsTracker(this.indicesService); + + final ClusterSettings clusterSettings = clusterService.getClusterSettings(); + this.isSegmentReplicationBackpressureEnabled = SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.get(settings); + clusterSettings.addSettingsUpdateConsumer( + SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED, + this::setSegmentReplicationBackpressureEnabled + ); + + this.maxCheckpointsBehind = MAX_INDEXING_CHECKPOINTS.get(settings); + clusterSettings.addSettingsUpdateConsumer(MAX_INDEXING_CHECKPOINTS, this::setMaxCheckpointsBehind); + + this.maxReplicationTime = MAX_REPLICATION_TIME_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_SETTING, this::setMaxReplicationTime); + + this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings); + clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas); + } + + public void isSegrepLimitBreached(ShardId shardId) { + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard shard = indexService.getShard(shardId.id()); + if (isSegmentReplicationBackpressureEnabled && shard.indexSettings().isSegRepEnabled() && shard.routingEntry().primary()) { + validateReplicationGroup(shard); + } + } + + private void validateReplicationGroup(IndexShard shard) { + final Set replicaStats = shard.getReplicationStats(); + final Set staleReplicas = getStaleReplicas(replicaStats); + if (staleReplicas.isEmpty() == false) { + // inSyncIds always considers the primary id, so filter it out. 
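+ // For example, with one primary and two replicas there are 3 in-sync ids; if one replica is stale,
+ // percentStale = 1 * 100f / (3 - 1) = 50, which meets the default 50% limit and triggers a rejection.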
+ final float percentStale = staleReplicas.size() * 100f / (shard.getReplicationGroup().getInSyncAllocationIds().size() - 1); + final double maxStaleLimit = maxAllowedStaleReplicas * 100f; + if (percentStale >= maxStaleLimit) { + tracker.incrementRejectionCount(shard.shardId()); + logger.warn("Rejecting write requests for shard, stale shards [{}%] shards: {}", percentStale, staleReplicas); + throw new OpenSearchRejectedExecutionException( + "rejected execution on primary shard: " + shard.shardId() + " Stale Replicas: " + staleReplicas + "]", + false + ); + } + } + } + + private Set getStaleReplicas(final Set replicas) { + return replicas.stream() + .filter(entry -> entry.getCheckpointsBehindCount() > maxCheckpointsBehind) + .filter(entry -> entry.getCurrentReplicationTimeMillis() > maxReplicationTime.millis()) + .collect(Collectors.toSet()); + } + + public SegmentReplicationStats nodeStats() { + return tracker.getStats(); + } + + public boolean isSegmentReplicationBackpressureEnabled() { + return isSegmentReplicationBackpressureEnabled; + } + + public void setSegmentReplicationBackpressureEnabled(boolean segmentReplicationBackpressureEnabled) { + isSegmentReplicationBackpressureEnabled = segmentReplicationBackpressureEnabled; + } + + public void setMaxCheckpointsBehind(int maxCheckpointsBehind) { + this.maxCheckpointsBehind = maxCheckpointsBehind; + } + + public void setMaxAllowedStaleReplicas(double maxAllowedStaleReplicas) { + this.maxAllowedStaleReplicas = maxAllowedStaleReplicas; + } + + public void setMaxReplicationTime(TimeValue maxReplicationTime) { + this.maxReplicationTime = maxReplicationTime; + } + +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java new file mode 100644 index 0000000000000..3c5845a2e2e94 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * SegRep stats for a single shard. 
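+ * Reported from the primary's perspective for each replica in its replication group: the replica's node id, how
+ * many checkpoints and bytes it is behind, how long the current replication event has been running, and how long
+ * the last completed replication event took.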
+ * + * @opensearch.internal + */ +public class SegmentReplicationShardStats implements Writeable, ToXContentFragment { + private final String nodeId; + private final long checkpointsBehindCount; + private final long bytesBehindCount; + private final long currentReplicationTimeMillis; + private final long lastCompletedReplicationTimeMillis; + + public SegmentReplicationShardStats( + String nodeId, + long checkpointsBehindCount, + long bytesBehindCount, + long currentReplicationTimeMillis, + long lastCompletedReplicationTime + ) { + this.nodeId = nodeId; + this.checkpointsBehindCount = checkpointsBehindCount; + this.bytesBehindCount = bytesBehindCount; + this.currentReplicationTimeMillis = currentReplicationTimeMillis; + this.lastCompletedReplicationTimeMillis = lastCompletedReplicationTime; + } + + public SegmentReplicationShardStats(StreamInput in) throws IOException { + this.nodeId = in.readString(); + this.checkpointsBehindCount = in.readVLong(); + this.bytesBehindCount = in.readVLong(); + this.currentReplicationTimeMillis = in.readVLong(); + this.lastCompletedReplicationTimeMillis = in.readVLong(); + } + + public String getNodeId() { + return nodeId; + } + + public long getCheckpointsBehindCount() { + return checkpointsBehindCount; + } + + public long getBytesBehindCount() { + return bytesBehindCount; + } + + public long getCurrentReplicationTimeMillis() { + return currentReplicationTimeMillis; + } + + public long getLastCompletedReplicationTimeMillis() { + return lastCompletedReplicationTimeMillis; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("node_id", nodeId); + builder.field("checkpoints_behind", checkpointsBehindCount); + builder.field("bytes_behind", new ByteSizeValue(bytesBehindCount).toString()); + builder.field("current_replication_time", new TimeValue(currentReplicationTimeMillis)); + builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis)); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(nodeId); + out.writeVLong(checkpointsBehindCount); + out.writeVLong(bytesBehindCount); + out.writeVLong(currentReplicationTimeMillis); + out.writeVLong(lastCompletedReplicationTimeMillis); + } + + @Override + public String toString() { + return "SegmentReplicationShardStats{" + + "nodeId='" + + nodeId + + '\'' + + ", checkpointsBehindCount=" + + checkpointsBehindCount + + ", bytesBehindCount=" + + bytesBehindCount + + ", currentReplicationLag=" + + currentReplicationTimeMillis + + ", lastCompletedLag=" + + lastCompletedReplicationTimeMillis + + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java new file mode 100644 index 0000000000000..0706ba4353098 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Segment Replication Stats. + * + * @opensearch.internal + */ +public class SegmentReplicationStats implements Writeable, ToXContentFragment { + + private final Map shardStats; + + public SegmentReplicationStats(Map shardStats) { + this.shardStats = shardStats; + } + + public SegmentReplicationStats(StreamInput in) throws IOException { + int shardEntries = in.readInt(); + shardStats = new HashMap<>(); + for (int i = 0; i < shardEntries; i++) { + ShardId shardId = new ShardId(in); + SegmentReplicationPerGroupStats groupStats = new SegmentReplicationPerGroupStats(in); + shardStats.put(shardId, groupStats); + } + } + + public Map getShardStats() { + return shardStats; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("segment_replication"); + for (Map.Entry entry : shardStats.entrySet()) { + builder.startObject(entry.getKey().toString()); + entry.getValue().toXContent(builder, params); + builder.endObject(); + } + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInt(shardStats.size()); + for (Map.Entry entry : shardStats.entrySet()) { + entry.getKey().writeTo(out); + entry.getValue().writeTo(out); + } + } + + @Override + public String toString() { + return "SegmentReplicationStats{" + "shardStats=" + shardStats + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java new file mode 100644 index 0000000000000..201bdc97e2466 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tracker responsible for computing SegmentReplicationStats. 
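+ * Iterates all primary, segment-replication-enabled shards on this node and pairs each shard's per-replica
+ * stats with the number of write requests rejected for that shard.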
+ * + * @opensearch.internal + */ +public class SegmentReplicationStatsTracker { + + private final IndicesService indicesService; + private Map rejectionCount; + + public SegmentReplicationStatsTracker(IndicesService indicesService) { + this.indicesService = indicesService; + rejectionCount = ConcurrentCollections.newConcurrentMap(); + } + + public SegmentReplicationStats getStats() { + Map stats = new HashMap<>(); + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) { + stats.putIfAbsent( + indexShard.shardId(), + new SegmentReplicationPerGroupStats( + indexShard.getReplicationStats(), + Optional.ofNullable(rejectionCount.get(indexShard.shardId())).map(AtomicInteger::get).orElse(0) + ) + ); + } + } + } + return new SegmentReplicationStats(stats); + } + + public void incrementRejectionCount(ShardId shardId) { + rejectionCount.compute(shardId, (k, v) -> { + if (v == null) { + return new AtomicInteger(1); + } else { + v.incrementAndGet(); + return v; + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 13ff1b91727eb..4715d1d4ddd5b 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -34,6 +34,7 @@ import com.carrotsearch.hppc.ObjectLongHashMap; import com.carrotsearch.hppc.ObjectLongMap; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -48,14 +49,18 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.gateway.WriteStateException; import org.opensearch.index.IndexSettings; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.shard.AbstractIndexShardComponent; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ReplicationGroup; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationTimer; import java.io.IOException; import java.nio.file.Path; @@ -66,8 +71,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -243,6 +250,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final Consumer onReplicationGroupUpdated; + private volatile ReplicationCheckpoint lastPublishedReplicationCheckpoint; + /** * Get all retention leases tracked on this shard. * @@ -699,12 +708,29 @@ public static class CheckpointState implements Writeable { */ boolean replicated; + /** + * The currently searchable replication checkpoint. 
+ */ + ReplicationCheckpoint visibleReplicationCheckpoint; + + /** + * Map of ReplicationCheckpoints to ReplicationTimers. Timers are added as new checkpoints are published, and removed when + * the replica is caught up. + */ + Map checkpointTimers; + + /** + * The time it took to complete the most recent replication event. + */ + long lastCompletedReplicationLag; + public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked, boolean replicated) { this.localCheckpoint = localCheckpoint; this.globalCheckpoint = globalCheckpoint; this.inSync = inSync; this.tracked = tracked; this.replicated = replicated; + this.checkpointTimers = ConcurrentCollections.newConcurrentMap(); } public CheckpointState(StreamInput in) throws IOException { @@ -1137,6 +1163,116 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI assert invariant(); } + /** + * Update the local knowledge of the visible checkpoint for the specified allocation ID. + * + * This method will also stop timers for each shard and compute replication lag metrics. + * + * @param allocationId the allocation ID to update the global checkpoint for + * @param visibleCheckpoint the visible checkpoint + */ + public synchronized void updateVisibleCheckpointForShard(final String allocationId, final ReplicationCheckpoint visibleCheckpoint) { + assert indexSettings.isSegRepEnabled(); + assert primaryMode; + assert handoffInProgress == false; + assert invariant(); + final CheckpointState cps = checkpoints.get(allocationId); + assert !this.shardAllocationId.equals(allocationId) && cps != null; + if (cps.checkpointTimers.isEmpty() == false) { + // stop any timers for checkpoints up to the received cp and remove from cps.checkpointTimers. + // Compute the max lag from the set of completed timers. + final AtomicLong lastFinished = new AtomicLong(0L); + cps.checkpointTimers.entrySet().removeIf((entry) -> { + boolean result = visibleCheckpoint.equals(entry.getKey()) || visibleCheckpoint.isAheadOf(entry.getKey()); + if (result) { + final ReplicationTimer timer = entry.getValue(); + timer.stop(); + lastFinished.set(Math.max(lastFinished.get(), timer.time())); + } + return result; + }); + cps.lastCompletedReplicationLag = lastFinished.get(); + } + logger.trace( + () -> new ParameterizedMessage( + "updated local knowledge for [{}] on the primary of the visible checkpoint from [{}] to [{}], active timers {}", + allocationId, + cps.visibleReplicationCheckpoint, + visibleCheckpoint, + cps.checkpointTimers.keySet() + ) + ); + cps.visibleReplicationCheckpoint = visibleCheckpoint; + assert invariant(); + } + + /** + * After a new checkpoint is published, start a timer for each replica to the checkpoint. 
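+ * Timers are started only for in-sync copies other than the primary itself, and only if the checkpoint differs
+ * from the last published checkpoint; they are stopped by {@link #updateVisibleCheckpointForShard(String, ReplicationCheckpoint)}
+ * once a replica has caught up to or moved past the corresponding checkpoint.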
+ * @param checkpoint {@link ReplicationCheckpoint} + */ + public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) { + assert indexSettings.isSegRepEnabled(); + assert primaryMode; + assert handoffInProgress == false; + if (checkpoint.equals(lastPublishedReplicationCheckpoint) == false) { + this.lastPublishedReplicationCheckpoint = checkpoint; + for (Map.Entry entry : checkpoints.entrySet()) { + if (entry.getKey().equals(this.shardAllocationId) == false) { + final CheckpointState cps = entry.getValue(); + if (cps.inSync) { + cps.checkpointTimers.computeIfAbsent(checkpoint, ignored -> { + final ReplicationTimer replicationTimer = new ReplicationTimer(); + replicationTimer.start(); + return replicationTimer; + }); + logger.trace( + () -> new ParameterizedMessage( + "updated last published checkpoint to {} - timers [{}]", + checkpoint, + cps.checkpointTimers.keySet() + ) + ); + } + } + } + } + } + + /** + * Fetch stats on segment replication. + * @return {@link Tuple} V1 - TimeValue in ms - mean replication lag for this primary to its entire group, + * V2 - Set of {@link SegmentReplicationShardStats} per shard in this primary's replication group. + */ + public synchronized Set getSegmentReplicationStats() { + assert indexSettings.isSegRepEnabled(); + final ReplicationCheckpoint lastPublishedCheckpoint = this.lastPublishedReplicationCheckpoint; + if (primaryMode && lastPublishedCheckpoint != null) { + return this.checkpoints.entrySet() + .stream() + .filter(entry -> entry.getKey().equals(this.shardAllocationId) == false && entry.getValue().inSync) + .map(entry -> buildShardStats(lastPublishedCheckpoint.getLength(), entry.getKey(), entry.getValue())) + .collect(Collectors.toUnmodifiableSet()); + } + return Collections.emptySet(); + } + + private SegmentReplicationShardStats buildShardStats( + final long latestCheckpointLength, + final String allocationId, + final CheckpointState checkpointState + ) { + final Map checkpointTimers = checkpointState.checkpointTimers; + return new SegmentReplicationShardStats( + Optional.ofNullable(this.routingTable.getByAllocationId(allocationId)).map(ShardRouting::currentNodeId).orElse("not assigned"), + checkpointTimers.size(), + checkpointState.visibleReplicationCheckpoint == null + ? latestCheckpointLength + : Math.max(latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), 0), + checkpointTimers.values().stream().mapToLong(ReplicationTimer::time).max().orElse(0), + checkpointState.lastCompletedReplicationLag + ); + } + /** * Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}. Called on primary activation or promotion. 
*/ diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index fb046e2310d93..dd9d967d74ad5 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -41,7 +41,7 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) { - publisher.publish(shard); + publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 23b65d03d8e23..b28d02158a17a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -107,6 +107,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; import org.opensearch.index.cache.bitset.ShardBitsetFilterCache; @@ -1478,19 +1479,26 @@ public Tuple, ReplicationCheckpoint> getLatestSegme } // do not close the snapshot - caller will close it. final GatedCloseable snapshot = getSegmentInfosSnapshot(); - return Optional.ofNullable(snapshot.get()) - .map( - segmentInfos -> new Tuple<>( + return Optional.ofNullable(snapshot.get()).map(segmentInfos -> { + try { + return new Tuple<>( snapshot, new ReplicationCheckpoint( this.shardId, getOperationPrimaryTerm(), segmentInfos.getGeneration(), - segmentInfos.getVersion() + segmentInfos.getVersion(), + // TODO: Update replicas to compute length from SegmentInfos. Replicas do not yet incref segments with + // getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues. + shardRouting.primary() + ? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum() + : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes() ) - ) - ) - .orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); + ); + } catch (IOException e) { + throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e); + } + }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); } /** @@ -1732,6 +1740,10 @@ public void resetToWriteableEngine() throws IOException, InterruptedException, T indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); } + public void onCheckpointPublished(ReplicationCheckpoint checkpoint) { + replicationTracker.setLatestReplicationCheckpoint(checkpoint); + } + /** * Wrapper for a non-closing reader * @@ -2696,6 +2708,27 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); } + /** + * Update the local knowledge of the visible global checkpoint for the specified allocation ID. 
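+ * Here the visible checkpoint is the replica's latest segment replication checkpoint rather than the
+ * sequence-number global checkpoint; it is forwarded to the ReplicationTracker, which stops the matching
+ * checkpoint timers and records the replication lag.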
+ * + * @param allocationId the allocation ID to update the global checkpoint for + * @param visibleCheckpoint the visible checkpoint + */ + public void updateVisibleCheckpointForShard(final String allocationId, final ReplicationCheckpoint visibleCheckpoint) { + assert assertPrimaryMode(); + verifyNotClosed(); + replicationTracker.updateVisibleCheckpointForShard(allocationId, visibleCheckpoint); + } + + /** + * Fetch stats on segment replication. + * @return {@link Tuple} V1 - TimeValue in ms - mean replication lag for this primary to its entire group, + * V2 - Set of {@link SegmentReplicationShardStats} per shard in this primary's replication group. + */ + public Set getReplicationStats() { + return replicationTracker.getSegmentReplicationStats(); + } + /** * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for, * then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index ddc0258695d8a..5a23bfff7c040 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -121,6 +121,10 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener { try { + shard.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint()); future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); } finally { IOUtils.close(resources); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index cbdbd05956345..0cdd4907d31ea 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -227,9 +227,10 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe public void onReplicationDone(SegmentReplicationState state) { logger.trace( () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication complete, timing data: {}", + "[shardId {}] [replication id {}] Replication complete to {}, timing data: {}", replicaShard.shardId().getId(), state.getReplicationId(), + replicaShard.getLatestReplicationCheckpoint(), state.getTimingData() ) ); @@ -402,9 +403,10 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha public void onReplicationDone(SegmentReplicationState state) { logger.trace( () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication complete, timing data: {}", + "[shardId {}] [replication id {}] Replication complete to {}, timing data: {}", indexShard.shardId().getId(), state.getReplicationId(), + indexShard.getLatestReplicationCheckpoint(), state.getTimingData() ) ); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 0fd934c31ef7f..e3d19461f9e35 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ 
b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -109,14 +109,13 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) { /** * Publish checkpoint request to shard */ - final void publish(IndexShard indexShard) { + final void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) { long primaryTerm = indexShard.getPendingPrimaryTerm(); final ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we have to execute under the system context so that if security is enabled the sync is authorized threadContext.markAsSystemContext(); - PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint()); - final ReplicationCheckpoint checkpoint = request.getCheckpoint(); + PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); final List replicationTargets = indexShard.getReplicationGroup().getReplicationTargets(); for (ShardRouting replicationTarget : replicationTargets) { diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index c03473e0a8c30..57e667b06a223 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -29,6 +29,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable>) invocation -> List.of(indexService).iterator()); + when(indexService.iterator()).thenAnswer((Answer>) invocation -> List.of(primaryShard).iterator()); + when(indicesService.indexService(primaryShard.shardId().getIndex())).thenReturn(indexService); + when(indexService.getShard(primaryShard.shardId().id())).thenReturn(primaryShard); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + + return new SegmentReplicationPressureService(settings, clusterService, indicesService); + } +} diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java index 8ea64e71fb9dc..ee5307320cbfb 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java @@ -46,7 +46,10 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.set.Sets; import org.opensearch.index.IndexSettings; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.IndexSettingsModule; import java.io.IOException; @@ -72,6 +75,7 @@ import java.util.stream.Stream; import static java.util.Collections.emptySet; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.equalTo; @@ -1770,6 +1774,78 @@ public void 
diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
index 8ea64e71fb9dc..ee5307320cbfb 100644
--- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
+++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
@@ -46,7 +46,10 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.set.Sets;
 import org.opensearch.index.IndexSettings;
+import org.opensearch.index.SegmentReplicationShardStats;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.test.IndexSettingsModule;
 
 import java.io.IOException;
@@ -72,6 +75,7 @@
 import java.util.stream.Stream;
 
 import static java.util.Collections.emptySet;
+import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
 import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.hamcrest.Matchers.equalTo;
@@ -1770,6 +1774,78 @@ public void testUpdateAllocationIdsFromClusterManagerWithRemoteTranslogEnabled()
         assertFalse(tracker.pendingInSync.contains(newSyncingAllocationId.getId()));
     }
 
+    public void testSegmentReplicationCheckpointTracking() {
+        Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
+        final long initialClusterStateVersion = randomNonNegativeLong();
+        final int numberOfActiveAllocationsIds = randomIntBetween(2, 16);
+        final int numberOfInitializingIds = randomIntBetween(2, 16);
+        final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds = randomActiveAndInitializingAllocationIds(
+            numberOfActiveAllocationsIds,
+            numberOfInitializingIds
+        );
+        final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
+        final Set<AllocationId> initializingIds = activeAndInitializingAllocationIds.v2();
+        AllocationId primaryId = activeAllocationIds.iterator().next();
+        IndexShardRoutingTable routingTable = routingTable(initializingIds, primaryId);
+        final ReplicationTracker tracker = newTracker(primaryId, settings);
+        tracker.updateFromClusterManager(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
+        tracker.activatePrimaryMode(NO_OPS_PERFORMED);
+        assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
+        assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable));
+        assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
+        // get insync ids, filter out the primary.
+        final Set<String> inSyncAllocationIds = tracker.getReplicationGroup()
+            .getInSyncAllocationIds()
+            .stream()
+            .filter(id -> tracker.shardAllocationId.equals(id) == false)
+            .collect(Collectors.toSet());
+
+        final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 1, 1, 1L);
+        final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 2, 50L);
+        final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 3, 100L);
+
+        tracker.setLatestReplicationCheckpoint(initialCheckpoint);
+        tracker.setLatestReplicationCheckpoint(secondCheckpoint);
+        tracker.setLatestReplicationCheckpoint(thirdCheckpoint);
+
+        Set<SegmentReplicationShardStats> groupStats = tracker.getSegmentReplicationStats();
+        assertEquals(inSyncAllocationIds.size(), groupStats.size());
+        for (SegmentReplicationShardStats shardStat : groupStats) {
+            assertEquals(3, shardStat.getCheckpointsBehindCount());
+            assertEquals(100L, shardStat.getBytesBehindCount());
+        }
+
+        // simulate replicas moved up to date.
+        final Map<String, ReplicationTracker.CheckpointState> checkpoints = tracker.checkpoints;
+        for (String id : inSyncAllocationIds) {
+            final ReplicationTracker.CheckpointState checkpointState = checkpoints.get(id);
+            assertEquals(3, checkpointState.checkpointTimers.size());
+            tracker.updateVisibleCheckpointForShard(id, initialCheckpoint);
+            assertEquals(2, checkpointState.checkpointTimers.size());
+        }
+
+        groupStats = tracker.getSegmentReplicationStats();
+        assertEquals(inSyncAllocationIds.size(), groupStats.size());
+        for (SegmentReplicationShardStats shardStat : groupStats) {
+            assertEquals(2, shardStat.getCheckpointsBehindCount());
+            assertEquals(99L, shardStat.getBytesBehindCount());
+        }
+
+        for (String id : inSyncAllocationIds) {
+            final ReplicationTracker.CheckpointState checkpointState = checkpoints.get(id);
+            assertEquals(2, checkpointState.checkpointTimers.size());
+            tracker.updateVisibleCheckpointForShard(id, thirdCheckpoint);
+            assertEquals(0, checkpointState.checkpointTimers.size());
+        }
+
+        groupStats = tracker.getSegmentReplicationStats();
+        assertEquals(inSyncAllocationIds.size(), groupStats.size());
+        for (SegmentReplicationShardStats shardStat : groupStats) {
+            assertEquals(0, shardStat.getCheckpointsBehindCount());
+            assertEquals(0L, shardStat.getBytesBehindCount());
+        }
+    }
+
     public void testPrimaryContextHandoffWithRemoteTranslogEnabled() throws IOException {
         Settings settings = Settings.builder().put("index.remote_store.translog.enabled", "true").build();
         final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", settings);
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
index 79b03c4b7afc5..f359393de2b90 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -258,7 +258,7 @@ public void testPublishCheckpointOnPrimaryMode() throws IOException {
         refreshListener.afterRefresh(true);
 
         // verify checkpoint is published
-        verify(mock, times(1)).publish(any());
+        verify(mock, times(1)).publish(any(), any());
         closeShards(shard);
     }
 
@@ -280,7 +280,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
         refreshListener.afterRefresh(true);
 
         // verify checkpoint is not published
-        verify(mock, times(0)).publish(any());
+        verify(mock, times(0)).publish(any(), any());
         closeShards(shard);
     }
 
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java
index b5d8b2baf40dc..607f9dd91e35e 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java
@@ -73,7 +73,7 @@ public void testSendFiles() throws IOException {
             chunkWriter,
             threadPool,
            copyState,
-            primary.routingEntry().allocationId().getId(),
+            replica.routingEntry().allocationId().getId(),
             5000,
             1
         );
@@ -111,7 +111,7 @@ public void testSendFiles_emptyRequest() throws IOException {
             chunkWriter,
             threadPool,
             copyState,
-            primary.routingEntry().allocationId().getId(),
+            replica.routingEntry().allocationId().getId(),
             5000,
             1
         );
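The expected numbers in testSegmentReplicationCheckpointTracking follow directly from the checkpoint data, assuming the final ReplicationCheckpoint constructor argument above is a cumulative segment length in bytes: the latest checkpoint has length 100, a replica that has confirmed only the initial checkpoint (length 1) is therefore 100 - 1 = 99 bytes behind, and the two unconfirmed checkpoints (second and third) leave two active checkpoint timers. A tiny worked example, mirroring the test data rather than any tracker code:

// Worked example of the assertions above; values mirror the test data, this is not tracker code.
public final class BytesBehindExample {
    public static void main(String[] args) {
        final long latestLength = 100L;  // thirdCheckpoint's length argument
        final long visibleLength = 1L;   // replica has confirmed only initialCheckpoint (length 1)
        final long bytesBehind = latestLength - visibleLength;  // 99
        final int checkpointsBehind = 2; // second and third checkpoints still pending -> two timers
        System.out.println(bytesBehind + " bytes behind, " + checkpointsBehind + " checkpoints behind");
    }
}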
@@ -191,7 +191,7 @@ public void testReplicationAlreadyRunning() throws IOException {
             chunkWriter,
             threadPool,
             copyState,
-            primary.routingEntry().allocationId().getId(),
+            replica.routingEntry().allocationId().getId(),
             5000,
             1
         );
diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
index a6677fbc0c99b..b9087db60c271 100644
--- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
@@ -169,6 +169,7 @@
 import org.opensearch.gateway.TransportNodesListGatewayStartedShards;
 import org.opensearch.index.Index;
 import org.opensearch.index.IndexingPressureService;
+import org.opensearch.index.SegmentReplicationPressureService;
 import org.opensearch.index.analysis.AnalysisRegistry;
 import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
 import org.opensearch.index.seqno.RetentionLeaseSyncer;
@@ -1978,6 +1979,7 @@ public void onFailure(final Exception e) {
                 new UpdateHelper(scriptService),
                 actionFilters,
                 new IndexingPressureService(settings, clusterService),
+                new SegmentReplicationPressureService(settings, clusterService, mock(IndicesService.class)),
                 new SystemIndices(emptyMap())
             );
             actions.put(
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index d39e190a6f124..ab0cf38f77c7d 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -1365,6 +1365,10 @@ public void onReplicationDone(SegmentReplicationState state) {
                     assertTrue(recoveryDiff.missing.isEmpty());
                     assertTrue(recoveryDiff.different.isEmpty());
                     assertEquals(recoveryDiff.identical.size(), primaryMetadata.size());
+                    primaryShard.updateVisibleCheckpointForShard(
+                        replica.routingEntry().allocationId().getId(),
+                        primaryShard.getLatestReplicationCheckpoint()
+                    );
                 } catch (Exception e) {
                     throw ExceptionsHelper.convertToRuntime(e);
                 } finally {