Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Refactor local and remote store to a composite store architecture #7960

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.index.shard.RemoveCorruptedShardDataCommandIT;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.CompositeStore;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -153,7 +154,7 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale
});

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1));
try (Store store = new Store(shardId, indexSettings, new NIOFSDirectory(indexPath), new DummyShardLock(shardId))) {
try (Store store = new CompositeStore(shardId, indexSettings, new NIOFSDirectory(indexPath), new DummyShardLock(shardId))) {
store.removeCorruptionMarker();
}
node1 = internalCluster().startNode(node1DataPathSettings);
Expand Down Expand Up @@ -232,11 +233,11 @@ private String historyUUID(String node, String indexName) {
}

private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId shardId, Path indexPath) throws IOException {
try (Store store = new Store(shardId, indexSettings, new NIOFSDirectory(indexPath), new DummyShardLock(shardId))) {
try (Store store = new CompositeStore(shardId, indexSettings, new NIOFSDirectory(indexPath), new DummyShardLock(shardId))) {
store.markStoreCorrupted(new IOException("fake ioexception"));
}
}

// Test
private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception {
assertBusy(() -> {
final ClusterAllocationExplanation explanation = client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.CompositeStore;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -682,7 +683,7 @@ public static final IndexShard newIndexShard(
initializingShardRouting,
indexService.getIndexSettings(),
shard.shardPath(),
shard.store(),
(CompositeStore) shard.store(),
indexService.getIndexSortSupplier(),
indexService.cache(),
indexService.mapperService(),
Expand All @@ -701,7 +702,6 @@ public static final IndexShard newIndexShard(
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null
);
}
Expand Down
25 changes: 16 additions & 9 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.CompositeStore;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -454,7 +455,7 @@ public synchronized IndexShard createShard(
final Settings indexSettings = this.indexSettings.getSettings();
final ShardId shardId = routing.shardId();
boolean success = false;
Store store = null;
CompositeStore store = null;
IndexShard indexShard = null;
ShardLock lock = null;
try {
Expand All @@ -470,20 +471,27 @@ public synchronized IndexShard createShard(
}
};

Store remoteStore = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
store = new CompositeStore(
shardId,
this.indexSettings,
directory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);

if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
store = new CompositeStore(
shardId,
this.indexSettings,
directory,
remoteDirectory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
}

eventListener.onStoreCreated(shardId);
indexShard = new IndexShard(
routing,
Expand All @@ -508,7 +516,6 @@ public synchronized IndexShard createShard(
circuitBreakerService,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
remoteRefreshSegmentPressureService
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.CompositeStore;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.Store.MetadataSnapshot;
Expand Down Expand Up @@ -241,7 +242,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final ThreadPool threadPool;
private final MapperService mapperService;
private final IndexCache indexCache;
private final Store store;
private final CompositeStore store;
private final InternalIndexingStats internalIndexingStats;
private final ShardSearchStats searchStats = new ShardSearchStats();
private final ShardGetService getService;
Expand Down Expand Up @@ -331,15 +332,14 @@ Runnable getGlobalCheckpointSyncer() {
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
final ShardPath path,
final Store store,
final CompositeStore store,
final Supplier<Sort> indexSortSupplier,
final IndexCache indexCache,
final MapperService mapperService,
Expand All @@ -358,7 +358,6 @@ public IndexShard(
final CircuitBreakerService circuitBreakerService,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
) throws IOException {
super(shardRouting.shardId(), indexSettings);
Expand Down Expand Up @@ -449,7 +448,6 @@ public boolean shouldCache(Query query) {
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactorySupplier = translogFactorySupplier;
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
}
Expand All @@ -463,7 +461,7 @@ public Store store() {
}

public Store remoteStore() {
return this.remoteStore;
return this.store.remoteStore();
}

/**
Expand Down Expand Up @@ -3632,7 +3630,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
}

private boolean isRemoteStoreEnabled() {
return (remoteStore != null && shardRouting.primary());
return (remoteStore() != null && shardRouting.primary());
}

public boolean isRemoteTranslogEnabled() {
Expand Down Expand Up @@ -4505,6 +4503,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
Store remoteStore = remoteStore();
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
Expand Down
Loading