Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Refactor remote store within existing index shard store #7904

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,6 @@ public static final IndexShard newIndexShard(
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null
);
}
Expand Down
9 changes: 4 additions & 5 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,17 +470,17 @@ public synchronized IndexShard createShard(
}
};

Store remoteStore = null;
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
this.indexSettings,
directory,
remoteDirectory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
Expand Down Expand Up @@ -508,7 +508,6 @@ public synchronized IndexShard createShard(
circuitBreakerService,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
remoteRefreshSegmentPressureService
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
Expand Down
15 changes: 3 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ Runnable getGlobalCheckpointSyncer() {
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

Expand All @@ -353,7 +352,6 @@ public IndexShard(
final CircuitBreakerService circuitBreakerService,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
) throws IOException {
super(shardRouting.shardId(), indexSettings);
Expand Down Expand Up @@ -444,7 +442,6 @@ public boolean shouldCache(Query query) {
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactorySupplier = translogFactorySupplier;
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
}
Expand All @@ -457,10 +454,6 @@ public Store store() {
return this.store;
}

public Store remoteStore() {
return this.remoteStore;
}

/**
* Return the sort order of this index, or null if the index has no sort.
*/
Expand Down Expand Up @@ -3587,7 +3580,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
}

private boolean isRemoteStoreEnabled() {
return (remoteStore != null && shardRouting.primary());
return (store.remoteDirectory() != null && shardRouting.primary());
}

public boolean isRemoteTranslogEnabled() {
Expand Down Expand Up @@ -4468,8 +4461,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert store.remoteDirectory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) store.remoteDirectory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
Expand All @@ -4481,7 +4474,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory)
.getSegmentsUploadedToRemoteStore();
store.incRef();
remoteStore.incRef();
List<String> downloadedSegments = new ArrayList<>();
List<String> skippedSegments = new ArrayList<>();
try {
Expand Down Expand Up @@ -4538,7 +4530,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
logger.info("Downloaded segments: {}", downloadedSegments);
logger.info("Skipped download for segments: {}", skippedSegments);
store.decRef();
remoteStore.decRef();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public RemoteStoreRefreshListener(
) {
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.store().remoteDirectory())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to avoid doing all these casts? We're plumbing the explicit concept of a remote directory into the store, so can the store hold and expose that directory as an explicit remote directory type, to avoid the need for consumers to unwrap and cast?

This really isn't following the principles of object-oriented programming, because even though the type is exposed as a generic Directory, the consumers require it to be a very specific type (a FilterDirectory wrapping a FilterDirectory wrapping a RemoteSegmentStoreDirectory) or else it will fail at runtime. If the consumers can't be made agnostic to the specific type, then it would probably be better to be explicit about the type and let the compiler enforce it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I think this is just a workaround, as RemoteDirectory does not support all the methods of Directory. We need to get away from this: RemoteDirectory should be usable as a drop-in replacement for any directory.

.getDelegate()).getDelegate();
this.primaryTerm = indexShard.getOperationPrimaryTerm();
localSegmentChecksumMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
}

private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
final Store remoteStore = indexShard.remoteStore();
if (remoteStore == null) {
if (indexShard.store().remoteDirectory() == null) {
throw new IndexShardRecoveryException(
indexShard.shardId(),
"Remote store is not enabled for this index",
Expand All @@ -450,7 +449,6 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
indexShard.prepareForIndexRecovery();
final Store store = indexShard.store();
store.incRef();
remoteStore.incRef();
try {
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true);
Expand All @@ -474,7 +472,6 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
throw new IndexShardRecoveryException(indexShard.shardId, "Exception while recovering from remote store", e);
} finally {
store.decRef();
remoteStore.decRef();
}
}

Expand Down
25 changes: 25 additions & 0 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final StoreDirectory directory;
private final StoreDirectory remoteDirectory;
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
private final OnClose onClose;
Expand All @@ -200,11 +201,30 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
}

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock, OnClose onClose) {
this(shardId, indexSettings, directory, null, shardLock, onClose);
}

public Store(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
Directory remoteDirectory,
ShardLock shardLock,
OnClose onClose
) {
super(shardId, indexSettings);
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));

if (remoteDirectory != null) {
ByteSizeCachingDirectory sizeCachingRemoteDir = new ByteSizeCachingDirectory(remoteDirectory, refreshInterval);
this.remoteDirectory = new StoreDirectory(sizeCachingRemoteDir, Loggers.getLogger("index.store.deletes", shardId));
} else {
this.remoteDirectory = null;
}

this.shardLock = shardLock;
this.onClose = onClose;
this.replicaFileTracker = indexSettings.isSegRepEnabled() ? new ReplicaFileTracker() : null;
Expand All @@ -219,6 +239,11 @@ public Directory directory() {
return directory;
}

public Directory remoteDirectory() {
ensureOpen();
return remoteDirectory;
}

