
Commit

Add backpressure for index operations when Segment Replication is enabled.

This PR implements backpressure for segment replication to prevent lagging
replicas from falling too far behind the primary. Writes are rejected under the following conditions:

1. More than half of the replication group is 'stale'. The allowed fraction is controlled by the MAX_ALLOWED_STALE_SHARDS setting; half is the default.
2. A replica is considered stale if it is more than MAX_INDEXING_CHECKPOINTS behind (default 4) AND its current replication lag exceeds MAX_REPLICATION_TIME_SETTING (default 5 minutes).

This PR intentionally applies rejections only to index operations,
allowing other TransportWriteActions, such as TransportResyncReplicationAction and RetentionLeaseSyncAction, to succeed.
Blocking those requests would fail recoveries as new nodes are added.

Signed-off-by: Marc Handalian <handalm@amazon.com>
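
Below is a minimal sketch, in Java, of how the two conditions above might combine. It is illustrative only and not the PR's actual SegmentReplicationPressureService implementation; the SegrepBackpressureSketch class, its method names, and its parameters are hypothetical.

import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;

// Hypothetical sketch of the rejection rule described in the commit message; not the PR's code.
final class SegrepBackpressureSketch {

    // A replica counts as stale only when BOTH limits are breached: it is more than
    // maxCheckpoints checkpoints behind AND its replication lag exceeds maxReplicationTime.
    static boolean isStale(long checkpointsBehind, TimeValue lag, int maxCheckpoints, TimeValue maxReplicationTime) {
        return checkpointsBehind > maxCheckpoints && lag.millis() > maxReplicationTime.millis();
    }

    // Indexing is rejected only when the stale fraction of the replication group exceeds
    // the allowed limit (more than half of the group by default).
    static void checkSegrepLimits(int staleReplicas, int replicationGroupSize, double maxAllowedStaleFraction) {
        if (replicationGroupSize > 0 && (double) staleReplicas / replicationGroupSize > maxAllowedStaleFraction) {
            throw new OpenSearchRejectedExecutionException("rejecting indexing request: too many stale replicas");
        }
    }
}

The integration test below lowers MAX_REPLICATION_TIME_SETTING to one second and MAX_INDEXING_CHECKPOINTS to two so that replicas become stale quickly once replication is blocked.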
mch2 committed Mar 7, 2023
1 parent 2c69235 commit 976effd
Showing 12 changed files with 690 additions and 11 deletions.
SegmentReplicationPressureIT.java (new file)

@@ -0,0 +1,300 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.index;

import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.replication.SegmentReplicationBaseIT;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

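/**
 * Integration tests for segment replication backpressure: indexing is expected to be rejected
 * once enough replicas are stale under the limits configured in {@link #nodeSettings(int)}.
 */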
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationPressureIT extends SegmentReplicationBaseIT {

private static final int MAX_CHECKPOINTS_BEHIND = 2;

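// Use aggressive limits (1s max replication time, MAX_CHECKPOINTS_BEHIND = 2) so that
// replicas become stale quickly once replication is blocked in these tests.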
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_INDEXING_CHECKPOINTS.getKey(), MAX_CHECKPOINTS_BEHIND)
.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

public void testWritesRejected() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
final List<String> replicaNodes = asList(replicaNode);
assertEqualSegmentInfosVersion(replicaNodes, primaryShard);

final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger totalDocs = new AtomicInteger(0);
try (final Releasable ignored = blockReplication(replicaNodes, latch)) {
Thread indexingThread = new Thread(() -> { totalDocs.getAndSet(indexUntilCheckpointCount()); });
indexingThread.start();
indexingThread.join();
latch.await();
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
indexDoc();
totalDocs.incrementAndGet();
});
});
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {}));

// index another doc showing there is no pressure enforced.
indexDoc();
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

/**
 * This test ensures that a replica can be added while the index is under a write block,
 * verifying that only write (indexing) requests are rejected.
 */
public void testAddReplicaWhileWritesBlocked() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
final List<String> replicaNodes = new ArrayList<>();
replicaNodes.add(replicaNode);
assertEqualSegmentInfosVersion(replicaNodes, primaryShard);

final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger totalDocs = new AtomicInteger(0);
try (final Releasable ignored = blockReplication(replicaNodes, latch)) {
Thread indexingThread = new Thread(() -> { totalDocs.getAndSet(indexUntilCheckpointCount()); });
indexingThread.start();
indexingThread.join();
latch.await();
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
indexDoc();
totalDocs.incrementAndGet();
});
});
final String replica_2 = internalCluster().startNode();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 2))
);
ensureGreen(INDEX_NAME);
replicaNodes.add(replica_2);
waitForSearchableDocs(totalDocs.get(), replica_2);
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {}));

// index another doc showing there is no pressure enforced.
indexDoc();
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

public void testBelowReplicaLimit() throws Exception {
final Settings settings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3).build();
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, settings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
List<String> replicaNodes = new ArrayList<>();
for (int i = 0; i < 3; i++) {
replicaNodes.add(internalCluster().startNode());
}
ensureGreen(INDEX_NAME);

final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
assertEqualSegmentInfosVersion(replicaNodes, primaryShard);

final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger totalDocs = new AtomicInteger(0);
// only block a single replica, pressure should not get applied.
try (final Releasable ignored = blockReplication(replicaNodes.subList(0, 1), latch)) {
Thread indexingThread = new Thread(() -> totalDocs.getAndSet(indexUntilCheckpointCount()));
indexingThread.start();
indexingThread.join();
latch.await();
indexDoc();
totalDocs.incrementAndGet();
refresh(INDEX_NAME);
}
// index another doc showing there is no pressure enforced.
indexDoc();
refresh(INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

public void testBulkWritesRejected() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
ensureGreen(INDEX_NAME);

final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
final List<String> replicaNodes = asList(replicaNode);
assertEqualSegmentInfosVersion(replicaNodes, primaryShard);

final CountDownLatch latch = new CountDownLatch(1);
List<String> nodes = List.of(primaryNode, replicaNode, coordinator);

int docsPerBatch = randomIntBetween(1, 200);
int totalDocs = docsPerBatch * MAX_CHECKPOINTS_BEHIND;
try (final Releasable ignored = blockReplication(replicaNodes, latch)) {
Thread indexingThread = new Thread(() -> {
for (int i = 0; i < MAX_CHECKPOINTS_BEHIND + 1; i++) {
executeBulkRequest(nodes, docsPerBatch);
refresh(INDEX_NAME);
}
});
indexingThread.start();
indexingThread.join();
latch.await();
// try and index again while we are stale.
assertBusy(() -> { assertFailedRequests(executeBulkRequest(nodes, randomIntBetween(1, 200))); });
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs, replicaNodes.toArray(new String[] {}));

// index another doc showing there is no pressure enforced.
executeBulkRequest(nodes, totalDocs);
waitForSearchableDocs(totalDocs * 2L, replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

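// Sends a bulk request of small documents through a random node and refreshes so a new checkpoint is published.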
private BulkResponse executeBulkRequest(List<String> nodes, int docsPerBatch) {
final BulkRequest bulkRequest = new BulkRequest();
for (int j = 0; j < docsPerBatch; ++j) {
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
bulkRequest.add(request);
}
final BulkResponse bulkItemResponses = client(randomFrom(nodes)).bulk(bulkRequest).actionGet();
refresh(INDEX_NAME);
return bulkItemResponses;
}

/**
 * Indexes and refreshes in batches so the primary publishes MAX_CHECKPOINTS_BEHIND checkpoints
 * while replication is blocked, returning the total number of documents indexed.
 */
private int indexUntilCheckpointCount() {
int total = 0;
for (int i = 0; i < MAX_CHECKPOINTS_BEHIND; i++) {
final int numDocs = randomIntBetween(1, 100);
for (int j = 0; j < numDocs; ++j) {
indexDoc();
}
total += numDocs;
refresh(INDEX_NAME);
}
return total;
}

private void assertFailedRequests(BulkResponse response) {
assertTrue(Arrays.stream(response.getItems()).allMatch(BulkItemResponse::isFailed));
assertTrue(
Arrays.stream(response.getItems())
.map(BulkItemResponse::getFailure)
.allMatch((failure) -> failure.getStatus() == RestStatus.TOO_MANY_REQUESTS)
);
}

private void indexDoc() {
client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").get();
}

private void assertEqualSegmentInfosVersion(List<String> replicaNames, IndexShard primaryShard) {
for (String replicaName : replicaNames) {
final IndexShard replicaShard = getIndexShard(replicaName, INDEX_NAME);
assertEquals(
primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
}
}

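/**
 * Stalls outgoing GET_SEGMENT_FILES requests from the given (replica) nodes so they lag behind the primary.
 * The supplied latch is counted down once a request is intercepted; closing the returned Releasable
 * releases the block and lets replication resume.
 */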
private Releasable blockReplication(List<String> nodes, CountDownLatch latch) {
CountDownLatch pauseReplicationLatch = new CountDownLatch(nodes.size());
for (String node : nodes) {

MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
node
));
mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) {
try {
latch.countDown();
pauseReplicationLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
connection.sendRequest(requestId, action, request, options);
});
}
return () -> {
while (pauseReplicationLatch.getCount() > 0) {
pauseReplicationLatch.countDown();
}
};
}
}
TransportShardBulkAction.java

@@ -81,6 +81,7 @@
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.get.GetResult;
@@ -133,6 +134,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ

private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
private final SegmentReplicationPressureService segmentReplicationPressureService;

/**
* This action is used for performing primary term validation. With remote translog enabled, the translogs would
@@ -155,6 +157,7 @@ public TransportShardBulkAction(
UpdateHelper updateHelper,
ActionFilters actionFilters,
IndexingPressureService indexingPressureService,
SegmentReplicationPressureService segmentReplicationPressureService,
SystemIndices systemIndices
) {
super(
@@ -175,6 +178,7 @@
);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
this.segmentReplicationPressureService = segmentReplicationPressureService;

this.transportPrimaryTermValidationAction = ACTION_NAME + "[validate_primary_term]";

@@ -522,6 +526,14 @@ private void finishRequest() {
}.run();
}

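// Before the usual indexing-pressure checks, validate segment replication backpressure for this shard.
// isSegrepLimitBreached is expected to throw a rejection when the replication group is too far behind.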
@Override
protected Releasable checkPrimaryLimits(BulkShardRequest request, boolean rerouteWasLocal, boolean localRerouteInitiatedByNodeClient) {
if (force(request) == false && segmentReplicationPressureService.isSegmentReplicationBackpressureEnabled()) {
segmentReplicationPressureService.isSegrepLimitBreached(request.shardId());
}
return super.checkPrimaryLimits(request, rerouteWasLocal, localRerouteInitiatedByNodeClient);
}

/**
* Executes bulk item requests and handles request execution exceptions.
* @return {@code true} if request completed on this thread and the listener was invoked, {@code false} if the request triggered
ClusterSettings.java

@@ -39,6 +39,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
@@ -624,7 +625,11 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD,
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO, // deprecated
SearchBackpressureSettings.SETTING_CANCELLATION_RATE, // deprecated
SearchBackpressureSettings.SETTING_CANCELLATION_BURST // deprecated
SearchBackpressureSettings.SETTING_CANCELLATION_BURST, // deprecated
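// Segment replication backpressure settings added by this change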
SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED,
SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS
)
)
);
