[Segment Replication] Update RefreshPolicy.WAIT_UNTIL for replica shards with segment replication enabled to wait for replica refresh #6464

Merged: 25 commits, Mar 10, 2023
Changes shown below are from 1 commit.

Commits (25)
055f225
Initial draft PR for wait_until with segrep
Rishikesh1159 Feb 23, 2023
36f3851
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Feb 24, 2023
c1a4c87
Refactor code and fix test failures.
Rishikesh1159 Feb 28, 2023
c03fa8f
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Feb 28, 2023
666dd0b
Merge branch 'wait_until' of https://github.com/Rishikesh1159/OpenSea…
Rishikesh1159 Feb 28, 2023
3798512
add comments and fix tests.
Rishikesh1159 Feb 28, 2023
52ad227
Refactor code, address comments and fix test failures.
Rishikesh1159 Mar 2, 2023
423a0c8
Aplly spotless check
Rishikesh1159 Mar 2, 2023
413b3d4
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Mar 2, 2023
ed8b4a0
Adress comments and add integ test.
Rishikesh1159 Mar 3, 2023
6f2b174
Merge branch 'wait_until' of https://github.com/Rishikesh1159/OpenSea…
Rishikesh1159 Mar 3, 2023
1260905
Address comments and fix failing tests.
Rishikesh1159 Mar 3, 2023
adc9bb0
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Mar 3, 2023
17fd7a1
Fixing failing test.
Rishikesh1159 Mar 6, 2023
672a67f
Merge branch 'opensearch-project:main' into wait_until
Rishikesh1159 Mar 6, 2023
f3ee8d6
Merge branch 'wait_until' of https://github.com/Rishikesh1159/OpenSea…
Rishikesh1159 Mar 6, 2023
64410d6
Remove unused code.
Rishikesh1159 Mar 6, 2023
b33aca2
Addressing comments and refactoring
Rishikesh1159 Mar 7, 2023
4b47039
Adding max refreshlisteners limit that a replica shard can hold and f…
Rishikesh1159 Mar 9, 2023
6bd0b3d
Changing assert message
Rishikesh1159 Mar 9, 2023
d62aaca
Fix call to release refresh listeners on replica shards.
Rishikesh1159 Mar 9, 2023
0a79735
Fix call to release refresh listeners on replica shards.
Rishikesh1159 Mar 9, 2023
f285ba3
Address comments.
Rishikesh1159 Mar 9, 2023
83a1cac
Fixing compile errors.
Rishikesh1159 Mar 10, 2023
c0721b0
Spoltss Apply
Rishikesh1159 Mar 10, 2023
Diff view
@@ -9,6 +9,8 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
@@ -20,15 +22,18 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
@@ -515,4 +520,46 @@ public void testDropPrimaryDuringReplication() throws Exception {
verifyStoreContent();
}
}

public void testWaitUntil() throws Exception {
final String primaryNode = internalCluster().startNode(featureFlagSettings());
prepareCreate(
INDEX_NAME,
Settings.builder()
.put("index.number_of_shards", 1)
Review comment (Member):

These base settings are already defined in SegmentReplicationBaseIT; also, why do we need to change refresh_interval here?

.put("index.number_of_replicas", 1)
// we want to control refreshes
.put("index.refresh_interval", "40ms")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode(featureFlagSettings());
Review comment (Member):

You shouldn't need to pass feature flag settings to startNode() anymore; they are picked up automatically.
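A minimal sketch of the simplified call under that assumption (illustrative only; not the code in this commit):

    // Feature flag settings are applied to new nodes automatically, so the explicit argument can be dropped.
    final String replicaNode = internalCluster().startNode();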

ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(4000, 5000);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);

for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}
assertBusy(
() -> {
assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED)));
},
1,
TimeUnit.MINUTES
);

assertEquals(primaryShard.getLatestReplicationCheckpoint().getSeqNo(), replicaShard.getLatestReplicationCheckpoint().getSeqNo());
@dreamer-89 (Member), Mar 3, 2023:

Do we need to assert this for every wait_until request, to ensure that when a request is acked the replica has caught up to the latest seq no on the primary?

@Rishikesh1159 (Member, Author), Mar 3, 2023:

We can do that. Do you think an assertion like this would be good:

assertBusy(
    () -> {
        for (ActionFuture<IndexResponse> response : pendingIndexResponses) {
            if (response.actionGet().status().equals(RestStatus.CREATED)) {
                assertEquals(primaryShard.getLatestReplicationCheckpoint().getSeqNo(), replicaShard.getLatestReplicationCheckpoint().getSeqNo());
            }
        }
        assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED)));
    },
    1,
    TimeUnit.MINUTES
);

@dreamer-89 (Member), Mar 4, 2023:

I think it will be tricky with async operations here. One option is to apply this assertion (equal seq no on primary and replica) if we perform a single indexing operation at a time, though I'm not sure that is what we want from this test. Please feel free to ignore my previous comment.
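A minimal sketch of that single-operation alternative (hypothetical; it is not what this test does, and it trades indexing throughput for a per-request assertion):

    // Index one document at a time with WAIT_UNTIL; once each request is acked,
    // assert that the replica's replication checkpoint has caught up to the primary's.
    for (int i = 0; i < initialDocCount; i++) {
        IndexResponse response = client().prepareIndex(INDEX_NAME)
            .setId(Integer.toString(i))
            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
            .setSource("field", "value" + i)
            .get();
        assertEquals(RestStatus.CREATED, response.status());
        assertEquals(primaryShard.getLatestReplicationCheckpoint().getSeqNo(), replicaShard.getLatestReplicationCheckpoint().getSeqNo());
    }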

@Rishikesh1159 (Member, Author):

Yes, it will be tricky if we want to assert after every request. The purpose of this test is to make sure that we wait until all requests are finished and the replica has all indexed docs ready to be searched.


assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount);
}
}
@@ -780,8 +780,8 @@ static BulkItemResponse processUpdateResponse(
@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
final Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
final Tuple<Translog.Location, Long> tuple = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, tuple, replica, logger);
});
}

@@ -790,8 +790,9 @@ protected long replicaOperationSize(BulkShardRequest request) {
return request.ramBytesUsed();
}

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
public static Tuple<Translog.Location, Long> performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
for (int i = 0; i < request.items().length; i++) {
final BulkItemRequest item = request.items()[i];
final BulkItemResponse response = item.getPrimaryResponse();
@@ -822,8 +823,9 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
maxSeqNo = response.getResponse().getSeqNo();
Review comment (Member):

We need to keep track of the running max here as we iterate over the items; this overwrites the value on each iteration, and the last item in the list is not guaranteed to have the highest seqNo.

@Rishikesh1159 (Member, Author):

Makes sense. I will keep track of maxSeqNo here.
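A minimal sketch of that fix (assuming the loop variables already shown in this hunk; the final change may differ in detail):

    // Keep the running maximum across all bulk items instead of overwriting it on every iteration.
    maxSeqNo = Math.max(maxSeqNo, response.getResponse().getSeqNo());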

}
return location;
return new Tuple<Translog.Location, Long>(location, maxSeqNo);
}

private static Engine.Result performOpOnReplica(
@@ -46,12 +46,14 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.PrimaryShardClosedException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
@@ -333,6 +335,7 @@ public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteReq
private final ReplicaRequest request;
private final IndexShard replica;
private final Logger logger;
private long maxSeqNo;

public WriteReplicaResult(
ReplicaRequest request,
@@ -343,17 +346,23 @@ public WriteReplicaResult(
) {
super(operationFailure);
this.location = location;
this.maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
this.request = request;
this.replica = replica;
this.logger = logger;
}

public WriteReplicaResult(ReplicaRequest request, Tuple<Location, Long> tuple, IndexShard replica, Logger logger) {
this(request, tuple.v1(), null, replica, logger);
this.maxSeqNo = tuple.v2();
}

@Override
public void runPostReplicaActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() {
new AsyncAfterWriteAction(replica, request, location, maxSeqNo, new RespondingWriteResult() {
@Override
public void onSuccess(boolean forcedRefresh) {
listener.onResponse(null);
@@ -414,6 +423,8 @@ static final class AsyncAfterWriteAction {
private final WriteRequest<?> request;
private final Logger logger;

private long maxSeqNo;

AsyncAfterWriteAction(
final IndexShard indexShard,
final WriteRequest<?> request,
@@ -450,6 +461,18 @@ static final class AsyncAfterWriteAction {
assert pendingOps.get() >= 0 && pendingOps.get() <= 3 : "pendingOpts was: " + pendingOps.get();
}

AsyncAfterWriteAction(
final IndexShard indexShard,
final WriteRequest<?> request,
@Nullable final Translog.Location location,
final long maxSeqNo,
final RespondingWriteResult respond,
final Logger logger
) {
this(indexShard, request, location, respond, logger);
this.maxSeqNo = maxSeqNo;
}

/** calls the response listener if all pending operations have returned otherwise it just decrements the pending opts counter.*/
private void maybeFinish() {
final int numPending = pendingOps.decrementAndGet();
@@ -473,14 +496,25 @@ void run() {
// decrement pending by one, if there is nothing else to do we just respond with success
maybeFinish();
if (waitUntilRefresh) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request);
}
refreshed.set(forcedRefresh);
maybeFinish();
});
if (indexShard.indexSettings().isSegRepEnabled() == true) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(maxSeqNo, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request);
}
refreshed.set(forcedRefresh);
maybeFinish();
});
} else {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request);
}
refreshed.set(forcedRefresh);
maybeFinish();
});
}
}
if (sync) {
assert pendingOps.get() > 0;
@@ -13,6 +13,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
@@ -306,7 +307,35 @@ public List<Segment> segments(boolean verbose) {
}

@Override
public void refresh(String source) throws EngineException {}
public void refresh(String source) throws EngineException {
try {
// refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
if (store.tryIncRef()) {
// increment the ref just to ensure nobody closes the store during a refresh
try {
// even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc.
ReferenceManager<OpenSearchDirectoryReader> referenceManager = getReferenceManager(SearcherScope.EXTERNAL);
// it is intentional that we never refresh both internal / external together
referenceManager.maybeRefresh();

} finally {
store.decRef();
}
} else {
return;
}
} catch (AlreadyClosedException e) {
throw e;
} catch (Exception e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
}

@Override
public boolean maybeRefresh(String source) throws EngineException {
20 changes: 20 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -4175,6 +4175,26 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
}
}

public void addRefreshListener(long maxSeqNo, Consumer<Boolean> listener) {
final boolean readAllowed;
if (isReadAllowed()) {
readAllowed = true;
} else {
// check again under postRecoveryMutex. this is important to create a happens before relationship
// between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
// to a listener before a refresh actually happened that contained that operation.
synchronized (postRecoveryMutex) {
readAllowed = isReadAllowed();
}
}
if (readAllowed) {
refreshListeners.addOrNotify(maxSeqNo, listener, this);
} else {
// we're not yet ready for reads, just ignore refresh cycles
listener.accept(false);
}
}

/**
* Metrics updater for a refresh
*