Skip to content

Commit

Permalink
Add experimental annotations for newly created classes and review com…
Browse files Browse the repository at this point in the history
…ment fixes

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
  • Loading branch information
rayshrey committed May 9, 2024
1 parent 93d1f2a commit 3280a6d
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,29 @@
package org.opensearch.index.store;

import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.lucene.store.FilterIndexOutput;

import java.io.IOException;

/**
* FilterIndexOutput which takes in an additional FunctionalInterface as a parameter to perform required operations once the IndexOutput is closed
*
* @opensearch.internal
* @opensearch.experimental
*/
@ExperimentalApi
public class CloseableFilterIndexOutput extends FilterIndexOutput {

/**
* Functional Interface which takes the name of the file as input on which the required operations are to be performed
*/
@FunctionalInterface
public interface OnCloseListener {
void onClose(String name);
void onClose(String name) throws IOException;
}

OnCloseListener onCloseListener;
String fileName;
private final OnCloseListener onCloseListener;
private final String fileName;

public CloseableFilterIndexOutput(IndexOutput out, String fileName, OnCloseListener onCloseListener) {
super("CloseableFilterIndexOutput for file " + fileName, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.NonBlockCachedIndexInput;
import org.opensearch.index.store.remote.filecache.FullFileCachedIndexInput;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.utils.BlockIOContext;
import org.opensearch.index.store.remote.utils.FileType;
Expand All @@ -45,7 +45,7 @@
* All such abstractions will be handled by the Composite directory itself
* Implements all required methods by Directory abstraction
*
* @opensearch.internal
* @opensearch.experimental
*/
@ExperimentalApi
public class CompositeDirectory extends FilterDirectory {
Expand Down Expand Up @@ -143,9 +143,12 @@ public long fileLength(String name) throws IOException {
long fileLength;
Path key = localDirectory.getDirectory().resolve(name);
if (isTempFile(name) || fileCache.get(key) != null) {
fileLength = localDirectory.fileLength(name);
fileCache.decRef(key);
logger.trace("fileLength from Local {}", fileLength);
try {
fileLength = localDirectory.fileLength(name);
logger.trace("fileLength from Local {}", fileLength);
} finally {
fileCache.decRef(key);
}
} else {
fileLength = remoteDirectory.fileLength(name);
logger.trace("fileLength from Remote {}", fileLength);
Expand All @@ -170,13 +173,7 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
/*
* The CloseableFilterIndexOutput will ensure that the file is added to FileCache once write is completed on this file
*/
return new CloseableFilterIndexOutput(localDirectory.createOutput(name, context), name, (fileName) -> {
try {
cacheFile(fileName);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return new CloseableFilterIndexOutput(localDirectory.createOutput(name, context), name, this::cacheFile);
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -393,7 +390,7 @@ private String[] getRemoteFiles() throws IOException {

private void cacheFile(String name) throws IOException {
Path filePath = localDirectory.getDirectory().resolve(name);
fileCache.put(filePath, new NonBlockCachedIndexInput(localDirectory.openInput(name, IOContext.READ)));
fileCache.put(filePath, new FullFileCachedIndexInput(fileCache, filePath, localDirectory.openInput(name, IOContext.READ)));
// Decrementing ref here as above put call increments the ref of the key
fileCache.decRef(filePath);
// TODO : Pin the above filePath in the file cache once pinning support is added so that it cannot be evicted unless it has been
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ public FileCachedIndexInput clone() {

@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
// never reach here!
throw new UnsupportedOperationException("FileCachedIndexInput couldn't be sliced.");
IndexInput slicedIndexInput = luceneIndexInput.slice(sliceDescription, offset, length);
cache.incRef(filePath);
return new FileCachedIndexInput(cache, filePath, slicedIndexInput, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,45 @@

package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.annotation.ExperimentalApi;

import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Implementation of the CachedIndexInput for NON_BLOCK files which takes in an IndexInput as parameter
*
* @opensearch.experimental
*/
public class NonBlockCachedIndexInput implements CachedIndexInput {
@ExperimentalApi
public class FullFileCachedIndexInput implements CachedIndexInput {

private final IndexInput indexInput;
private final FileCache fileCache;
private final Path path;
private final FileCachedIndexInput fileCachedIndexInput;
private final AtomicBoolean isClosed;

/**
* Constructor - takes IndexInput as parameter
*/
public NonBlockCachedIndexInput(IndexInput indexInput) {
public FullFileCachedIndexInput(FileCache fileCache, Path path, IndexInput indexInput) {
this.fileCache = fileCache;
this.path = path;
this.indexInput = indexInput;
fileCachedIndexInput = new FileCachedIndexInput(fileCache, path, indexInput);
isClosed = new AtomicBoolean(false);
}

/**
* Returns the wrapped indexInput
*/
@Override
public IndexInput getIndexInput() throws IOException {
return indexInput;
public IndexInput getIndexInput() {
if (isClosed.get()) throw new AlreadyClosedException("Index input is already closed");
return fileCachedIndexInput;
}

/**
Expand All @@ -60,7 +72,6 @@ public boolean isClosed() {
public void close() throws Exception {
if (!isClosed.getAndSet(true)) {
indexInput.close();
isClosed.set(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,29 @@
package org.opensearch.index.store.remote.utils;

import org.apache.lucene.store.IOContext;
import org.opensearch.common.annotation.ExperimentalApi;

/**
* BlockIOContext is an extension of IOContext which can be used to pass block related information to the openInput() method of any directory
*
* @opensearch.experimental
*/
@ExperimentalApi
public class BlockIOContext extends IOContext {

private final boolean isBlockRequest;
private long blockStart;
private long blockSize;

/**
* Default constructor
*/
BlockIOContext(IOContext ctx) {
super(ctx.context);
this.isBlockRequest = false;
this.blockStart = -1;
this.blockSize = -1;
}

/**
* Constructor to initialise BlockIOContext with block related information
*/
public BlockIOContext(IOContext ctx, long blockStart, long blockSize) {
super(ctx.context);
this.isBlockRequest = true;
verifyBlockStartAndSize(blockStart, blockSize);
this.blockStart = blockStart;
this.blockSize = blockSize;
}

/**
* Function to check if the Context contains a block request or not
*/
public boolean isBlockRequest() {
return isBlockRequest;
}

/**
* Getter for blockStart
*/
Expand All @@ -59,4 +45,9 @@ public long getBlockStart() {
public long getBlockSize() {
return blockSize;
}

private void verifyBlockStartAndSize(long blockStart, long blockSize) {
if (blockStart < 0) throw new IllegalArgumentException("blockStart must be greater than or equal to 0");
if (blockSize <= 0) throw new IllegalArgumentException(("blockSize must be greater than 0"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,29 @@

package org.opensearch.index.store.remote.utils;

import org.opensearch.common.annotation.ExperimentalApi;

/**
* Enum to represent whether a file is block or not
*
* @opensearch.experimental
*/
@ExperimentalApi
public enum FileType {
/**
* Block file
*/
BLOCK,
BLOCK(".*_block_.*"),
/**
* Non block file
* Full file - Non-Block
*/
NON_BLOCK;
FULL(".*");

private final String pattern;

FileType(String pattern) {
this.pattern = pattern;
}

/**
* Returns if the fileType is a block file or not
Expand All @@ -32,7 +43,7 @@ public static boolean isBlockFile(FileType fileType) {
* Returns if the fileName is block file or not
*/
public static boolean isBlockFile(String fileName) {
if (fileName.contains("_block_")) return true;
if (fileName.matches(FileType.BLOCK.pattern)) return true;
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.NonBlockCachedIndexInput;
import org.opensearch.index.store.remote.filecache.FullFileCachedIndexInput;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -133,7 +133,7 @@ public void testRename() throws IOException {
compositeDirectory.rename("old_file_name", "new_file_name");
verify(localDirectory).rename("old_file_name", "new_file_name");
verify(fileCache).remove(resolvedPathOldFile);
verify(fileCache).put(eq(resolvedPathNewFile), any(NonBlockCachedIndexInput.class));
verify(fileCache).put(eq(resolvedPathNewFile), any(FullFileCachedIndexInput.class));
}

public void testOpenInput() throws IOException {
Expand Down

0 comments on commit 3280a6d

Please sign in to comment.