Segment Replication - PIT/Scroll compatibility.
This change makes PIT/Scroll queries compatible with Segment Replication.
It does this by refcounting segment files when a new reader is created, and discarding
those files after the reader is closed.

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Mar 13, 2023
1 parent c2388b5 commit 26477a8
Showing 6 changed files with 288 additions and 8 deletions.
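To make the idea in the commit message concrete before diving into the diff, here is a minimal, self-contained sketch of the refcounting lifecycle it describes. The class and method names (ReaderFileRefCountSketch, onNewReader, onReaderClosed, mustPreserve) are illustrative only and not from this commit; the actual wiring lives in NRTReplicationEngine, NRTReplicationReaderManager, ReplicaFileDeleter, and the store callbacks shown in the files below.

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only -- names are not from this commit; the real implementation
// is split across NRTReplicationReaderManager, ReplicaFileDeleter and Store.
final class ReaderFileRefCountSketch {

    private final Map<String, Integer> refCounts = new HashMap<>();

    // A new reader (for example one pinned by a scroll or PIT context) increments the
    // count of every segment file it references.
    synchronized void onNewReader(Collection<String> segmentFiles) {
        for (String file : segmentFiles) {
            refCounts.merge(file, 1, Integer::sum);
        }
    }

    // Closing that reader decrements the counts; files that drop to zero become
    // eligible for deletion on the next store cleanup.
    synchronized void onReaderClosed(Collection<String> segmentFiles) {
        for (String file : segmentFiles) {
            refCounts.computeIfPresent(file, (name, count) -> count == 1 ? null : count - 1);
        }
    }

    // Store cleanup skips any file that is still referenced by an open reader.
    synchronized boolean mustPreserve(String file) {
        return refCounts.containsKey(file);
    }

    public static void main(String[] args) {
        ReaderFileRefCountSketch tracker = new ReaderFileRefCountSketch();
        List<String> files = List.of("_0.si", "_0.cfe", "_0.cfs");
        tracker.onNewReader(files);                         // scroll/PIT opens a reader
        System.out.println(tracker.mustPreserve("_0.cfs")); // true -> cleanup keeps the file
        tracker.onReaderClosed(files);                      // scroll cleared / PIT deleted
        System.out.println(tracker.mustPreserve("_0.cfs")); // false -> safe to delete
    }
}

An open scroll or PIT therefore pins the files of the SegmentInfos snapshot it was created against, which is what the two new integration tests below assert by listing the replica store's directory before and after the context is released.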
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -80,6 +80,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
SegmentReplicationIT.java
@@ -9,6 +9,19 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.apache.lucene.index.SegmentInfos;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.PitTestsUtil;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
@@ -17,26 +30,31 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import static java.util.Arrays.asList;
import static org.opensearch.action.search.PitTestsUtil.assertSegments;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.*;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends SegmentReplicationBaseIT {
@@ -600,4 +618,160 @@ public void testDropPrimaryDuringReplication() throws Exception {
verifyStoreContent();
}
}

public void testScrollCreatedOnReplica() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
}
        assertBusy(
            () -> assertEquals(
                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
            )
        );
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(true);
SearchResponse searchResponse = client(replica).prepareSearch()
.setQuery(matchAllQuery())
.setIndices(INDEX_NAME)
.setRequestCache(false)
.setPreference("_only_local")
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.addSort("field", SortOrder.ASC)
.setSize(10)
.setScroll(TimeValue.timeValueDays(1))
.get();

flush(INDEX_NAME);

for (int i = 3; i < 50; i++) {
client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
if (randomBoolean()) {
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
flush(INDEX_NAME);
}
}
assertBusy(() -> {
assertEquals(getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion());
});

client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
assertBusy(() -> {
assertEquals(getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion());
});
// Test stats
long numHits = 0;
do {
numHits += searchResponse.getHits().getHits().length;
searchResponse = client(replica)
.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueDays(1)).get();
assertAllSuccessful(searchResponse);
} while (searchResponse.getHits().getHits().length > 0);

List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

client(replica)
.prepareClearScroll()
.addScrollId(searchResponse.getScrollId())
.get();

currentFiles = List.of(replicaShard.store().directory().listAll());
assertFalse("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

}

public void testPitCreatedOnReplica() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("1")
.setSource("foo", randomInt()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", randomInt()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
for (int i = 3; i < 100; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", randomInt()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
}
// wait until replication finishes, then make the pit request.
        assertBusy(
            () -> assertEquals(
                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
            )
        );
CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false);
request.setPreference("_only_local");
request.setIndices(new String[] { INDEX_NAME });
ActionFuture<CreatePitResponse> execute = client(replica)
.execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
SearchResponse searchResponse = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.setRequestCache(false)
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.get();
assertEquals(1, searchResponse.getSuccessfulShards());
assertEquals(1, searchResponse.getTotalShards());
FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
client().admin().indices().flush(flushRequest).get();
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(true);

flush(INDEX_NAME);
for (int i = 101; i < 200; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", randomInt()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
if (randomBoolean()) {
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
flush(INDEX_NAME);
}
}
assertBusy(() -> {
assertEquals(getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion());
});

client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
assertBusy(() -> {
assertEquals(getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion());
});
// Test stats
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices(INDEX_NAME);
indicesStatsRequest.all();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
long pitCurrent = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getTotal().getPitCurrent();
long openContexts = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getOpenContexts();
assertEquals(1, pitCurrent);
assertEquals(1, openContexts);
SearchResponse resp = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.setRequestCache(false)
.get();
PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime());
assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId());

List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

// delete the PIT
DeletePitRequest deletePITRequest = new DeletePitRequest(pitResponse.getId());
client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet();

currentFiles = List.of(replicaShard.store().directory().listAll());
assertFalse("Files should be preserved", currentFiles.containsAll(snapshottedSegments));
}
}
NRTReplicationEngine.java
@@ -68,7 +68,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
WriteOnlyTranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
readerManager = buildReaderManager();
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
);
@@ -119,6 +119,21 @@ public void onAfterTranslogSync() {
}
}

private NRTReplicationReaderManager buildReaderManager() throws IOException {
return new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
store::incRefFileDeleter,
(files) -> {
                store.decRefFileDeleter(files);
try {
store.cleanupAndPreserveLatestCommitPoint("On reader closed", getLatestSegmentInfos());
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after next replication cycle.
                    // If that were to fail, the shard will fail as well.
logger.error("Unable to clean store after reader closed", e);
}
});
}

@Override
public TranslogManager translogManager() {
return translogManager;
NRTReplicationReaderManager.java
@@ -22,8 +22,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/**
* This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}.
@@ -35,17 +37,26 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {

private final static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class);
private volatile SegmentInfos currentInfos;
private Consumer<Collection<String>> onReaderClosed;
private Consumer<Collection<String>> onNewReader;

/**
* Creates and returns a new SegmentReplicationReaderManager from the given
* already-opened {@link OpenSearchDirectoryReader}, stealing
* the incoming reference.
*
* @param reader the SegmentReplicationReaderManager to use for future reopens
* @param reader - The OpenSearchDirectoryReader to use for future reopens.
* @param onNewReader - Called when a new reader is created.
* @param onReaderClosed - Called when a reader is closed.
*/
NRTReplicationReaderManager(OpenSearchDirectoryReader reader) {
NRTReplicationReaderManager(OpenSearchDirectoryReader reader,
Consumer<Collection<String>> onNewReader,
Consumer<Collection<String>> onReaderClosed) throws IOException {
super(reader);
currentInfos = unwrapStandardReader(reader).getSegmentInfos();
this.onReaderClosed = onReaderClosed;
this.onNewReader = onNewReader;
onNewReader.accept(currentInfos.files(true));
}

@Override
@@ -56,6 +67,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
subs.add(ctx.reader());
}
final Collection<String> files = currentInfos.files(true);
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
innerReader,
@@ -64,7 +76,10 @@ logger.trace(
logger.trace(
() -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader)
);
return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
final OpenSearchDirectoryReader reader = OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
onNewReader.accept(files);
OpenSearchDirectoryReader.addReaderCloseListener(reader, key -> onReaderClosed.accept(files));
return reader;
}

/**
ReplicaFileDeleter.java (new file)
@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* This class is a version of Lucene's ReplicaFileDeleter class used to keep track of
* segment files that should be preserved on replicas between replication events.
* The difference is this component does not actually perform any deletions, it only handles refcounts.
* Our deletions are made through Store.java.
*
* https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
*
* @opensearch.internal
*/
final class ReplicaFileDeleter {

private final Map<String, Integer> refCounts = new HashMap<>();

public synchronized void incRef(Collection<String> fileNames) {
for (String fileName : fileNames) {
refCounts.merge(fileName, 1, Integer::sum);
}
}

public synchronized void decRef(Collection<String> fileNames) {
for (String fileName : fileNames) {
Integer curCount = refCounts.get(fileName);
assert curCount != null : "fileName=" + fileName;
assert curCount > 0;
if (curCount == 1) {
refCounts.remove(fileName);
} else {
refCounts.put(fileName, curCount - 1);
}
}
}

public synchronized Integer getRefCount(String fileName) {
return refCounts.get(fileName);
}

public synchronized boolean skipDelete(String fileName) {
return refCounts.containsKey(fileName);
}
}
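ReplicaFileDeleter only tracks refcounts; the actual deletions happen in the Store, whose diff is the one changed file not rendered here. As a hedged illustration of how that cleanup might consult skipDelete(), the following is a hypothetical sketch: only the ReplicaFileDeleter calls (incRef, decRef, skipDelete) and the call-site names visible in the engine diff above are real, while the StoreCleanupSketch class and the body of its methods are assumptions, not code from this commit.

// Hypothetical sketch -- NOT the Store change from this commit (that file's diff is
// not shown above). Only the ReplicaFileDeleter calls and the call-site names seen
// in the NRTReplicationEngine diff are real; everything else here is assumed.
package org.opensearch.index.store; // ReplicaFileDeleter is package-private, so the sketch lives in its package

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.Collection;

final class StoreCleanupSketch {

    private final Directory directory;
    private final ReplicaFileDeleter replicaFileDeleter = new ReplicaFileDeleter();

    StoreCleanupSketch(Directory directory) {
        this.directory = directory;
    }

    // The engine callbacks (store::incRefFileDeleter / store::decRefFileDeleter above)
    // would delegate straight to the refcount tracker.
    void incRefFileDeleter(Collection<String> files) {
        replicaFileDeleter.incRef(files);
    }

    void decRefFileDeleter(Collection<String> files) {
        replicaFileDeleter.decRef(files);
    }

    // Signature modeled on the store.cleanupAndPreserveLatestCommitPoint(reason, segmentInfos)
    // call in the engine diff; "reason" would only feed logging in a real implementation.
    void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos latestSegmentInfos) throws IOException {
        final Collection<String> latestCommitFiles = latestSegmentInfos.files(true);
        for (String file : directory.listAll()) {
            if (latestCommitFiles.contains(file)) {
                continue; // referenced by the latest commit point, always preserved
            }
            if (replicaFileDeleter.skipDelete(file)) {
                continue; // still pinned by an open scroll/PIT reader
            }
            directory.deleteFile(file); // safe: no commit point or open reader references it
        }
    }
}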
(diff for the remaining changed file did not load)
