Skip to content

Commit

Permalink
Revert "[Segment Replication] Update RefreshPolicy.WAIT_UNTIL for rep…
Browse files Browse the repository at this point in the history
…lica shards with segment replication enabled to wait for replica refresh (opensearch-project#6464)"

This reverts commit e8a4210.
  • Loading branch information
dreamer-89 committed Mar 11, 2023
1 parent b0ab550 commit 4711ffe
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 384 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
Expand All @@ -24,18 +21,15 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
Expand Down Expand Up @@ -605,99 +599,4 @@ public void testDropPrimaryDuringReplication() throws Exception {
verifyStoreContent();
}
}

public void testWaitUntilRefresh() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(6000, 7000);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);

for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}
assertBusy(() -> {
assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED)));
assertEquals(primaryShard.getProcessedLocalCheckpoint(), replicaShard.getProcessedLocalCheckpoint());
}, 1, TimeUnit.MINUTES);
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
}

public void testWaitUntilWhenReplicaPromoted() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final CountDownLatch waitForReplication = new CountDownLatch(1);
// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
try {
waitForReplication.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
throw new OpenSearchCorruptionException("expected");
}
connection.sendRequest(requestId, action, request, options);
}
);
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(700, 5000);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replicaNode);
assertNotNull(replicaShardRouting);
waitForReplication.countDown();
assertBusy(() -> {
assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
int successfulDocCount = 0;
for (ActionFuture<IndexResponse> response : pendingIndexResponses) {
try {
IndexResponse indexResponse = response.actionGet();
successfulDocCount++;
} catch (Exception e) {
logger.trace("Failed to index Doc", e);
}
}
assertTrue(
client(replicaNode).prepareSearch(INDEX_NAME)
.setPreference("_only_local")
.setSize(0)
.get()
.getHits()
.getTotalHits().value >= successfulDocCount
);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,8 @@ static BulkItemResponse processUpdateResponse(
@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
final Tuple<Translog.Location, Long> tuple = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, tuple.v1(), tuple.v2(), null, replica, logger);
final Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
});
}

Expand All @@ -790,10 +790,8 @@ protected long replicaOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

public static Tuple<Translog.Location, Long> performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final boolean isSegRepEnabled = replica.indexSettings().isSegRepEnabled();
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
Expand All @@ -815,23 +813,17 @@ public static Tuple<Translog.Location, Long> performOnReplica(BulkShardRequest r
primaryTerm,
response.getFailure().getMessage()
);
if (isSegRepEnabled) {
maxSeqNo = Math.max(response.getFailure().getSeqNo(), maxSeqNo);
}
} else {
if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) {
continue; // ignore replication as it's a noop
}
assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
operationResult = performOpOnReplica(response.getResponse(), item.request(), replica);
if (isSegRepEnabled) {
maxSeqNo = Math.max(response.getResponse().getSeqNo(), maxSeqNo);
}
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
}
return new Tuple<>(location, maxSeqNo);
return location;
}

