Skip to content

Commit

Permalink
Add BlockIOContext to support block level fetch in openInput for Remo…
Browse files Browse the repository at this point in the history
…teSegmentStoreDirectory, added javadocs and removed BaseRemoteSegmentStoreDirectory

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
  • Loading branch information
rayshrey committed Apr 10, 2024
1 parent 2854383 commit 958d4f5
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1636,8 +1636,8 @@ static void validateTranslogDurabilitySettings(Settings requestSettings, Cluster
}

public static void validateIndexStoreLocality(Settings indexSettings) {
if (indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.LocalityType.FULL.toString())
.equalsIgnoreCase(IndexModule.LocalityType.PARTIAL.toString())
if (indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.toString())
.equalsIgnoreCase(IndexModule.DataLocalityType.PARTIAL.toString())
&& !FeatureFlags.isEnabled(FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING)) {
throw new IllegalArgumentException(
"index.store.locality can be set to PARTIAL only if Feature Flag for Writable Remote Index is true"
Expand Down
31 changes: 20 additions & 11 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ public final class IndexModule {
/**
* Index setting which used to determine how the data is cached locally fully or partially
*/
public static final Setting<LocalityType> INDEX_STORE_LOCALITY_SETTING = new Setting<>(
"index.store.locality",
LocalityType.FULL.name(),
LocalityType::getValueOf,
public static final Setting<DataLocalityType> INDEX_STORE_LOCALITY_SETTING = new Setting<>(
"index.store.data_locality",
DataLocalityType.FULL.name(),
DataLocalityType::getValueOf,
Property.IndexScope,
Property.NodeScope
);
Expand Down Expand Up @@ -593,24 +593,33 @@ public boolean match(Settings settings) {
}
}

public enum LocalityType {
/**
* Indicates the locality of the data - whether it will be cached fully or partially
*/
public enum DataLocalityType {
/**
* Indicates that all the data will be cached locally
*/
FULL,
/**
* Indicates that only a subset of the data will be cached locally
*/
PARTIAL;

private static final Map<String, LocalityType> LOCALITY_TYPES;
private static final Map<String, DataLocalityType> LOCALITY_TYPES;

static {
final Map<String, LocalityType> localityTypes = new HashMap<>(values().length);
for (final LocalityType localityType : values()) {
localityTypes.put(localityType.name(), localityType);
final Map<String, DataLocalityType> localityTypes = new HashMap<>(values().length);
for (final DataLocalityType dataLocalityType : values()) {
localityTypes.put(dataLocalityType.name(), dataLocalityType);
}
LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes);
}

public static LocalityType getValueOf(final String localityType) {
public static DataLocalityType getValueOf(final String localityType) {
Objects.requireNonNull(localityType, "No locality type given.");
final String localityTypeName = toRootUpperCase(localityType.trim());
final LocalityType type = LOCALITY_TYPES.get(localityTypeName);
final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName);
if (type != null) {
return type;
}
Expand Down
12 changes: 3 additions & 9 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.BaseRemoteSegmentStoreDirectory;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.filecache.FileCache;
Expand Down Expand Up @@ -534,14 +533,9 @@ public synchronized IndexShard createShard(
* Composite Directory currently uses FSDirectory's getDirectory() method to fetch and use the Path for operating on FileCache
* TODO : Refactor FileCache to have key in form of String instead of Path. Once that is done we can remove this assertion
*/
assert directoryFactory instanceof FsDirectoryFactory
: "For Composite Directory, local directory must be of type FSDirectory";
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
directory = new CompositeDirectory(
(FSDirectory) localDirectory,
(BaseRemoteSegmentStoreDirectory) remoteDirectory,
fileCache
);
assert localDirectory instanceof FSDirectory : "For Composite Directory, local directory must be of type FSDirectory";
directory = new CompositeDirectory((FSDirectory) localDirectory, (RemoteSegmentStoreDirectory) remoteDirectory, fileCache);
} else {
directory = directoryFactory.newDirectory(this.indexSettings, path);
}
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -913,8 +913,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings);
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
isStoreLocalityPartial = settings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.LocalityType.FULL.toString())
.equalsIgnoreCase(IndexModule.LocalityType.PARTIAL.toString());
isStoreLocalityPartial = settings.get(
IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(),
IndexModule.DataLocalityType.FULL.toString()
).equalsIgnoreCase(IndexModule.DataLocalityType.PARTIAL.toString());
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.opensearch.index.store.remote.file.OnDemandCompositeBlockIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.FileType;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
Expand All @@ -30,22 +32,40 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
* Composite Directory will contain both local and remote directory
* Consumers of Composite directory need not worry whether file is in local or remote
* All such abstractions will be handled by the Composite directory itself
* Implements all required methods by Directory abstraction
*/
public class CompositeDirectory extends FilterDirectory {
private static final Logger logger = LogManager.getLogger(CompositeDirectory.class);
private final FSDirectory localDirectory;
private final BaseRemoteSegmentStoreDirectory remoteDirectory;
private final RemoteSegmentStoreDirectory remoteDirectory;
private final FileCache fileCache;
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();

public CompositeDirectory(FSDirectory localDirectory, BaseRemoteSegmentStoreDirectory remoteDirectory, FileCache fileCache) {
/**
* Constructor to initialise the composite directory
* @param localDirectory corresponding to the local FSDirectory
* @param remoteDirectory corresponding to the remote directory
* @param fileCache used to cache the remote files locally
*/
public CompositeDirectory(FSDirectory localDirectory, RemoteSegmentStoreDirectory remoteDirectory, FileCache fileCache) {
super(localDirectory);
this.localDirectory = localDirectory;
this.remoteDirectory = remoteDirectory;
this.fileCache = fileCache;
}

/**
* Returns names of all files stored in this directory in sorted order
* Does not include locally stored block files (having _block_ in their names)
*
* @throws IOException in case of I/O error
*/
@Override
public String[] listAll() throws IOException {
logger.trace("listAll() called ...");
Expand All @@ -72,25 +92,37 @@ public String[] listAll() throws IOException {
}
}

/**
* Removes an existing file in the directory.
* Throws {@link NoSuchFileException} or {@link FileNotFoundException} in case file is not present locally and in remote as well
* @param name the name of an existing file.
* @throws IOException in case of I/O error
*/
@Override
public void deleteFile(String name) throws IOException {
logger.trace("deleteFile() called {}", name);
writeLock.lock();
try {
localDirectory.deleteFile(name);
fileCache.remove(localDirectory.getDirectory().resolve(name));
} catch (NoSuchFileException e) {
/**
* We might encounter NoSuchFileException in case file is deleted from local
* But if it is present in remote we should just skip deleting this file
* TODO : Handle cases where file is not present in remote as well
*/
logger.trace("NoSuchFileException encountered while deleting {} from local", name);
} catch (NoSuchFileException | FileNotFoundException e) {
logger.trace("File {} not found in local, trying to delete from Remote", name);
try {
remoteDirectory.deleteFile(name);
} finally {
writeLock.unlock();
}
} finally {
writeLock.unlock();
}
}

/**
* Returns the byte length of a file in the directory.
* Throws {@link NoSuchFileException} or {@link FileNotFoundException} in case file is not present locally and in remote as well
* @param name the name of an existing file.
* @throws IOException in case of I/O error
*/
@Override
public long fileLength(String name) throws IOException {
logger.trace("fileLength() called {}", name);
Expand All @@ -110,6 +142,12 @@ public long fileLength(String name) throws IOException {
}
}

/**
* Creates a new, empty file in the directory and returns an {@link IndexOutput} instance for
* appending data to this file.
* @param name the name of the file to create.
* @throws IOException in case of I/O error
*/
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
logger.trace("createOutput() called {}", name);
Expand All @@ -121,6 +159,13 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
}
}

/**
* Creates a new, empty, temporary file in the directory and returns an {@link IndexOutput}
* instance for appending data to this file.
*
* <p>The temporary file name (accessible via {@link IndexOutput#getName()}) will start with
* {@code prefix}, end with {@code suffix} and have a reserved file extension {@code .tmp}.
*/
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
logger.trace("createTempOutput() called {} , {}", prefix, suffix);
Expand All @@ -132,6 +177,10 @@ public IndexOutput createTempOutput(String prefix, String suffix, IOContext cont
}
}

/**
* Ensures that any writes to these files are moved to stable storage (made durable).
* @throws IOException in case of I/O error
*/
@Override
public void sync(Collection<String> names) throws IOException {
logger.trace("sync() called {}", names);
Expand All @@ -149,6 +198,10 @@ public void sync(Collection<String> names) throws IOException {
}
}

/**
* Ensures that directory metadata, such as recent file renames, are moved to stable storage.
* @throws IOException in case of I/O error
*/
@Override
public void syncMetaData() throws IOException {
logger.trace("syncMetaData() called ");
Expand All @@ -160,6 +213,11 @@ public void syncMetaData() throws IOException {
}
}

/**
* Renames {@code source} file to {@code dest} file where {@code dest} must not already exist in
* the directory.
* @throws IOException in case of I/O error
*/
@Override
public void rename(String source, String dest) throws IOException {
logger.trace("rename() called {}, {}", source, dest);
Expand All @@ -171,6 +229,12 @@ public void rename(String source, String dest) throws IOException {
}
}

/**
* Opens a stream for reading an existing file.
* Check whether the file is present locally or in remote and return the IndexInput accordingly
* @param name the name of an existing file.
* @throws IOException in case of I/O error
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
logger.trace("openInput() called {}", name);
Expand All @@ -191,6 +255,13 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
}
}

/**
* Acquires and returns a {@link Lock} for a file with the given name.
* @param name the name of the lock file
* @throws LockObtainFailedException (optional specific exception) if the lock could not be
* obtained because it is currently held elsewhere.
* @throws IOException in case of I/O error
*/
@Override
public Lock obtainLock(String name) throws IOException {
logger.trace("obtainLock() called {}", name);
Expand All @@ -202,6 +273,10 @@ public Lock obtainLock(String name) throws IOException {
}
}

/**
* Closes the directory
* @throws IOException in case of I/O error
*/
@Override
public void close() throws IOException {
writeLock.lock();
Expand All @@ -212,6 +287,10 @@ public void close() throws IOException {
}
}

/**
* Returns a set of files currently pending deletion in this directory.
* @throws IOException in case of I/O error
*/
@Override
public Set<String> getPendingDeletions() throws IOException {
readLock.lock();
Expand All @@ -224,9 +303,9 @@ public Set<String> getPendingDeletions() throws IOException {

/**
* Function to perform operations once files have been uploaded to Remote Store
* Once uploaded local files are safe to delete
* @param files : files which have been successfully uploaded to Remote Store
* @throws IOException
* Currently deleting the local files here, as once uploaded to Remote, local files are safe to delete
* @param files : recent files which have been successfully uploaded to Remote Store
* @throws IOException in case of I/O error
*/
public void afterSyncToRemote(Collection<String> files) throws IOException {
logger.trace("afterSyncToRemote called for {}", files);
Expand All @@ -244,6 +323,9 @@ public void afterSyncToRemote(Collection<String> files) throws IOException {
}
}

/**
* Return the list of files present in Remote
*/
private String[] getRemoteFiles() {
String[] remoteFiles;
try {
Expand Down
Loading

0 comments on commit 958d4f5

Please sign in to comment.