Skip to content

Commit

Permalink
Implement Segment replication Backpressure (#6563)
Browse files Browse the repository at this point in the history
* Add Segment Replication backpressure.

This PR introduces new mechanisms to keep track of the current replicas within a replication group and apply backpressure if they fall too far behind.

Writes will be rejected under the following conditions:

1. More than half (default setting) of the replication group is 'stale'.  Defined by setting MAX_ALLOWED_STALE_SHARDS.
2. A replica is stale if it is behind more than MAX_INDEXING_CHECKPOINTS, default 4 AND its current replication lag is over
MAX_REPLICATION_TIME_SETTING, default 5 minutes.

This PR intentionally implements rejections only for index operations,
allowing other TransportWriteActions to succeed, TransportResyncReplicationAction and RetentionLeaseSyncAction.
Blocking these requests will fail recoveries as new nodes are added.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add changelog

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix test class to match naming conventions.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR feedback.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Change setting keys to remove index scope.

Signed-off-by: Marc Handalian <handalm@amazon.com>

---------

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Mar 15, 2023
1 parent bb901f5 commit 21cdb72
Show file tree
Hide file tree
Showing 27 changed files with 1,396 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.index;

import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.replication.SegmentReplicationBaseIT;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationPressureIT extends SegmentReplicationBaseIT {

private static final int MAX_CHECKPOINTS_BEHIND = 2;

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_INDEXING_CHECKPOINTS.getKey(), MAX_CHECKPOINTS_BEHIND)
.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

public void testWritesRejected() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
final List<String> replicaNodes = asList(replicaNode);
assertEqualSegmentInfosVersion(replicaNodes, primaryShard);

final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger totalDocs = new AtomicInteger(0);
try (final Releasable ignored = blockReplication(replicaNodes, latch)) {
Thread indexingThread = new Thread(() -> { totalDocs.getAndSet(indexUntilCheckpointCount()); });
indexingThread.start();
indexingThread.join();
latch.await();
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
indexDoc();
totalDocs.incrementAndGet();
});
});
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {}));

// index another doc showing there is no pressure enforced.
indexDoc();
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

/**
* This test ensures that a replica can be added while the index is under write block.
* Ensuring that only write requests are blocked.
*/
public void testAddReplicaWhileWritesBlocked() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
final List<String> replicaNodes = new ArrayList<>();
replicaNodes.add(replicaNode);
assertEqualSegmentInfosVersion(replicaNodes, primaryShard);

final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger totalDocs = new AtomicInteger(0);
try (final Releasable ignored = blockReplication(replicaNodes, latch)) {
Thread indexingThread = new Thread(() -> { totalDocs.getAndSet(indexUntilCheckpointCount()); });
indexingThread.start();
indexingThread.join();
latch.await();
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
indexDoc();
totalDocs.incrementAndGet();
});
});
final String replica_2 = internalCluster().startNode();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 2))
);
ensureGreen(INDEX_NAME);
replicaNodes.add(replica_2);
waitForSearchableDocs(totalDocs.get(), replica_2);
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {}));

// index another doc showing there is no pressure enforced.
indexDoc();
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

public void testBelowReplicaLimit() throws Exception {
final Settings settings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3).build();
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, settings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
List<String> replicaNodes = new ArrayList<>();
for (int i = 0; i < 3; i++) {
replicaNodes.add(internalCluster().startNode());
}
ensureGreen(INDEX_NAME);

final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
assertEqualSegmentInfosVersion(replicaNodes, primaryShard);

final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger totalDocs = new AtomicInteger(0);
// only block a single replica, pressure should not get applied.
try (final Releasable ignored = blockReplication(replicaNodes.subList(0, 1), latch)) {
Thread indexingThread = new Thread(() -> totalDocs.getAndSet(indexUntilCheckpointCount()));
indexingThread.start();
indexingThread.join();
latch.await();
indexDoc();
totalDocs.incrementAndGet();
refresh(INDEX_NAME);
}
// index another doc showing there is no pressure enforced.
indexDoc();
refresh(INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

public void testBulkWritesRejected() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
ensureGreen(INDEX_NAME);

final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
final List<String> replicaNodes = asList(replicaNode);
assertEqualSegmentInfosVersion(replicaNodes, primaryShard);

final CountDownLatch latch = new CountDownLatch(1);
List<String> nodes = List.of(primaryNode, replicaNode, coordinator);

int docsPerBatch = randomIntBetween(1, 200);
int totalDocs = docsPerBatch * MAX_CHECKPOINTS_BEHIND;
try (final Releasable ignored = blockReplication(replicaNodes, latch)) {
Thread indexingThread = new Thread(() -> {
for (int i = 0; i < MAX_CHECKPOINTS_BEHIND + 1; i++) {
executeBulkRequest(nodes, docsPerBatch);
refresh(INDEX_NAME);
}
});
indexingThread.start();
indexingThread.join();
latch.await();
// try and index again while we are stale.
assertBusy(() -> { assertFailedRequests(executeBulkRequest(nodes, randomIntBetween(1, 200))); });
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs, replicaNodes.toArray(new String[] {}));

// index another doc showing there is no pressure enforced.
executeBulkRequest(nodes, totalDocs);
waitForSearchableDocs(totalDocs * 2L, replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

private BulkResponse executeBulkRequest(List<String> nodes, int docsPerBatch) {
final BulkRequest bulkRequest = new BulkRequest();
for (int j = 0; j < docsPerBatch; ++j) {
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
bulkRequest.add(request);
}
final BulkResponse bulkItemResponses = client(randomFrom(nodes)).bulk(bulkRequest).actionGet();
refresh(INDEX_NAME);
return bulkItemResponses;
}

/**
* Index and Refresh in batches to force checkpoints behind.
* Asserts that there are no stale replicas according to the primary until cp count is reached.
*/
private int indexUntilCheckpointCount() {
int total = 0;
for (int i = 0; i < MAX_CHECKPOINTS_BEHIND; i++) {
final int numDocs = randomIntBetween(1, 100);
for (int j = 0; j < numDocs; ++j) {
indexDoc();
}
total += numDocs;
refresh(INDEX_NAME);
}
return total;
}

private void assertFailedRequests(BulkResponse response) {
assertTrue(Arrays.stream(response.getItems()).allMatch(BulkItemResponse::isFailed));
assertTrue(
Arrays.stream(response.getItems())
.map(BulkItemResponse::getFailure)
.allMatch((failure) -> failure.getStatus() == RestStatus.TOO_MANY_REQUESTS)
);
}

private void indexDoc() {
client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").get();
}

private void assertEqualSegmentInfosVersion(List<String> replicaNames, IndexShard primaryShard) {
for (String replicaName : replicaNames) {
final IndexShard replicaShard = getIndexShard(replicaName, INDEX_NAME);
assertEquals(
primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
Expand All @@ -30,12 +31,14 @@
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -197,4 +200,31 @@ protected IndexShard getIndexShard(String node, String indexName) {
return indexService.getShard(shardId.get());
}

protected Releasable blockReplication(List<String> nodes, CountDownLatch latch) {
CountDownLatch pauseReplicationLatch = new CountDownLatch(nodes.size());
for (String node : nodes) {

MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
node
));
mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) {
try {
latch.countDown();
pauseReplicationLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
connection.sendRequest(requestId, action, request, options);
});
}
return () -> {
while (pauseReplicationLatch.getCount() > 0) {
pauseReplicationLatch.countDown();
}
};
}

}
Loading

0 comments on commit 21cdb72

Please sign in to comment.