[Segment Replication] Update RefreshPolicy.WAIT_UNTIL for replica shards with segment replication enabled to wait for replica refresh #6464

Merged: 25 commits, Mar 10, 2023
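
For context, the WAIT_UNTIL refresh policy blocks the indexing response until a refresh makes the operation searchable. With this change, a replica shard of a segment-replication-enabled index satisfies that wait once it refreshes on replicated segments covering the operation's sequence number, instead of on a local translog location. The following is a minimal client-side sketch of the policy being changed; the index name and client wiring are illustrative assumptions, not code from this PR.

import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.rest.RestStatus;

public class WaitUntilIndexingSketch {
    // Index a document and block until it is searchable on the shards that serve
    // the request. With this PR, a segrep replica resolves the wait after its own
    // refresh on the replicated segments (tracked by sequence number).
    public static RestStatus indexAndWaitForRefresh(Client client) {
        IndexResponse response = client.prepareIndex("test-index") // hypothetical index name
            .setId("1")
            .setSource("field", "value")
            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
            .get();
        return response.status();
    }
}
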
Commits (25)
055f225
Initial draft PR for wait_until with segrep
Rishikesh1159 Feb 23, 2023
36f3851
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Feb 24, 2023
c1a4c87
Refactor code and fix test failures.
Rishikesh1159 Feb 28, 2023
c03fa8f
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Feb 28, 2023
666dd0b
Merge branch 'wait_until' of https://github.com/Rishikesh1159/OpenSea…
Rishikesh1159 Feb 28, 2023
3798512
add comments and fix tests.
Rishikesh1159 Feb 28, 2023
52ad227
Refactor code, address comments and fix test failures.
Rishikesh1159 Mar 2, 2023
423a0c8
Apply spotless check
Rishikesh1159 Mar 2, 2023
413b3d4
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Mar 2, 2023
ed8b4a0
Address comments and add integ test.
Rishikesh1159 Mar 3, 2023
6f2b174
Merge branch 'wait_until' of https://github.com/Rishikesh1159/OpenSea…
Rishikesh1159 Mar 3, 2023
1260905
Address comments and fix failing tests.
Rishikesh1159 Mar 3, 2023
adc9bb0
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Mar 3, 2023
17fd7a1
Fixing failing test.
Rishikesh1159 Mar 6, 2023
672a67f
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Mar 6, 2023
f3ee8d6
Merge branch 'wait_until' of https://github.com/Rishikesh1159/OpenSea…
Rishikesh1159 Mar 6, 2023
64410d6
Remove unused code.
Rishikesh1159 Mar 6, 2023
b33aca2
Addressing comments and refactoring
Rishikesh1159 Mar 7, 2023
4b47039
Adding max refreshlisteners limit that a replica shard can hold and f…
Rishikesh1159 Mar 9, 2023
6bd0b3d
Changing assert message
Rishikesh1159 Mar 9, 2023
d62aaca
Fix call to release refresh listeners on replica shards.
Rishikesh1159 Mar 9, 2023
0a79735
Fix call to release refresh listeners on replica shards.
Rishikesh1159 Mar 9, 2023
f285ba3
Address comments.
Rishikesh1159 Mar 9, 2023
83a1cac
Fixing compile errors.
Rishikesh1159 Mar 10, 2023
c0721b0
Spotless apply
Rishikesh1159 Mar 10, 2023
@@ -9,6 +9,9 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
@@ -20,15 +23,18 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
@@ -515,4 +521,99 @@ public void testDropPrimaryDuringReplication() throws Exception {
verifyStoreContent();
}
}

public void testWaitUntilRefresh() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(6000, 7000);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);

for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}
assertBusy(() -> {
assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED)));
assertEquals(primaryShard.getProcessedLocalCheckpoint(), replicaShard.getProcessedLocalCheckpoint());
}, 1, TimeUnit.MINUTES);
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
}

public void testWaitUntilWhenReplicaPromoted() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final CountDownLatch waitForReplication = new CountDownLatch(1);
// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
try {
waitForReplication.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
throw new OpenSearchCorruptionException("expected");
}
connection.sendRequest(requestId, action, request, options);
}
);
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(700, 5000);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replicaNode);
assertNotNull(replicaShardRouting);
waitForReplication.countDown();
assertBusy(() -> {
assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
int successfulDocCount = 0;
for (ActionFuture<IndexResponse> response : pendingIndexResponses) {
try {
IndexResponse indexResponse = response.actionGet();
successfulDocCount++;
} catch (Exception e) {
logger.trace("Failed to index Doc", e);
}
}
assertTrue(
client(replicaNode).prepareSearch(INDEX_NAME)
.setPreference("_only_local")
.setSize(0)
.get()
.getHits()
.getTotalHits().value >= successfulDocCount
);

Member:
Let's add a doc count assertion here as well?

Member Author:
What if the primary goes down before writing all operations to the translog? In that case an assertion on the doc count would fail. Maybe from the wait_until response we can get a count of how many docs were written to the translog and assert on that count.

Member:
Yes, you are right: not all of the requests would succeed in that case, so you would need to compute that count as you do here.
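
To make the reasoning above concrete, here is a sketch (not part of the PR) of the lower-bound assertion being discussed; it assumes it runs inside the test above, where pendingIndexResponses, replicaNode, and INDEX_NAME are in scope.

long acknowledgedDocs = pendingIndexResponses.stream()
    .filter(future -> {
        try {
            // Only requests acknowledged before the primary died count toward the bound.
            return future.actionGet().status() == RestStatus.CREATED;
        } catch (Exception e) {
            return false;
        }
    })
    .count();
long replicaHits = client(replicaNode).prepareSearch(INDEX_NAME)
    .setPreference("_only_local")
    .setSize(0)
    .get()
    .getHits()
    .getTotalHits().value;
// An exact doc-count assertion could fail when the primary dies mid-write, so the
// replica's hit count is only required to cover every acknowledged request.
assertTrue("acknowledged docs must be searchable on the promoted replica", replicaHits >= acknowledgedDocs);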

}

}
@@ -780,8 +780,8 @@ static BulkItemResponse processUpdateResponse(
@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
final Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
final Tuple<Translog.Location, Long> tuple = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, tuple.v1(), tuple.v2(), null, replica, logger);
});
}

@@ -790,8 +790,10 @@ protected long replicaOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
public static Tuple<Translog.Location, Long> performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final boolean isSegRepEnabled = replica.indexSettings().isSegRepEnabled();
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
@@ -813,17 +815,23 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
primaryTerm,
response.getFailure().getMessage()
);
if (isSegRepEnabled) {
maxSeqNo = Math.max(response.getFailure().getSeqNo(), maxSeqNo);
}
} else {
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
continue; // ignore replication as it's a noop
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
if (isSegRepEnabled) {
maxSeqNo = Math.max(response.getResponse().getSeqNo(), maxSeqNo);
}
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
}
return location;
return new Tuple<>(location, maxSeqNo);
}

private static Engine.Result performOpOnReplica(
@@ -169,7 +169,7 @@ protected void dispatchedShardOperationOnReplica(
) {
ActionListener.completeWith(listener, () -> {
Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
return new WriteReplicaResult<>(request, location, null, null, replica, logger);
});
}

@@ -307,7 +307,7 @@ public void runPostReplicationActions(ActionListener<Void> listener) {
* We call this after replication because this might wait for a refresh and that can take a while.
* This way we wait for the refresh in parallel on the primary and on the replica.
*/
new AsyncAfterWriteAction(primary, replicaRequest, location, new RespondingWriteResult() {
new AsyncAfterWriteAction(primary, replicaRequest, location, null, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
@@ -329,20 +329,23 @@ public void onFailure(Exception ex) {
* @opensearch.internal
*/
public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>> extends ReplicaResult {
public final Location location;
private final Location location;
private final ReplicaRequest request;
private final IndexShard replica;
private final Logger logger;
private final Long maxSeqNo;

public WriteReplicaResult(
ReplicaRequest request,
@Nullable Location location,
@Nullable final Translog.Location location,
@Nullable final Long maxSeqNo,
@Nullable Exception operationFailure,
IndexShard replica,
Logger logger
) {
super(operationFailure);
this.location = location;
this.maxSeqNo = maxSeqNo;
this.request = request;
this.replica = replica;
this.logger = logger;
@@ -353,7 +356,7 @@ public void runPostReplicaActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() {
new AsyncAfterWriteAction(replica, request, location, maxSeqNo, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
listener.onResponse(null);
@@ -403,7 +406,6 @@ interface RespondingWriteResult {
* @opensearch.internal
*/
static final class AsyncAfterWriteAction {
private final Location location;
private final boolean waitUntilRefresh;
private final boolean sync;
private final AtomicInteger pendingOps = new AtomicInteger(1);
@@ -414,10 +416,15 @@ static final class AsyncAfterWriteAction {
private final WriteRequest<?> request;
private final Logger logger;

private final Location location;

private final Long maxSeqNo;

AsyncAfterWriteAction(
final IndexShard indexShard,
final WriteRequest<?> request,
@Nullable final Translog.Location location,
@Nullable final Long maxSeqNo,
final RespondingWriteResult respond,
final Logger logger
) {
@@ -443,6 +450,7 @@ static final class AsyncAfterWriteAction {
this.waitUntilRefresh = waitUntilRefresh;
this.respond = respond;
this.location = location;
this.maxSeqNo = maxSeqNo;
if ((sync = indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null)) {
pendingOps.incrementAndGet();
}
@@ -474,7 +482,7 @@ void run() {
maybeFinish();
if (waitUntilRefresh) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, forcedRefresh -> {
indexShard.addRefreshListener(location, maxSeqNo, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request);
}
@@ -306,11 +306,22 @@ public List<Segment> segments(boolean verbose) {
}

@Override
public void refresh(String source) throws EngineException {}
public void refresh(String source) throws EngineException {
maybeRefresh(source);
}

@Override
public boolean maybeRefresh(String source) throws EngineException {
return false;
try {
return readerManager.maybeRefresh();
} catch (IOException e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
}

@Override
@@ -51,6 +51,10 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {
@Override
protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException {
Objects.requireNonNull(referenceToRefresh);
// checks if an actual refresh (change in segments) happened
if (unwrapStandardReader(referenceToRefresh).getSegmentInfos().version == currentInfos.version) {
return null;
}
final List<LeafReader> subs = new ArrayList<>();
final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh);
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
@@ -202,7 +202,7 @@ protected void dispatchedShardOperationOnReplica(Request request, IndexShard rep
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.persistRetentionLeases();
return new WriteReplicaResult<>(request, null, null, replica, getLogger());
return new WriteReplicaResult<>(request, null, null, null, replica, getLogger());
});
}

18 changes: 12 additions & 6 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -3996,7 +3996,8 @@ private RefreshListeners buildRefreshListeners() {
() -> refresh("too_many_listeners"),
logger,
threadPool.getThreadContext(),
externalRefreshMetric
externalRefreshMetric,
this::getProcessedLocalCheckpoint
);
}

@@ -4139,7 +4140,7 @@ public final void awaitShardSearchActive(Consumer<Boolean> listener) {
markSearcherAccessed(); // move the shard into non-search idle
final Translog.Location location = pendingRefreshLocation.get();
if (location != null) {
addRefreshListener(location, (b) -> {
addRefreshListener(location, null, (b) -> {
pendingRefreshLocation.compareAndSet(location, null);
listener.accept(true);
});
@@ -4149,13 +4150,14 @@ public final void awaitShardSearchActive(Consumer<Boolean> listener) {
}

/**
* Add a listener for refreshes.
* Add a listener for refreshes. Only on Segment replication enabled replica shards we listen for maxSeqNo. In all other cases we listen for translog location
*
* @param location the location to listen for
* @param location the translog location to listen for on a refresh
* @param maxSeqNo the Sequence Number to listen for on a refresh
* @param listener for the refresh. Called with true if registering the listener ran it out of slots and forced a refresh. Called with
* false otherwise.
*/
public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
public void addRefreshListener(Translog.Location location, Long maxSeqNo, Consumer<Boolean> listener) {
final boolean readAllowed;
if (isReadAllowed()) {
readAllowed = true;
@@ -4168,7 +4170,11 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
}
}
if (readAllowed) {
refreshListeners.addOrNotify(location, listener);
if (indexSettings.isSegRepEnabled() && shardRouting.primary() == false) {
refreshListeners.addOrNotify(maxSeqNo, listener);
} else {
refreshListeners.addOrNotify(location, listener);
}
} else {
// we're not yet ready fo ready for reads, just ignore refresh cycles
listener.accept(false);