private static Engine.Result performOpOnReplica(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ protected void dispatchedShardOperationOnReplica(
) {
ActionListener.completeWith(listener, () -> {
Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, null, replica, logger);
return new WriteReplicaResult<>(request, location, null, replica, logger);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public void runPostReplicationActions(ActionListener<Void> listener) {
* We call this after replication because this might wait for a refresh and that can take a while.
* This way we wait for the refresh in parallel on the primary and on the replica.
*/
new AsyncAfterWriteAction(primary, replicaRequest, location, null, new RespondingWriteResult() {
new AsyncAfterWriteAction(primary, replicaRequest, location, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
Expand All @@ -329,23 +329,20 @@ public void onFailure(Exception ex) {
* @opensearch.internal
*/
public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>> extends ReplicaResult {
private final Location location;
public final Location location;
private final ReplicaRequest request;
private final IndexShard replica;
private final Logger logger;
private final Long maxSeqNo;

public WriteReplicaResult(
ReplicaRequest request,
@Nullable final Translog.Location location,
@Nullable final Long maxSeqNo,
@Nullable Location location,
@Nullable Exception operationFailure,
IndexShard replica,
Logger logger
) {
super(operationFailure);
this.location = location;
this.maxSeqNo = maxSeqNo;
this.request = request;
this.replica = replica;
this.logger = logger;
Expand All @@ -356,7 +353,7 @@ public void runPostReplicaActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
new AsyncAfterWriteAction(replica, request, location, maxSeqNo, new RespondingWriteResult() {
new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
listener.onResponse(null);
Expand Down Expand Up @@ -406,6 +403,7 @@ interface RespondingWriteResult {
* @opensearch.internal
*/
static final class AsyncAfterWriteAction {
private final Location location;
private final boolean waitUntilRefresh;
private final boolean sync;
private final AtomicInteger pendingOps = new AtomicInteger(1);
Expand All @@ -416,15 +414,10 @@ static final class AsyncAfterWriteAction {
private final WriteRequest<?> request;
private final Logger logger;

private final Location location;

private final Long maxSeqNo;

AsyncAfterWriteAction(
final IndexShard indexShard,
final WriteRequest<?> request,
@Nullable final Translog.Location location,
@Nullable final Long maxSeqNo,
final RespondingWriteResult respond,
final Logger logger
) {
Expand All @@ -450,7 +443,6 @@ static final class AsyncAfterWriteAction {
this.waitUntilRefresh = waitUntilRefresh;
this.respond = respond;
this.location = location;
this.maxSeqNo = maxSeqNo;
if ((sync = indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null)) {
pendingOps.incrementAndGet();
}
Expand Down Expand Up @@ -482,7 +474,7 @@ void run() {
maybeFinish();
if (waitUntilRefresh) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, maxSeqNo, forcedRefresh -> {
indexShard.addRefreshListener(location, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,22 +309,11 @@ public List<Segment> segments(boolean verbose) {
}

@Override
public void refresh(String source) throws EngineException {
maybeRefresh(source);
}
public void refresh(String source) throws EngineException {}

@Override
public boolean maybeRefresh(String source) throws EngineException {
try {
return readerManager.maybeRefresh();
} catch (IOException e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {
@Override
protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException {
Objects.requireNonNull(referenceToRefresh);
// checks if an actual refresh (change in segments) happened
if (unwrapStandardReader(referenceToRefresh).getSegmentInfos().version == currentInfos.version) {
return null;
}
final List<LeafReader> subs = new ArrayList<>();
final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh);
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected void dispatchedShardOperationOnReplica(Request request, IndexShard rep
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.persistRetentionLeases();
return new WriteReplicaResult<>(request, null, null, null, replica, getLogger());
return new WriteReplicaResult<>(request, null, null, replica, getLogger());
});
}

Expand Down
18 changes: 6 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -3994,8 +3994,7 @@ private RefreshListeners buildRefreshListeners() {
() -> refresh("too_many_listeners"),
logger,
threadPool.getThreadContext(),
externalRefreshMetric,
this::getProcessedLocalCheckpoint
externalRefreshMetric
);
}

Expand Down Expand Up @@ -4138,7 +4137,7 @@ public final void awaitShardSearchActive(Consumer<Boolean> listener) {
markSearcherAccessed(); // move the shard into non-search idle
final Translog.Location location = pendingRefreshLocation.get();
if (location != null) {
addRefreshListener(location, null, (b) -> {
addRefreshListener(location, (b) -> {
pendingRefreshLocation.compareAndSet(location, null);
listener.accept(true);
});
Expand All @@ -4148,14 +4147,13 @@ public final void awaitShardSearchActive(Consumer<Boolean> listener) {
}

/**
* Add a listener for refreshes. Only on Segment replication enabled replica shards we listen for maxSeqNo. In all other cases we listen for translog location
* Add a listener for refreshes.
*
* @param location the translog location to listen for on a refresh
* @param maxSeqNo the Sequence Number to listen for on a refresh
* @param location the location to listen for
* @param listener for the refresh. Called with true if registering the listener ran it out of slots and forced a refresh. Called with
* false otherwise.
*/
public void addRefreshListener(Translog.Location location, Long maxSeqNo, Consumer<Boolean> listener) {
public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
final boolean readAllowed;
if (isReadAllowed()) {
readAllowed = true;
Expand All @@ -4168,11 +4166,7 @@ public void addRefreshListener(Translog.Location location, Long maxSeqNo, Consum
}
}
if (readAllowed) {
if (indexSettings.isSegRepEnabled() && shardRouting.primary() == false) {
refreshListeners.addOrNotify(maxSeqNo, listener);
} else {
refreshListeners.addOrNotify(location, listener);
}
refreshListeners.addOrNotify(location, listener);
} else {
// we're not yet ready fo ready for reads, just ignore refresh cycles
listener.accept(false);
Expand Down
Loading

0 comments on commit 4711ffe

Please sign in to comment.