[Segment Replication] Add PIT/Scroll compatibility with Segment Replication #6644

Closed · wants to merge 3 commits
CHANGELOG.md · 3 changes: 2 additions & 1 deletion
@@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
@@ -109,4 +110,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
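
For reviewers who want to try the change end to end, here is a minimal sketch of the PIT round trip this changelog entry enables, mirroring the new integration tests further down in this diff. The index name "test-index", the sort field "foo", and the injected `Client` handle are illustrative assumptions, not part of the change:

```java
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;

public class PitOnReplicaSketch {
    // Open a PIT, search with it, then release it. With segment replication enabled, the PIT
    // now pins the replica's segment files for its lifetime instead of losing them to cleanup.
    static void pitRoundTrip(Client client) {
        CreatePitRequest pitRequest = new CreatePitRequest(TimeValue.timeValueMinutes(5), false);
        pitRequest.setIndices(new String[] { "test-index" }); // assumed index name
        CreatePitResponse pitResponse = client.execute(CreatePitAction.INSTANCE, pitRequest).actionGet();

        SearchResponse response = client.prepareSearch("test-index")
            .setSize(10)
            .addSort("foo", SortOrder.ASC) // assumed sort field
            .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueMinutes(5)))
            .get();

        // Release the PIT so the pinned segment files on the replica can be cleaned up.
        client.execute(DeletePitAction.INSTANCE, new DeletePitRequest(pitResponse.getId())).actionGet();
    }
}
```

Scrolls follow the same pattern: the reader backing a scroll or PIT on a replica now pins the segment files it was opened over until the context is cleared or deleted.
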
@@ -17,8 +17,21 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.PitTestsUtil;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
@@ -29,15 +42,24 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.search.SearchService;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.internal.PitReaderContext;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.node.NodeClosedException;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
@@ -46,14 +68,20 @@
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import static java.util.Arrays.asList;
import static org.opensearch.action.search.PitTestsUtil.assertSegments;
import static org.opensearch.action.search.SearchContextId.decode;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@@ -835,4 +863,207 @@ public void testSegmentReplicationStats() throws Exception {
});
}
}

public void testScrollCreatedOnReplica() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
}
assertBusy(
() -> assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(true);
SearchResponse searchResponse = client(replica).prepareSearch()
.setQuery(matchAllQuery())
.setIndices(INDEX_NAME)
.setRequestCache(false)
.setPreference("_only_local")
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.addSort("field", SortOrder.ASC)
.setSize(10)
.setScroll(TimeValue.timeValueDays(1))
.get();

flush(INDEX_NAME);

for (int i = 3; i < 50; i++) {
client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
if (randomBoolean()) {
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
flush(INDEX_NAME);
}
}
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});

client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});
// Drain the scroll; it should keep returning hits from the segments snapshotted when it was created.
long numHits = 0;
do {
numHits += searchResponse.getHits().getHits().length;
searchResponse = client(replica).prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueDays(1)).get();
assertAllSuccessful(searchResponse);
} while (searchResponse.getHits().getHits().length > 0);

List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

client(replica).prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();

currentFiles = List.of(replicaShard.store().directory().listAll());
assertFalse("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

}

public void testPitCreatedOnReplica() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME)
.setId("1")
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);

client().prepareIndex(INDEX_NAME)
.setId("2")
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
for (int i = 3; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
}
// wait until replication finishes, then make the pit request.
assertBusy(
() -> assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);
CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false);
request.setPreference("_only_local");
request.setIndices(new String[] { INDEX_NAME });
ActionFuture<CreatePitResponse> execute = client(replica).execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
SearchResponse searchResponse = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.setRequestCache(false)
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.get();
assertEquals(1, searchResponse.getSuccessfulShards());
assertEquals(1, searchResponse.getTotalShards());
FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
client().admin().indices().flush(flushRequest).get();
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);

// fetch the segments snapshotted when the reader context was created.
Collection<String> snapshottedSegments;
SearchService searchService = internalCluster().getInstance(SearchService.class, replica);
NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica);
final PitReaderContext pitReaderContext = searchService.getPitReaderContext(
decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId()
);
try (final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) {
final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader(
(OpenSearchDirectoryReader) searcher.getDirectoryReader()
);
final SegmentInfos infos = standardDirectoryReader.getSegmentInfos();
snapshottedSegments = infos.files(true);
}

flush(INDEX_NAME);
for (int i = 101; i < 200; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
if (randomBoolean()) {
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
flush(INDEX_NAME);
}
}
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});

client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});
// Test stats
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices(INDEX_NAME);
indicesStatsRequest.all();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
long pitCurrent = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getTotal().getPitCurrent();
long openContexts = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getOpenContexts();
assertEquals(1, pitCurrent);
assertEquals(1, openContexts);
SearchResponse resp = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.setRequestCache(false)
.get();
PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime());
assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId());

List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

// delete the PIT
DeletePitRequest deletePITRequest = new DeletePitRequest(pitResponse.getId());
client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet();

currentFiles = List.of(replicaShard.store().directory().listAll());
assertFalse("Files should be cleaned up", currentFiles.containsAll(snapshottedSegments));
}
}
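
A small, optional suggestion on `testScrollCreatedOnReplica` above: the drain loop accumulates `numHits` but never asserts the total. A one-line addition (a sketch, assuming the existing 100-document setup) would also verify that the scroll kept its point-in-time view across the later deletes and force merges:

```java
// After the scroll is fully drained: every doc that existed when the scroll was opened is still visible.
assertEquals("scroll should preserve its point-in-time view", 100, numHits);
```
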
NRTReplicationEngine.java
@@ -68,7 +68,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
WriteOnlyTranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
readerManager = buildReaderManager();
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
);
@@ -119,6 +119,23 @@ public void onAfterTranslogSync() {
}
}

private NRTReplicationReaderManager buildReaderManager() throws IOException {
return new NRTReplicationReaderManager(
OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
store::incRefFileDeleter,
(files) -> {
store.decRefFileDeleter(files);
try {
store.cleanupAndPreserveLatestCommitPoint("On reader closed", getLatestSegmentInfos());
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after the next replication cycle.
// If that also fails, the shard will fail as well.
logger.error("Unable to clean store after reader closed", e);
}
}
);
}

@Override
public TranslogManager translogManager() {
return translogManager;
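
For context on the two callbacks wired into `buildReaderManager` above: `store::incRefFileDeleter` runs when a reader is opened over a set of segment files, and the second consumer decrefs those files and retries cleanup when that reader closes. The Store-side bookkeeping itself is not part of the hunks shown here, so the sketch below is only a hypothetical illustration of the per-file reference counting this implies (class and method names assumed):

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-file reference counting. The real tracking lives in Store
// (incRefFileDeleter/decRefFileDeleter) and is not part of the hunks shown in this diff.
class ReplicaFileRefCounts {
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Called when a new reader is opened over a set of segment files.
    synchronized void incRef(Collection<String> files) {
        for (String file : files) {
            refCounts.merge(file, 1, Integer::sum);
        }
    }

    // Called when a reader over these files is closed.
    synchronized void decRef(Collection<String> files) {
        for (String file : files) {
            refCounts.computeIfPresent(file, (f, count) -> count > 1 ? count - 1 : null);
        }
    }

    // Cleanup may delete a file only if no open reader still references it.
    synchronized boolean canDelete(String file) {
        return refCounts.containsKey(file) == false;
    }
}
```

With counts like these, replication-driven cleanup can skip any file still pinned by an open scroll or PIT reader and remove it once the last reader over it closes.
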
NRTReplicationReaderManager.java
@@ -22,8 +22,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/**
* This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}.
@@ -35,17 +37,27 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {

private final static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class);
private volatile SegmentInfos currentInfos;
private final Consumer<Collection<String>> onReaderClosed;
private final Consumer<Collection<String>> onNewReader;

/**
* Creates and returns a new SegmentReplicationReaderManager from the given
* already-opened {@link OpenSearchDirectoryReader}, stealing
* the incoming reference.
*
* @param reader the SegmentReplicationReaderManager to use for future reopens
* @param reader - The OpenSearchDirectoryReader to use for future reopens.
* @param onNewReader - Called when a new reader is created.
* @param onReaderClosed - Called when a reader is closed.
*/
NRTReplicationReaderManager(OpenSearchDirectoryReader reader) {
NRTReplicationReaderManager(
OpenSearchDirectoryReader reader,
Consumer<Collection<String>> onNewReader,
Consumer<Collection<String>> onReaderClosed
) {
super(reader);
currentInfos = unwrapStandardReader(reader).getSegmentInfos();
this.onNewReader = onNewReader;
this.onReaderClosed = onReaderClosed;
}

@Override
@@ -60,6 +72,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
subs.add(ctx.reader());
}
final Collection<String> files = currentInfos.files(true);
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
innerReader,
@@ -68,7 +81,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
logger.trace(
() -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader)
);
return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
final OpenSearchDirectoryReader reader = OpenSearchDirectoryReader.wrap(
softDeletesDirectoryReaderWrapper,
referenceToRefresh.shardId()
);
onNewReader.accept(files);
OpenSearchDirectoryReader.addReaderCloseListener(reader, key -> onReaderClosed.accept(files));
return reader;
}

/**
@@ -89,7 +108,7 @@ public SegmentInfos getSegmentInfos() {
return currentInfos;
}

private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
public static StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
final DirectoryReader delegate = reader.getDelegate();
if (delegate instanceof SoftDeletesDirectoryReaderWrapper) {
return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate();
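
`refreshIfNeeded` above captures `currentInfos.files(true)` before building the new reader, hands that set to `onNewReader`, and registers a close listener that passes the same set to `onReaderClosed`. The standalone sketch below shows that capture-then-release pattern with plain Lucene; `ByteBuffersDirectory` and the reader cache-helper close listener stand in for the OpenSearch wiring and are not code from this PR:

```java
import java.io.IOException;
import java.util.Collection;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDirectory;

// Standalone sketch of the pattern refreshIfNeeded now follows: capture the segment file set
// a reader was opened over, then release it from a reader-close listener.
public class ReaderPinningSketch {
    public static void main(String[] args) throws IOException {
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            Document doc = new Document();
            doc.add(new StringField("id", "1", Field.Store.NO));
            writer.addDocument(doc);
            writer.commit();

            // Files backing the commit the reader will see - the set that must stay on disk.
            Collection<String> pinnedFiles = SegmentInfos.readLatestCommit(dir).files(true);
            System.out.println("pinned: " + pinnedFiles); // the equivalent of onNewReader / incRef

            DirectoryReader reader = DirectoryReader.open(dir);
            reader.getReaderCacheHelper()
                .addClosedListener(key -> System.out.println("released: " + pinnedFiles)); // decRef + cleanup

            reader.close(); // listener fires here, mirroring onReaderClosed in the diff
        }
    }
}
```

Because the listener closes over the exact file set the reader was opened with, the release always matches the earlier pin, even if `currentInfos` has advanced by the time the reader finally closes.
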