Commit

Add RemoteDirectory interface to copy segment files to/from remote store (opensearch-project#3102)

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>
2 people authored and andrross committed May 26, 2022
1 parent a0030df commit 73c5c9d
Showing 6 changed files with 696 additions and 0 deletions.
191 changes: 191 additions & 0 deletions server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
@@ -0,0 +1,191 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* A {@code RemoteDirectory} provides an abstraction layer for storing a list of files in a remote store.
* A {@code RemoteDirectory} contains only files (no sub-folder hierarchy). This class does not support all the
* methods of the {@code Directory} interface. Currently, it implements only the methods used to copy files to/from
* the remote store. The remaining methods will be implemented as the remote store is integrated with replication,
* peer recovery, etc.
*/
public class RemoteDirectory extends Directory {

private final BlobContainer blobContainer;

public RemoteDirectory(BlobContainer blobContainer) {
this.blobContainer = blobContainer;
}

/**
* Returns names of all files stored in this directory. The output must be in sorted (UTF-16,
* java's {@link String#compareTo}) order.
*/
@Override
public String[] listAll() throws IOException {
return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new);
}

/**
* Removes an existing file in the directory.
*
* <p>This method will not throw an exception when the file doesn't exist; it simply ignores this case.
* This is a deviation from the {@code Directory} interface, which expects either
* {@link NoSuchFileException} or {@link FileNotFoundException} to be thrown if {@code name} points to a non-existing file.
*
* @param name the name of an existing file.
* @throws IOException if the file exists but could not be deleted.
*/
@Override
public void deleteFile(String name) throws IOException {
// ToDo: Add a check for file existence
blobContainer.deleteBlobsIgnoringIfNotExists(Collections.singletonList(name));
}

/**
* Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote
* store.
*
* <p> The {@link Directory} interface expects {@link java.nio.file.FileAlreadyExistsException} to be thrown
* if the file already exists in the remote store. As this method does not open a file, it does not throw that
* exception.
*
* @param name the name of the file to copy to remote store.
*/
@Override
public IndexOutput createOutput(String name, IOContext context) {
return new RemoteIndexOutput(name, blobContainer);
}

/**
* Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream.
*
* @param name the name of an existing file.
* @throws IOException in case of I/O error
* @throws NoSuchFileException if the file does not exist
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name));
}

/**
* Closes the directory by deleting all the files in this directory.
*/
@Override
public void close() throws IOException {
blobContainer.delete();
}

/**
* Returns the byte length of a file in the directory.
*
* @param name the name of an existing file.
* @throws IOException in case of I/O error
* @throws NoSuchFileException if the file does not exist
*/
@Override
public long fileLength(String name) throws IOException {
// ToDo: Instead of calling remote store each time, keep a cache with segment metadata
Map<String, BlobMetadata> metadata = blobContainer.listBlobsByPrefix(name);
if (metadata.containsKey(name)) {
return metadata.get(name).length();
}
throw new NoSuchFileException(name);
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Once soft deletion of segment files in the remote store is supported, this method will provide details of the
* number of files that are marked as deleted but not yet actually deleted from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public Set<String> getPendingDeletions() throws IOException {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* A temporary IndexOutput is not required while working with the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Segment upload to the remote store is permanent and does not require a separate sync API.
* This may change in the future if segment upload to the remote store happens via a cache, in which case a sync API
* will be needed to write the cache contents to the store permanently.
*
* @throws UnsupportedOperationException always
*/
@Override
public void sync(Collection<String> names) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Once the metadata to be stored with each shard is finalized, the syncMetaData method will be used to sync the
* directory metadata to the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void syncMetaData() {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* As IndexWriter uses this method to publish commits, it will need to be implemented once IndexWriter is backed
* by a RemoteDirectory.
*
* @throws UnsupportedOperationException always
*/
@Override
public void rename(String source, String dest) throws IOException {
throw new UnsupportedOperationException();

}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Once locking of segment files in the remote store is supported, this method will be implemented using a
* remote store specific LockFactory.
*
* @throws UnsupportedOperationException always
*/
@Override
public Lock obtainLock(String name) throws IOException {
throw new UnsupportedOperationException();
}
}
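
Hypothetical usage sketch (not part of this commit): one way a caller could copy a local segment file into the remote store through the RemoteDirectory shown above. The BlobContainer wiring and the helper class name are assumptions for illustration; Directory#copyFrom simply drives the createOutput/copyBytes path implemented by this class.

// Illustration only; RemoteDirectoryUsageSketch is a hypothetical helper, not part of this commit.
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.index.store.RemoteDirectory;

import java.io.IOException;
import java.nio.file.Path;

public class RemoteDirectoryUsageSketch {
    public static void uploadSegmentFile(BlobContainer blobContainer, Path localShardPath, String segmentFile) throws IOException {
        try (Directory localDirectory = FSDirectory.open(localShardPath)) {
            // Note: RemoteDirectory#close deletes all files in the remote store, so the
            // remote directory is intentionally not closed here.
            RemoteDirectory remoteDirectory = new RemoteDirectory(blobContainer);
            // Directory#copyFrom opens the source file, calls createOutput on the destination
            // directory, and streams the bytes via IndexOutput#copyBytes.
            remoteDirectory.copyFrom(localDirectory, segmentFile, segmentFile, IOContext.DEFAULT);
        }
    }
}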
83 changes: 83 additions & 0 deletions server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.IndexInput;

import java.io.IOException;
import java.io.InputStream;

/**
* Class for input from a file in a {@link RemoteDirectory}. Used for all input operations from the remote store.
* Currently, only the methods from {@link IndexInput} that are required for reading a file from the remote store are
* implemented. The remaining methods will be implemented as we open up the remote store for other use cases like
* replication, peer recovery, etc.
* ToDo: Extend ChecksumIndexInput
* @see RemoteDirectory
*/
public class RemoteIndexInput extends IndexInput {

private final InputStream inputStream;
private final long size;

public RemoteIndexInput(String name, InputStream inputStream, long size) {
super(name);
this.inputStream = inputStream;
this.size = size;
}

@Override
public byte readByte() throws IOException {
byte[] buffer = new byte[1];
inputStream.read(buffer);
return buffer[0];
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
inputStream.read(b, offset, len);
}

@Override
public void close() throws IOException {
inputStream.close();
}

@Override
public long length() {
return size;
}

@Override
public void seek(long pos) throws IOException {
inputStream.skip(pos);
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getFilePointer() {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
throw new UnsupportedOperationException();
}
}
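
Hypothetical read-path sketch (not part of this commit): how a caller might read a file back from the remote store. RemoteDirectory#openInput wraps the blob's InputStream in the RemoteIndexInput shown above; the helper class name and the single-buffer read are assumptions for illustration.

// Illustration only; RemoteIndexInputUsageSketch is a hypothetical helper, not part of this commit.
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.index.store.RemoteDirectory;

import java.io.IOException;

public class RemoteIndexInputUsageSketch {
    public static byte[] downloadFile(RemoteDirectory remoteDirectory, String fileName) throws IOException {
        try (IndexInput input = remoteDirectory.openInput(fileName, IOContext.DEFAULT)) {
            byte[] content = new byte[(int) input.length()];
            // readBytes delegates to InputStream#read on the underlying blob stream.
            input.readBytes(content, 0, content.length);
            return content;
        }
    }
}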
97 changes: 97 additions & 0 deletions server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.lucene.store.InputStreamIndexInput;

import java.io.IOException;

/**
* Class for output to a file in a {@link RemoteDirectory}. Used for all output operations to the remote store.
* Currently, only the methods from {@link IndexOutput} that are required for uploading a segment file to the remote
* store are implemented. The remaining methods will be implemented as we open up the remote store for other use
* cases like replication, peer recovery, etc.
* ToDo: Extend ChecksumIndexInput
* @see RemoteDirectory
*/
public class RemoteIndexOutput extends IndexOutput {

private final BlobContainer blobContainer;

public RemoteIndexOutput(String name, BlobContainer blobContainer) {
super(name, name);
this.blobContainer = blobContainer;
}

@Override
public void copyBytes(DataInput input, long numBytes) throws IOException {
assert input instanceof IndexInput : "input should be instance of IndexInput";
blobContainer.writeBlob(getName(), new InputStreamIndexInput((IndexInput) input, numBytes), numBytes, false);
}

/**
* This is a no-op. Once segment file upload to the remote store is complete, we don't need to explicitly close
* the stream; this is taken care of by the internal APIs of the remote store client.
*/
@Override
public void close() throws IOException {
// do nothing
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void writeByte(byte b) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void writeBytes(byte[] byteArray, int offset, int length) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getFilePointer() {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not directly used for the file transfer to/from the remote store.
* However, the checksum is important for verifying the integrity of the data, which means this method will need
* to be implemented to support segment upload as well.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getChecksum() throws IOException {
throw new UnsupportedOperationException();
}

}
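
Hypothetical write-path sketch (not part of this commit): the direct createOutput/copyBytes sequence that Directory#copyFrom performs internally. The parameter names are assumptions for illustration; the upload itself happens inside copyBytes, which streams the input into BlobContainer#writeBlob.

// Illustration only; this hypothetical snippet shows the sequence of calls, not production code.
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.index.store.RemoteDirectory;

import java.io.IOException;

public class RemoteIndexOutputUsageSketch {
    public static void uploadViaOutput(RemoteDirectory remoteDirectory, IndexInput localSegmentInput, String fileName) throws IOException {
        // createOutput does not touch the remote store; it only wraps the blob container.
        IndexOutput output = remoteDirectory.createOutput(fileName, IOContext.DEFAULT);
        // copyBytes performs the actual upload by streaming the IndexInput into the blob store.
        output.copyBytes(localSegmentInput, localSegmentInput.length());
        // close() is a no-op for RemoteIndexOutput.
        output.close();
    }
}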