From 5eae0a751928f4750f13e02b465160a2bc557c2e Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Fri, 2 Jun 2023 21:10:07 -0700 Subject: [PATCH] Refactor remote store within existing index shard store Signed-off-by: Kunal Kotwani --- .../opensearch/index/shard/IndexShardIT.java | 1 - .../org/opensearch/index/IndexService.java | 9 +-- .../opensearch/index/shard/IndexShard.java | 15 +--- .../shard/RemoteStoreRefreshListener.java | 2 +- .../opensearch/index/shard/StoreRecovery.java | 5 +- .../org/opensearch/index/store/Store.java | 25 +++++++ .../index/engine/NoOpEngineRecoveryTests.java | 3 +- .../index/shard/IndexShardTests.java | 48 ++++-------- .../RemoteStoreRefreshListenerTests.java | 33 +++++---- .../RemoveCorruptedShardDataCommandTests.java | 6 +- ...overyWithRemoteTranslogOnPrimaryTests.java | 3 +- .../PeerRecoveryTargetServiceTests.java | 3 +- .../BlobStoreRepositoryRestoreTests.java | 3 +- ...enSearchIndexLevelReplicationTestCase.java | 25 +------ .../index/shard/IndexShardTestCase.java | 74 ++++++++++--------- 15 files changed, 115 insertions(+), 140 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index ba567c125c6e9..11f187ac6e619 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -701,7 +701,6 @@ public static final IndexShard newIndexShard( cbs, (indexSettings, shardRouting) -> new InternalTranslogFactory(), SegmentReplicationCheckpointPublisher.EMPTY, - null, null ); } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 4e808ebb838e7..8a1fc7256274f 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -470,17 +470,17 @@ public synchronized IndexShard createShard( } }; - Store remoteStore = null; + Directory directory = directoryFactory.newDirectory(this.indexSettings, path); + Directory remoteDirectory = null; if (this.indexSettings.isRemoteStoreEnabled()) { - Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); - remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY); + remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); } - Directory directory = directoryFactory.newDirectory(this.indexSettings, path); store = new Store( shardId, this.indexSettings, directory, + remoteDirectory, lock, new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)) ); @@ -508,7 +508,6 @@ public synchronized IndexShard createShard( circuitBreakerService, translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, - remoteStore, remoteRefreshSegmentPressureService ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 2ec0d186bf056..c01b7215227d6 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -326,7 +326,6 @@ Runnable getGlobalCheckpointSyncer() { private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; - private final Store remoteStore; private final BiFunction translogFactorySupplier; private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; @@ -353,7 +352,6 @@ public IndexShard( final CircuitBreakerService circuitBreakerService, final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, - @Nullable final Store remoteStore, final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { super(shardRouting.shardId(), indexSettings); @@ -444,7 +442,6 @@ public boolean shouldCache(Query query) { this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); this.checkpointPublisher = checkpointPublisher; - this.remoteStore = remoteStore; this.translogFactorySupplier = translogFactorySupplier; this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; } @@ -457,10 +454,6 @@ public Store store() { return this.store; } - public Store remoteStore() { - return this.remoteStore; - } - /** * Return the sort order of this index, or null if the index has no sort. */ @@ -3587,7 +3580,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro } private boolean isRemoteStoreEnabled() { - return (remoteStore != null && shardRouting.primary()); + return (store.remoteDirectory() != null && shardRouting.primary()); } public boolean isRemoteTranslogEnabled() { @@ -4468,8 +4461,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.info("Downloading segments from remote segment store"); - assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; - FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); + assert store.remoteDirectory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; + FilterDirectory remoteStoreDirectory = (FilterDirectory) store.remoteDirectory(); assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory : "Store.directory is not enclosing an instance of FilterDirectory"; FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); @@ -4481,7 +4474,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re Map uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory) .getSegmentsUploadedToRemoteStore(); store.incRef(); - remoteStore.incRef(); List downloadedSegments = new ArrayList<>(); List skippedSegments = new ArrayList<>(); try { @@ -4538,7 +4530,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re logger.info("Downloaded segments: {}", downloadedSegments); logger.info("Skipped download for segments: {}", skippedSegments); store.decRef(); - remoteStore.decRef(); } } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 88b71a92d7340..4b61344452c7d 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -119,7 +119,7 @@ public RemoteStoreRefreshListener( ) { this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); - this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) + this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.store().remoteDirectory()) .getDelegate()).getDelegate(); this.primaryTerm = indexShard.getOperationPrimaryTerm(); localSegmentChecksumMap = new HashMap<>(); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 9393a5ac38ac2..17fe4bf785d64 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -438,8 +438,7 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi } private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException { - final Store remoteStore = indexShard.remoteStore(); - if (remoteStore == null) { + if (indexShard.store().remoteDirectory() == null) { throw new IndexShardRecoveryException( indexShard.shardId(), "Remote store is not enabled for this index", @@ -450,7 +449,6 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco indexShard.prepareForIndexRecovery(); final Store store = indexShard.store(); store.incRef(); - remoteStore.incRef(); try { // Download segments from remote segment store indexShard.syncSegmentsFromRemoteSegmentStore(true); @@ -474,7 +472,6 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco throw new IndexShardRecoveryException(indexShard.shardId, "Exception while recovering from remote store", e); } finally { store.decRef(); - remoteStore.decRef(); } } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index dae698b1c3b46..060ec6c6de9c9 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -179,6 +179,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref private final AtomicBoolean isClosed = new AtomicBoolean(false); private final StoreDirectory directory; + private final StoreDirectory remoteDirectory; private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock(); private final ShardLock shardLock; private final OnClose onClose; @@ -200,11 +201,30 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, } public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock, OnClose onClose) { + this(shardId, indexSettings, directory, null, shardLock, onClose); + } + + public Store( + ShardId shardId, + IndexSettings indexSettings, + Directory directory, + Directory remoteDirectory, + ShardLock shardLock, + OnClose onClose + ) { super(shardId, indexSettings); final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING); logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval); ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval); this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId)); + + if (remoteDirectory != null) { + ByteSizeCachingDirectory sizeCachingRemoteDir = new ByteSizeCachingDirectory(remoteDirectory, refreshInterval); + this.remoteDirectory = new StoreDirectory(sizeCachingRemoteDir, Loggers.getLogger("index.store.deletes", shardId)); + } else { + this.remoteDirectory = null; + } + this.shardLock = shardLock; this.onClose = onClose; this.replicaFileTracker = indexSettings.isSegRepEnabled() ? new ReplicaFileTracker() : null; @@ -219,6 +239,11 @@ public Directory directory() { return directory; } + public Directory remoteDirectory() { + ensureOpen(); + return remoteDirectory; + } + /** * Returns the last committed segments info for this store * diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java index 3162f7915c994..4a895e1efc152 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java @@ -58,8 +58,7 @@ public void testRecoverFromNoOp() throws IOException { initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE), indexShard.indexSettings().getIndexMetadata(), NoOpEngine::new, - new EngineConfigFactory(indexShard.indexSettings()), - null + new EngineConfigFactory(indexShard.indexSettings()) ); recoverShardFromStore(primary); assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes()); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 5253a017f8e0e..ef6f32c2a8fbb 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -1236,8 +1236,7 @@ public void testGetChangesSnapshotThrowsAssertForSegRep() throws IOException { null, new InternalEngineFactory(), () -> synced.set(true), - RetentionLeaseSyncer.EMPTY, - null + RetentionLeaseSyncer.EMPTY ); expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1)); closeShard(primaryShard, false); @@ -1266,8 +1265,7 @@ public void testGetChangesSnapshotThrowsAssertForRemoteStore() throws IOExceptio null, new InternalEngineFactory(), () -> synced.set(true), - RetentionLeaseSyncer.EMPTY, - null + RetentionLeaseSyncer.EMPTY ); expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1)); closeShard(primaryShard, false); @@ -1296,8 +1294,7 @@ public void testGlobalCheckpointSync() throws IOException { null, new InternalEngineFactory(), () -> synced.set(true), - RetentionLeaseSyncer.EMPTY, - null + RetentionLeaseSyncer.EMPTY ); // add a replica recoverShardFromStore(primaryShard); @@ -1371,8 +1368,7 @@ public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception { null, new InternalEngineFactory(), () -> synced.set(true), - RetentionLeaseSyncer.EMPTY, - null + RetentionLeaseSyncer.EMPTY ); recoverShardFromStore(primaryShard); IndexShard replicaShard = newShard(shardId, false); @@ -1782,8 +1778,7 @@ public Set getPendingDeletions() throws IOException { new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false); shard.addShardFailureCallback((ig) -> failureCallbackTriggered.set(true)); @@ -2660,8 +2655,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { shard.getEngineConfigFactory(), shard.getGlobalCheckpointSyncer(), shard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); @@ -2805,7 +2799,7 @@ public void testCommitLevelRestoreShardFromRemoteStore() throws IOException { testRestoreShardFromRemoteStore(true); } - public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException { + private void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException { IndexShard target = newStartedShard( true, Settings.builder() @@ -2857,7 +2851,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep assertEquals(0, storeDirectory.listAll().length); - Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) target.remoteStore().directory()).getDelegate()).getDelegate(); + Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) target.store().remoteDirectory()).getDelegate()).getDelegate(); // extra0 file is added as a part of https://lucene.apache.org/core/7_2_1/test-framework/org/apache/lucene/mockfile/ExtrasFS.html // Safe to remove without impacting the test @@ -2867,14 +2861,12 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep } } - target.remoteStore().incRef(); target = reinitShard(target, routing); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null)); final PlainActionFuture future = PlainActionFuture.newFuture(); target.restoreFromRemoteStore(future); - target.remoteStore().decRef(); assertTrue(future.actionGet()); assertDocs(target, "1", "2"); @@ -2912,8 +2904,7 @@ public void testReaderWrapperIsUsed() throws IOException { shard.getEngineConfigFactory(), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); recoverShardFromStore(newShard); @@ -3061,8 +3052,7 @@ public void testSearchIsReleaseIfWrapperFails() throws IOException { shard.getEngineConfigFactory(), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); recoverShardFromStore(newShard); @@ -3727,8 +3717,7 @@ private IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPubl () -> {}, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER, - checkpointPublisher, - null + checkpointPublisher ); } @@ -3784,8 +3773,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO indexShard.engineConfigFactory, indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); final IndexShardRecoveryException indexShardRecoveryException = expectThrows( @@ -3842,8 +3830,7 @@ public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception { indexShard.engineConfigFactory, indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); final IndexShardRecoveryException exception1 = expectThrows( @@ -3877,8 +3864,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO indexShard.engineConfigFactory, indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); final IndexShardRecoveryException exception2 = expectThrows( @@ -3932,8 +3918,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { indexShard.engineConfigFactory, indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); Store.MetadataSnapshot storeFileMetadatas = newShard.snapshotStoreMetadata(); @@ -4677,8 +4662,7 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { // just like a following shard, we need to skip this check for now. } }, - shard.getEngineConfigFactory(), - null + shard.getEngineConfigFactory() ); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); readonlyShard.markAsRecovering("store", new RecoveryState(readonlyShard.routingEntry(), localNode, null)); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 365fb0237f80f..4c5588190a4a3 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -94,9 +94,10 @@ public void testAfterRefresh() throws IOException { setup(true, 3); assertDocs(indexShard, "1", "2", "3"); - try (Store remoteStore = indexShard.remoteStore()) { + try (Store remoteStore = indexShard.store()) { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.remoteDirectory()).getDelegate()) + .getDelegate(); verifyUploadedSegments(remoteSegmentStoreDirectory); @@ -112,9 +113,10 @@ public void testAfterCommit() throws IOException { assertDocs(indexShard, "1", "2", "3"); flushShard(indexShard); - try (Store remoteStore = indexShard.remoteStore()) { + try (Store remoteStore = indexShard.store()) { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.remoteDirectory()).getDelegate()) + .getDelegate(); verifyUploadedSegments(remoteSegmentStoreDirectory); @@ -136,9 +138,10 @@ public void testRefreshAfterCommit() throws IOException { indexDocs(8, 4); indexShard.refresh("test"); - try (Store remoteStore = indexShard.remoteStore()) { + try (Store remoteStore = indexShard.store()) { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.remoteDirectory()).getDelegate()) + .getDelegate(); verifyUploadedSegments(remoteSegmentStoreDirectory); @@ -158,9 +161,10 @@ public void testAfterMultipleCommits() throws IOException { flushShard(indexShard); } - try (Store remoteStore = indexShard.remoteStore()) { + try (Store remoteStore = indexShard.store()) { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.remoteDirectory()).getDelegate()) + .getDelegate(); verifyUploadedSegments(remoteSegmentStoreDirectory); @@ -175,9 +179,10 @@ public void testReplica() throws IOException { setup(false, 3); remoteStoreRefreshListener.afterRefresh(true); - try (Store remoteStore = indexShard.remoteStore()) { + try (Store remoteStore = indexShard.store()) { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.remoteDirectory()).getDelegate()) + .getDelegate(); assertEquals(0, remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().size()); } @@ -188,7 +193,7 @@ public void testReplicaPromotion() throws IOException, InterruptedException { remoteStoreRefreshListener.afterRefresh(true); RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.store().remoteDirectory()).getDelegate()) .getDelegate(); assertEquals(0, remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().size()); @@ -359,13 +364,11 @@ private Tuple m when(store.directory()).thenReturn(indexShard.store().directory()); // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) - Store remoteStore = mock(Store.class); - when(shard.remoteStore()).thenReturn(remoteStore); RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.store().remoteDirectory()).getDelegate()) .getDelegate(); FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory)); - when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); + when(store.remoteDirectory()).thenReturn(remoteStoreFilterDirectory); // Mock indexShard.getOperationPrimaryTerm() when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm()); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java index a6ecb7053f286..32ded73c8e9a6 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -174,8 +174,7 @@ public void setup() throws IOException { new EngineConfigFactory(new IndexSettings(indexMetadata, settings)), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ), true ); @@ -551,8 +550,7 @@ private IndexShard reopenIndexShard(boolean corrupted) throws IOException { indexShard.getEngineConfigFactory(), indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); } diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index d76afca51e354..cf5d1180d495a 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -79,8 +79,7 @@ public void testStartSequenceForReplicaRecovery() throws Exception { replica.getEngineConfigFactory(), replica.getGlobalCheckpointSyncer(), replica.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); shards.addReplica(newReplicaShard); AtomicBoolean assertDone = new AtomicBoolean(false); diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 730d9b4215b73..d4a5596fec55d 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -321,8 +321,7 @@ public void testClosedIndexSkipsLocalRecovery() throws Exception { ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE), indexMetadata, NoOpEngine::new, - new EngineConfigFactory(shard.indexSettings()), - null + new EngineConfigFactory(shard.indexSettings()) ); replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 63b144dae9c93..ac3840d12700c 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -140,8 +140,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException { new EngineConfigFactory(shard.indexSettings()), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); // restore the shard diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index e349fbdd6f1fc..4ab2a04d1b2ea 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -98,7 +98,6 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; @@ -249,19 +248,7 @@ protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - Store remoteStore = null; - if (remotePath != null) { - remoteStore = createRemoteStore(remotePath, primaryRouting, indexMetadata); - } - primary = newShard( - primaryRouting, - indexMetadata, - null, - getEngineFactory(primaryRouting), - () -> {}, - retentionLeaseSyncer, - remoteStore - ); + primary = newShard(primaryRouting, indexMetadata, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer); replicas = new CopyOnWriteArrayList<>(); this.indexMetadata = indexMetadata; updateAllocationIDsOnPrimary(); @@ -386,18 +373,13 @@ public IndexShard addReplica() throws IOException { public IndexShard addReplica(Path remotePath) throws IOException { final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); - Store remoteStore = null; - if (remotePath != null) { - remoteStore = createRemoteStore(remotePath, replicaRouting, indexMetadata); - } final IndexShard replica = newShard( replicaRouting, indexMetadata, null, getEngineFactory(replicaRouting), () -> {}, - retentionLeaseSyncer, - remoteStore + retentionLeaseSyncer ); addReplica(replica); return replica; @@ -438,8 +420,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP getEngineConfigFactory(new IndexSettings(indexMetadata, indexMetadata.getSettings())), () -> {}, retentionLeaseSyncer, - EMPTY_EVENT_LISTENER, - null + EMPTY_EVENT_LISTENER ); replicas.add(newReplica); if (replicationTargets != null) { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b1dd4fb1dcc1e..12baa895f2622 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -208,6 +208,7 @@ public void onFailure(ReplicationState state, ReplicationFailedException e, bool protected ThreadPool threadPool; protected long primaryTerm; protected ClusterService clusterService; + protected Path remoteStorePath; @Override public void setUp() throws Exception { @@ -216,6 +217,7 @@ public void setUp() throws Exception { primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards clusterService = createClusterService(threadPool); failOnShardFailures(); + remoteStorePath = createTempDir(); } protected ThreadPool setUpThreadPool() { @@ -261,6 +263,34 @@ protected Store createStore(ShardId shardId, IndexSettings indexSettings, Direct return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); } + protected Store createStoreWithRemoteDirectory(Path path, ShardRouting shardRouting, IndexMetadata metadata, ShardPath shardPath) + throws IOException { + Settings nodeSettings = Settings.builder().put("node.name", shardRouting.currentNodeId()).build(); + ShardId shardId = shardPath.getShardId(); + NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(path); + ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); + RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); + RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); + + RemoteSegmentStoreDirectory remoteDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, null); + + return new Store( + shardId, + new IndexSettings(metadata, nodeSettings), + newFSDirectory(shardPath.resolveIndex()), + remoteDirectory, + new DummyShardLock(shardId), + Store.OnClose.EMPTY + ); + } + + private RemoteDirectory newRemoteDirectory(Path f) throws IOException { + FsBlobStore fsBlobStore = new FsBlobStore(1024, f, false); + BlobPath blobPath = new BlobPath(); + BlobContainer fsBlobContainer = new FsBlobContainer(fsBlobStore, blobPath, f); + return new RemoteDirectory(fsBlobContainer); + } + protected Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); @@ -344,7 +374,7 @@ protected IndexShard newShard( .settings(indexSettings) .primaryTerm(0, primaryTerm) .putMapping("{ \"properties\": {} }"); - return newShard(shardRouting, metadata.build(), null, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, null, listeners); + return newShard(shardRouting, metadata.build(), null, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, listeners); } /** @@ -413,8 +443,7 @@ protected IndexShard newShard( readerWrapper, new InternalEngineFactory(), globalCheckpointSyncer, - RetentionLeaseSyncer.EMPTY, - null + RetentionLeaseSyncer.EMPTY ); } @@ -433,7 +462,7 @@ protected IndexShard newShard( EngineFactory engineFactory, IndexingOperationListener... listeners ) throws IOException { - return newShard(routing, indexMetadata, indexReaderWrapper, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, null, listeners); + return newShard(routing, indexMetadata, indexReaderWrapper, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, listeners); } /** @@ -452,7 +481,6 @@ protected IndexShard newShard( @Nullable EngineFactory engineFactory, Runnable globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, - Store remoteStore, IndexingOperationListener... listeners ) throws IOException { // add node id as name to settings for proper logging @@ -470,7 +498,6 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, EMPTY_EVENT_LISTENER, - remoteStore, listeners ); } @@ -498,7 +525,6 @@ protected IndexShard newShard( Runnable globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, IndexEventListener indexEventListener, - Store remoteStore, IndexingOperationListener... listeners ) throws IOException { return newShard( @@ -513,7 +539,6 @@ protected IndexShard newShard( retentionLeaseSyncer, indexEventListener, SegmentReplicationCheckpointPublisher.EMPTY, - remoteStore, listeners ); } @@ -542,14 +567,17 @@ protected IndexShard newShard( RetentionLeaseSyncer retentionLeaseSyncer, IndexEventListener indexEventListener, SegmentReplicationCheckpointPublisher checkpointPublisher, - @Nullable Store remoteStore, IndexingOperationListener... listeners ) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, nodeSettings); final IndexShard indexShard; if (storeProvider == null) { - storeProvider = is -> createStore(is, shardPath); + if (indexSettings.isRemoteStoreEnabled()) { + storeProvider = __ -> createStoreWithRemoteDirectory(remoteStorePath, routing, indexMetadata, shardPath); + } else { + storeProvider = is -> createStore(is, shardPath); + } } final Store store = storeProvider.apply(indexSettings); boolean success = false; @@ -573,9 +601,6 @@ protected IndexShard newShard( RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = null; if (indexSettings.isRemoteStoreEnabled()) { - if (remoteStore == null) { - remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); - } remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, indexSettings.getSettings()); } @@ -612,7 +637,6 @@ protected IndexShard newShard( breakerService, translogFactorySupplier, checkpointPublisher, - remoteStore, remoteRefreshSegmentPressureService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); @@ -640,25 +664,6 @@ protected RepositoriesService createRepositoriesService() { return repositoriesService; } - protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata) throws IOException { - Settings nodeSettings = Settings.builder().put("node.name", shardRouting.currentNodeId()).build(); - - ShardId shardId = new ShardId("index", "_na_", 0); - NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(path); - ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); - RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, null); - return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory); - } - - private RemoteDirectory newRemoteDirectory(Path f) throws IOException { - FsBlobStore fsBlobStore = new FsBlobStore(1024, f, false); - BlobPath blobPath = new BlobPath(); - BlobContainer fsBlobContainer = new FsBlobContainer(fsBlobStore, blobPath, f); - return new RemoteDirectory(fsBlobContainer); - } - /** * Takes an existing shard, closes it and starts a new initialing shard at the same location * @@ -689,7 +694,6 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index current.indexSettings.getIndexMetadata(), current.engineFactory, current.engineConfigFactory, - current.remoteStore(), listeners ); } @@ -708,7 +712,6 @@ protected IndexShard reinitShard( IndexMetadata indexMetadata, EngineFactory engineFactory, EngineConfigFactory engineConfigFactory, - Store remoteStore, IndexingOperationListener... listeners ) throws IOException { closeShards(current); @@ -723,7 +726,6 @@ protected IndexShard reinitShard( current.getGlobalCheckpointSyncer(), current.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER, - remoteStore, listeners ); }