/**
* Returns the last committed segments info for this store
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public void testRecoverFromNoOp() throws IOException {
initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE),
indexShard.indexSettings().getIndexMetadata(),
NoOpEngine::new,
new EngineConfigFactory(indexShard.indexSettings()),
null
new EngineConfigFactory(indexShard.indexSettings())
);
recoverShardFromStore(primary);
assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1236,8 +1236,7 @@ public void testGetChangesSnapshotThrowsAssertForSegRep() throws IOException {
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
RetentionLeaseSyncer.EMPTY
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
Expand Down Expand Up @@ -1266,8 +1265,7 @@ public void testGetChangesSnapshotThrowsAssertForRemoteStore() throws IOExceptio
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
RetentionLeaseSyncer.EMPTY
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
Expand Down Expand Up @@ -1296,8 +1294,7 @@ public void testGlobalCheckpointSync() throws IOException {
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
RetentionLeaseSyncer.EMPTY
);
// add a replica
recoverShardFromStore(primaryShard);
Expand Down Expand Up @@ -1371,8 +1368,7 @@ public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception {
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
RetentionLeaseSyncer.EMPTY
);
recoverShardFromStore(primaryShard);
IndexShard replicaShard = newShard(shardId, false);
Expand Down Expand Up @@ -1782,8 +1778,7 @@ public Set<String> getPendingDeletions() throws IOException {
new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())),
() -> {},
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
null
EMPTY_EVENT_LISTENER
);
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
shard.addShardFailureCallback((ig) -> failureCallbackTriggered.set(true));
Expand Down Expand Up @@ -2660,8 +2655,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
shard.getEngineConfigFactory(),
shard.getGlobalCheckpointSyncer(),
shard.getRetentionLeaseSyncer(),
EMPTY_EVENT_LISTENER,
null
EMPTY_EVENT_LISTENER
);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
Expand Down Expand Up @@ -2805,7 +2799,7 @@ public void testCommitLevelRestoreShardFromRemoteStore() throws IOException {
testRestoreShardFromRemoteStore(true);
}

public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException {
private void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException {
IndexShard target = newStartedShard(
true,
Settings.builder()
Expand Down Expand Up @@ -2857,7 +2851,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep

assertEquals(0, storeDirectory.listAll().length);

Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) target.remoteStore().directory()).getDelegate()).getDelegate();
Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) target.store().remoteDirectory()).getDelegate()).getDelegate();

// extra0 file is added as a part of https://lucene.apache.org/core/7_2_1/test-framework/org/apache/lucene/mockfile/ExtrasFS.html
// Safe to remove without impacting the test
Expand All @@ -2867,14 +2861,12 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep
}
}

target.remoteStore().incRef();
target = reinitShard(target, routing);

DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRemoteStore(future);
target.remoteStore().decRef();

assertTrue(future.actionGet());
assertDocs(target, "1", "2");
Expand Down Expand Up @@ -2912,8 +2904,7 @@ public void testReaderWrapperIsUsed() throws IOException {
shard.getEngineConfigFactory(),
() -> {},
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
null
EMPTY_EVENT_LISTENER
);

recoverShardFromStore(newShard);
Expand Down Expand Up @@ -3061,8 +3052,7 @@ public void testSearchIsReleaseIfWrapperFails() throws IOException {
shard.getEngineConfigFactory(),
() -> {},
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
null
EMPTY_EVENT_LISTENER
);

recoverShardFromStore(newShard);
Expand Down Expand Up @@ -3727,8 +3717,7 @@ private IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPubl
() -> {},
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
checkpointPublisher,
null
checkpointPublisher
);
}

Expand Down Expand Up @@ -3784,8 +3773,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
indexShard.engineConfigFactory,
indexShard.getGlobalCheckpointSyncer(),
indexShard.getRetentionLeaseSyncer(),
EMPTY_EVENT_LISTENER,
null
EMPTY_EVENT_LISTENER
);

final IndexShardRecoveryException indexShardRecoveryException = expectThrows(
Expand Down Expand Up @@ -3842,8 +3830,7 @@ public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception {
indexShard.engineConfigFactory,
indexShard.getGlobalCheckpointSyncer(),
indexShard.getRetentionLeaseSyncer(),
EMPTY_EVENT_LISTENER,
null
EMPTY_EVENT_LISTENER
);

final IndexShardRecoveryException exception1 = expectThrows(
Expand Down Expand Up @@ -3877,8 +3864,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
indexShard.engineConfigFactory,
indexShard.getGlobalCheckpointSyncer(),
indexShard.getRetentionLeaseSyncer(),
EMPTY_EVENT_LISTENER,
null
EMPTY_EVENT_LISTENER
);

final IndexShardRecoveryException exception2 = expectThrows(
Expand Down Expand Up @@ -3932,8 +3918,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
indexShard.engineConfigFactory,
indexShard.getGlobalCheckpointSyncer(),
indexShard.getRetentionLeaseSyncer(),
EMPTY_EVENT_LISTENER,
null
EMPTY_EVENT_LISTENER
);

Store.MetadataSnapshot storeFileMetadatas = newShard.snapshotStoreMetadata();
Expand Down Expand Up @@ -4677,8 +4662,7 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
// just like a following shard, we need to skip this check for now.
}
},
shard.getEngineConfigFactory(),
null
shard.getEngineConfigFactory()
);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
readonlyShard.markAsRecovering("store", new RecoveryState(readonlyShard.routingEntry(), localNode, null));
Expand Down
Loading