Skip to content

Commit

Permalink
Add support to run SegRep integ tests using remote store settings (#7361
Browse files Browse the repository at this point in the history
)

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored May 5, 2023
1 parent 63fbd0b commit 9bf99b4
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* The aim of this class is to run Segment Replication integ tests by enabling remote store specific settings.
* This makes sure that the constructs/flows that are being tested with Segment Replication, holds true after enabling
* remote store.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

private static final String REPOSITORY_NAME = "test-remore-store-repo";

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
33 changes: 23 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@
import org.opensearch.index.store.Store.MetadataSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -2234,6 +2236,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
}
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
Expand Down Expand Up @@ -2520,10 +2525,10 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRemoteStore(Repository repository, ActionListener<Boolean> listener) {
public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromRemoteStore(this, repository, listener);
storeRecovery.recoverFromRemoteStore(this, listener);
}

public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -3324,14 +3329,7 @@ public void startRecovery(
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case REMOTE_STORE:
final Repository remoteTranslogRepo;
final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository();
if (remoteTranslogRepoName != null) {
remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName);
} else {
remoteTranslogRepo = null;
}
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l));
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(l));
break;
case PEER:
try {
Expand Down Expand Up @@ -4406,6 +4404,9 @@ public void close() throws IOException {
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
}
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
Expand Down Expand Up @@ -4439,6 +4440,18 @@ public void close() throws IOException {
onSettingsChanged();
}

private void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException {
syncTranslogFilesFromRemoteTranslog();
loadGlobalCheckpointToReplicationTracker();
}

public void syncTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog());
}

/**
* Downloads segments from remote segment store. This method will download segments till
* last refresh checkpoint.
Expand Down
27 changes: 5 additions & 22 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,11 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -118,13 +114,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> liste
}
}

void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener<Boolean> listener) {
void recoverFromRemoteStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType;
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from remote store ...");
recoverFromRemoteStore(indexShard, repository);
recoverFromRemoteStore(indexShard);
return true;
});
} else {
Expand Down Expand Up @@ -441,7 +437,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
});
}

private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException {
private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
final Store remoteStore = indexShard.remoteStore();
if (remoteStore == null) {
throw new IndexShardRecoveryException(
Expand All @@ -462,8 +458,8 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
}
if (repository != null) {
syncTranslogFilesFromRemoteTranslog(indexShard, repository);
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) {
indexShard.syncTranslogFilesFromRemoteTranslog();
} else {
bootstrap(indexShard, store);
}
Expand All @@ -482,19 +478,6 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
}
}

private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
blobStoreRepository,
indexShard.getThreadPool(),
shardId,
fileTransferTracker
);
RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog());
}

/**
* Recovers the state of the shard from the store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,8 @@ public Translog newTranslog(
primaryModeSupplier
);
}

public Repository getRepository() {
return repository;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -116,8 +117,20 @@ public RemoteFsTranslog(
}
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
threadPool,
shardId,
fileTransferTracker
);
RemoteFsTranslog.download(translogTransferManager, location);
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
if (Files.notExists(location)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2873,7 +2873,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRemoteStore(null, future);
target.restoreFromRemoteStore(future);
target.remoteStore().decRef();

assertTrue(future.actionGet());
Expand Down

0 comments on commit 9bf99b4

Please sign in to comment.