Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Integrate remote segment store in peer recovery flow #6664

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
- [Remote Store] Integrate remote segment store in peer recovery flow ([#6664](https://github.com/opensearch-project/OpenSearch/pull/6664))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down Expand Up @@ -109,4 +110,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
Expand All @@ -30,6 +33,8 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
Expand Down Expand Up @@ -207,4 +212,93 @@ public void testRemoteTranslogRestoreWithRefreshedData() throws IOException {
public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
testRestoreFlow(true, randomIntBetween(2, 5), true);
}

private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws Exception {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
}
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
.get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

refresh(INDEX_NAME);
String replicaNodeName = replicaNodeName(INDEX_NAME);
assertBusy(
() -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)),
30,
TimeUnit.SECONDS
);

RecoveryResponse recoveryResponse = client(replicaNodeName).admin().indices().prepareRecoveries().get();

Optional<RecoveryState> recoverySource = recoveryResponse.shardRecoveryStates()
.get(INDEX_NAME)
.stream()
.filter(rs -> rs.getRecoverySource().getType() == RecoverySource.Type.PEER)
.findFirst();
assertFalse(recoverySource.isEmpty());
if (numberOfIterations == 1 && invokeFlush) {
// segments_N file is copied to new replica
assertEquals(1, recoverySource.get().getIndex().recoveredFileCount());
} else {
assertEquals(0, recoverySource.get().getIndex().recoveredFileCount());
}

IndexResponse response = indexSingleDoc();
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
refresh(INDEX_NAME);
assertBusy(
() -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1),
30,
TimeUnit.SECONDS
);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataFlush() throws Exception {
testPeerRecovery(false, 1, true);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogFlush() throws Exception {
testPeerRecovery(false, randomIntBetween(2, 5), true);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataRefresh() throws Exception {
testPeerRecovery(false, 1, false);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogRefresh() throws Exception {
testPeerRecovery(false, randomIntBetween(2, 5), false);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception {
testPeerRecovery(true, 1, true);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Exception {
testPeerRecovery(true, randomIntBetween(2, 5), true);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception {
testPeerRecovery(true, 1, false);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception {
testPeerRecovery(true, randomIntBetween(2, 5), false);
}
}
15 changes: 13 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4371,11 +4371,22 @@ public void close() throws IOException {
}

/**
* Downloads segments from remote segment store.
* Downloads segments from remote segment store. This method will download segments till
* last refresh checkpoint.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
syncSegmentsFromRemoteSegmentStore(overrideLocal, true);
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
Comment on lines +4381 to +4390
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can sync segments be agnostic of the source like peer node or remote store?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks quite challenging to me as the code paths are completely different . Would like to know @sachinpkale's view as well .

Also the paths are different in other parts like restore as well . So there is consistency across all parts .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @gbbafna mentioned, this method is not specific to peer recovery. We are using the same method in restore as well as failover flow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this change, we make sure that segments are downloaded from the remote store instead of the source node. Remote store is not replacing the source node. I have not given much thought to this yet but given the peer recovery flow, it would be much bigger change.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name itself doesn't provide an abstraction. Lets open a follow up issue to clean this up ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name itself doesn't provide an abstraction. Lets open a follow up issue to clean this up ?

Okay, will create a tracking issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created issue: #6831

assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
Expand Down Expand Up @@ -4414,7 +4425,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
skippedSegments.add(file);
}
}
if (segmentInfosSnapshotFilename != null) {
if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,13 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, false);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog) == false;
final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog || hasRemoteSegmentStore) == false;
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog);
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2449,4 +2449,11 @@ protected String primaryNodeName(String indexName) {
String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).primaryShard().currentNodeId();
return clusterState.getRoutingNodes().node(nodeId).node().getName();
}

protected String replicaNodeName(String indexName) {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).replicaShards().get(0).currentNodeId();
return clusterState.getRoutingNodes().node(nodeId).node().getName();
}

}