Skip to content

Commit

Permalink
[Remote Store] Inject remote store in IndexShard instead of RemoteSto…
Browse files Browse the repository at this point in the history
…reRefreshListener (#3703)

* Inject remote store in IndexShard instead of RemoteStoreRefreshListener

Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Pass supplier of RepositoriesService to RemoteDirectoryFactory

Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Create isRemoteStoreEnabled function for IndexShard

Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Explicitly close remoteStore on indexShard close

Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Change RemoteDirectory.close to a no-op

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale and Sachin Kale authored Jun 29, 2022
1 parent 47c191b commit 30209d8
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 80 deletions.
13 changes: 6 additions & 7 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ public final class IndexModule {

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();

private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
Expand Down Expand Up @@ -189,9 +187,9 @@ public final class IndexModule {
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
* via {@link org.opensearch.plugins.PluginsService#onIndexModule(IndexModule)}.
*
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param directoryFactories the available store types
*/
public IndexModule(
Expand Down Expand Up @@ -476,7 +474,8 @@ public IndexService newIndexService(
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry
ValuesSourceRegistry valuesSourceRegistry,
RemoteDirectoryFactory remoteDirectoryFactory
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -519,7 +518,7 @@ public IndexService newIndexService(
client,
queryCache,
directoryFactory,
REMOTE_DIRECTORY_FACTORY,
remoteDirectoryFactory,
eventListener,
readerWrapperFactory,
mapperRegistry,
Expand Down
37 changes: 17 additions & 20 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
Expand All @@ -97,9 +96,6 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -437,8 +433,7 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RepositoriesService repositoriesService
final SegmentReplicationCheckpointPublisher checkpointPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -511,22 +506,24 @@ public synchronized IndexShard createShard(
warmer.warm(reader, shard, IndexService.this.indexSettings);
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
RemoteStoreRefreshListener remoteStoreRefreshListener = null;

Store remoteStore = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID());
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
e
);
}
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(
clusterService.state().metadata().clusterUUID(),
this.indexSettings,
path
);
remoteStore = new Store(
shardId,
this.indexSettings,
remoteDirectory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
this.indexSettings,
Expand Down Expand Up @@ -557,7 +554,7 @@ public synchronized IndexShard createShard(
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
remoteStoreRefreshListener
remoteStore
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
26 changes: 19 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Assertions;
Expand Down Expand Up @@ -305,7 +307,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final ReferenceManager.RefreshListener checkpointRefreshListener;

private final RemoteStoreRefreshListener remoteStoreRefreshListener;
private final Store remoteStore;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -329,7 +331,7 @@ public IndexShard(
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final RemoteStoreRefreshListener remoteStoreRefreshListener
@Nullable final Store remoteStore
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -417,7 +419,7 @@ public boolean shouldCache(Query query) {
} else {
this.checkpointRefreshListener = null;
}
this.remoteStoreRefreshListener = remoteStoreRefreshListener;
this.remoteStore = remoteStore;
}

public ThreadPool getThreadPool() {
Expand All @@ -428,6 +430,10 @@ public Store store() {
return this.store;
}

public Store remoteStore() {
return this.remoteStore;
}

/**
* Return the sort order of this index, or null if the index has no sort.
*/
Expand Down Expand Up @@ -1638,7 +1644,8 @@ public void close(String reason, boolean flushEngine) throws IOException {
} finally {
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions);
// Closing remoteStore as a part of IndexShard close. null check is handled by IOUtils
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions, remoteStore);
indexShardOperationPermits.close();
}
}
Expand Down Expand Up @@ -3192,7 +3199,7 @@ private DocumentMapperForType docMapper() {
return mapperService.documentMapperWithAutoCreate();
}

private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) throws IOException {
final Sort indexSort = indexSortSupplier.get();
final Engine.Warmer warmer = reader -> {
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
Expand All @@ -3204,8 +3211,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (remoteStoreRefreshListener != null && shardRouting.primary()) {
internalRefreshListener.add(remoteStoreRefreshListener);
if (isRemoteStoreEnabled()) {
Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
internalRefreshListener.add(new RemoteStoreRefreshListener(store.directory(), remoteDirectory));
}
if (this.checkpointRefreshListener != null) {
internalRefreshListener.add(checkpointRefreshListener);
Expand Down Expand Up @@ -3238,6 +3246,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
);
}

private boolean isRemoteStoreEnabled() {
return (remoteStore != null && shardRouting.primary());
}

/**
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
}

/**
* Closes the directory by deleting all the files in this directory
* Closes the remote directory. Currently, it is a no-op.
* If remote directory maintains a state in future, we need to clean it before closing the directory
*/
@Override
public void close() throws IOException {
blobContainer.delete();
// Do nothing
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Factory for a remote store directory
Expand All @@ -26,12 +29,23 @@
*/
public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {

private final Supplier<RepositoriesService> repositoriesService;

public RemoteDirectoryFactory(Supplier<RepositoriesService> repositoriesService) {
this.repositoriesService = repositoriesService;
}

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath blobPath = new BlobPath();
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
return new RemoteDirectory(blobContainer);
public Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath path) throws IOException {
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath blobPath = new BlobPath();
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
return new RemoteDirectory(blobContainer);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
}

}
17 changes: 8 additions & 9 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -265,6 +266,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
private final boolean nodeWriteDanglingIndicesInfo;
private final ValuesSourceRegistry valuesSourceRegistry;
private final RemoteDirectoryFactory remoteDirectoryFactory;

@Override
protected void doStart() {
Expand Down Expand Up @@ -292,7 +294,8 @@ public IndicesService(
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
RemoteDirectoryFactory remoteDirectoryFactory
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -386,6 +389,7 @@ protected void closeInternal() {

this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
this.remoteDirectoryFactory = remoteDirectoryFactory;
}

private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
Expand Down Expand Up @@ -745,7 +749,8 @@ private synchronized IndexService createIndexService(
indicesFieldDataCache,
namedWriteableRegistry,
this::isIdFieldDataEnabled,
valuesSourceRegistry
valuesSourceRegistry,
remoteDirectoryFactory
);
}

Expand Down Expand Up @@ -859,13 +864,7 @@ public IndexShard createShard(
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(
shardRouting,
globalCheckpointSyncer,
retentionLeaseSyncer,
checkpointPublisher,
repositoriesService
);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.Assertions;
import org.opensearch.Build;
Expand Down Expand Up @@ -622,6 +623,8 @@ protected Node(
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);

final RemoteDirectoryFactory remoteDirectoryFactory = new RemoteDirectoryFactory(repositoriesServiceReference::get);

final IndicesService indicesService = new IndicesService(
settings,
pluginsService,
Expand All @@ -642,7 +645,8 @@ protected Node(
engineFactoryProviders,
indexStoreFactories,
searchModule.getValuesSourceRegistry(),
recoveryStateFactories
recoveryStateFactories,
remoteDirectoryFactory
);

final AliasValidator aliasValidator = new AliasValidator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.Repository;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -74,13 +73,13 @@ interface DirectoryFactory {
interface RemoteDirectoryFactory {
/**
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
* @param repositoryName repository name
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @param repository to get the BlobContainer details
* @return a new RemoteDirectory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException;
Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}

/**
Expand Down
Loading

0 comments on commit 30209d8

Please sign in to comment.