[Remote Segment Store] Add RemoteDirectory interface to copy segment files to/from remote store #3102

Merged
191 changes: 191 additions & 0 deletions server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
@@ -0,0 +1,191 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store.
* A RemoteDirectory contains only files (no sub-folder hierarchy). This class does not support all the methods of
* the Directory interface. Currently, it implements only the methods used to copy files to/from the remote store.
* The remaining methods will be implemented as the remote store is integrated with replication, peer recovery, etc.
*/
public class RemoteDirectory extends Directory {

private final BlobContainer blobContainer;

public RemoteDirectory(BlobContainer blobContainer) {
this.blobContainer = blobContainer;
}

/**
* Returns names of all files stored in this directory. The output must be in sorted (UTF-16,
* java's {@link String#compareTo}) order.
*/
@Override
public String[] listAll() throws IOException {
return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new);
}

/**
* Removes an existing file in the directory.
*
* <p>This method will not throw an exception when the file doesn't exist and simply ignores this case.
* This is a deviation from the {@code Directory} interface where it is expected to throw either
* {@link NoSuchFileException} or {@link FileNotFoundException} if {@code name} points to a non-existing file.
*
* @param name the name of an existing file.
* @throws IOException if the file exists but could not be deleted.
*/
@Override
public void deleteFile(String name) throws IOException {
// ToDo: Add a check for file existence
blobContainer.deleteBlobsIgnoringIfNotExists(Collections.singletonList(name));
}

/**
* Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote
* store.
*
* <p> In the {@link Directory} interface, it is expected to throw {@link java.nio.file.FileAlreadyExistsException}
* if the file already exists in the remote store. As this method does not open a file, it does not throw the
* exception.
*
* @param name the name of the file to copy to remote store.
*/
@Override
public IndexOutput createOutput(String name, IOContext context) {
return new RemoteIndexOutput(name, blobContainer);
}

/**
* Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream.
*
* @param name the name of an existing file.
* @throws IOException in case of I/O error
* @throws NoSuchFileException if the file does not exist
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name));
}

/**
* Closes the directory by deleting all the files in this directory.
*/
@Override
public void close() throws IOException {
blobContainer.delete();
}

/**
* Returns the byte length of a file in the directory.
*
* @param name the name of an existing file.
* @throws IOException in case of I/O error
* @throws NoSuchFileException if the file does not exist
*/
@Override
public long fileLength(String name) throws IOException {
// ToDo: Instead of calling remote store each time, keep a cache with segment metadata
Map<String, BlobMetadata> metadata = blobContainer.listBlobsByPrefix(name);
if (metadata.containsKey(name)) {
return metadata.get(name).length();
}
throw new NoSuchFileException(name);
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Once soft deleting is supported for segment files in the remote store, this method will provide details of the
* number of files marked as deleted but not yet removed from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public Set<String> getPendingDeletions() throws IOException {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Temporary IndexOutput is not required while working with Remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Segment upload to the remote store will be permanent and does not require a separate sync API.
* This may change in the future if segment upload to remote store happens via cache and we need sync API to write
* the cache contents to the store permanently.
*
* @throws UnsupportedOperationException always
*/
@Override
public void sync(Collection<String> names) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Once metadata to be stored with each shard is finalized, syncMetaData method will be used to sync the directory
* metadata to the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void syncMetaData() {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* As this method is used by IndexWriter to publish commits, the implementation of this method is required when
* IndexWriter is backed by RemoteDirectory.
*
* @throws UnsupportedOperationException always
*/
@Override
public void rename(String source, String dest) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the directory unmodified.
* Once locking segment files in remote store is supported, implementation of this method is required with
* remote store specific LockFactory.
*
* @throws UnsupportedOperationException always
*/
@Override
public Lock obtainLock(String name) throws IOException {
throw new UnsupportedOperationException();
}
}
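For context, a minimal usage sketch of the copy path (variable names like localDirectory, shardPath, and blobContainer are illustrative, not part of this PR). Lucene's default Directory#copyFrom pairs openInput on the source with createOutput on the destination, which is exactly the pair of methods this class implements:

```java
// Minimal sketch: copy a segment file from a local directory to the remote store.
Directory localDirectory = FSDirectory.open(shardPath);
RemoteDirectory remoteDirectory = new RemoteDirectory(blobContainer);

// copyFrom opens "_0.si" locally, calls createOutput on the remote directory,
// and streams the bytes across via RemoteIndexOutput#copyBytes.
remoteDirectory.copyFrom(localDirectory, "_0.si", "_0.si", IOContext.READONCE);

String[] uploaded = remoteDirectory.listAll(); // sorted names of blobs in the container
```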
Comment on lines +187 to +191
Collaborator:

Do we need to acquire a lock before segment uploads, or can we support concurrent uploads as well?

Member Author:

As long as segments in the remote store are only uploaded and downloaded for the backup and restore flow, I don't think locking is required. Once we start writing to the remote store via IndexWriter, locking will be required.

Open to suggestions.

Member Author:

If we extend BaseDirectory, it will provide locking functionality with the provided LockFactory.
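For reference, a rough sketch of that alternative, assuming a remote-store-specific LockFactory is passed through the constructor (the lockFactory parameter is illustrative, not part of this PR):

```java
// Sketch only: BaseDirectory implements obtainLock by delegating to the supplied
// LockFactory, so the UnsupportedOperationException override would no longer be needed.
public class RemoteDirectory extends BaseDirectory {

    private final BlobContainer blobContainer;

    public RemoteDirectory(BlobContainer blobContainer, LockFactory lockFactory) {
        super(lockFactory); // BaseDirectory#obtainLock calls lockFactory.obtainLock(this, name)
        this.blobContainer = blobContainer;
    }
}
```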

83 changes: 83 additions & 0 deletions server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.IndexInput;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/**
* Class for input from a file in a {@link RemoteDirectory}. Used for all input operations from the remote store.
* Currently, only methods from {@link IndexInput} that are required for reading a file from remote store are
* implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication,
* peer recovery etc.
* ToDo: Extend ChecksumIndexInput
* @see RemoteDirectory
*/
public class RemoteIndexInput extends IndexInput {

private final InputStream inputStream;
private final long size;

public RemoteIndexInput(String name, InputStream inputStream, long size) {
super(name);
this.inputStream = inputStream;
this.size = size;
}

Comment on lines +24 to +35
Collaborator:

We also have a VerifyingIndexInput that handles checksums; we might want to consider it.

Member Author:

Yes, either VerifyingIndexInput or ChecksumIndexInput can be used. I have added it as a ToDo item in the javadoc of the class.

Member Author:

An issue has been created for checksum support. Tagging this comment on the issue.
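For reference, a rough sketch of how ChecksumIndexInput-style verification typically works in Lucene. Note this assumes the wrapped input tracks its file pointer, which RemoteIndexInput does not do yet:

```java
// Sketch only: wrap the raw input so a CRC is accumulated as bytes are read,
// then validate it against the checksum stored in the Lucene footer.
IndexInput raw = remoteDirectory.openInput("_0.cfe", IOContext.READONCE);
ChecksumIndexInput in = new BufferedChecksumIndexInput(raw);

byte[] body = new byte[(int) (in.length() - CodecUtil.footerLength())];
in.readBytes(body, 0, body.length);

CodecUtil.checkFooter(in); // throws CorruptIndexException on checksum mismatch
```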

@Override
public byte readByte() throws IOException {
byte[] buffer = new byte[1];
// read() returns -1 at end of stream; surface it instead of returning stale data
if (inputStream.read(buffer) == -1) {
throw new EOFException("read past EOF: " + this);
}
return buffer[0];
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
// a single read() may return fewer than len bytes; readNBytes keeps reading until len or EOF
if (inputStream.readNBytes(b, offset, len) < len) {
throw new EOFException("read past EOF: " + this);
}
}

@Override
public void close() throws IOException {
inputStream.close();
}

@Override
public long length() {
return size;
}

@Override
public void seek(long pos) throws IOException {
// Relative forward seek: skip() may skip fewer bytes than requested, so retry until done
long remaining = pos;
while (remaining > 0) {
long n = inputStream.skip(remaining);
if (n <= 0) throw new EOFException("seek past EOF: " + this);
remaining -= n;
}
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getFilePointer() {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
throw new UnsupportedOperationException();
}
}
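To illustrate the read path this class enables (remoteDirectory is assumed to be in scope for the example): openInput streams the blob from the remote store and RemoteIndexInput reads it sequentially.

```java
// Sequential read of a whole file from the remote store.
try (IndexInput in = remoteDirectory.openInput("_0.si", IOContext.READONCE)) {
    byte[] data = new byte[(int) in.length()];
    in.readBytes(data, 0, data.length); // backed by InputStream reads on the blob
}
```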
97 changes: 97 additions & 0 deletions server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.lucene.store.InputStreamIndexInput;

import java.io.IOException;

/**
* Class for output to a file in a {@link RemoteDirectory}. Used for all output operations to the remote store.
* Currently, only methods from {@link IndexOutput} that are required for uploading a segment file to remote store are
* implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication,
* peer recovery etc.
* ToDo: Add checksum support by implementing getChecksum
* @see RemoteDirectory
*/
public class RemoteIndexOutput extends IndexOutput {

private final BlobContainer blobContainer;

public RemoteIndexOutput(String name, BlobContainer blobContainer) {
super(name, name);
this.blobContainer = blobContainer;
}

@Override
public void copyBytes(DataInput input, long numBytes) throws IOException {
assert input instanceof IndexInput : "input should be instance of IndexInput";
blobContainer.writeBlob(getName(), new InputStreamIndexInput((IndexInput) input, numBytes), numBytes, false);
}

Comment on lines +36 to +41
Collaborator:

Let's say we go with concurrent writes to the local and remote store; would we still need copyBytes or writeByte?

Collaborator (@Bukhtawar, May 2, 2022):

Do we also use ByteBuffer for leveraging I/O optimizations?

Member Author:

> Let's say we go with concurrent writes to the local and remote store; would we still need copyBytes or writeByte?

For concurrent writes, we will need writeByte/writeBytes.

> Do we also use ByteBuffer for leveraging I/O optimizations?

Right, the current implementation may not be optimized. It also depends on how a given repository implementation handles writeBlob.
We will have a continuous performance test setup that will provide insight into performance issues.

Collaborator:

Another case: let's say writes are not concurrent, but we want them to happen only on the remote store, which can be directly searchable. In that case we might want to do writeBytes to the remote directly.

Member Author:

Right, the idea is to open up these methods as the requirements come in. As you said, the writeByte/writeBytes methods will be required when IndexWriter uses RemoteDirectory to create segments (concurrent writes, or writing only to the remote store). A rough sketch of what that could look like is below.
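Buffering the whole file in memory here is purely illustrative; a real implementation would likely chunk or stream the upload:

```java
// Sketch only: accumulate writes locally, then upload once on close.
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

@Override
public void writeByte(byte b) {
    buffer.write(b);
}

@Override
public void writeBytes(byte[] byteArray, int offset, int length) {
    buffer.write(byteArray, offset, length);
}

@Override
public void close() throws IOException {
    byte[] bytes = buffer.toByteArray();
    blobContainer.writeBlob(getName(), new ByteArrayInputStream(bytes), bytes.length, false);
}
```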

/**
* This is a no-op. Once segment file upload to the remote store is complete, we don't need to explicitly close
* the stream. Closing is taken care of by the internal APIs of the remote store client.
*/
@Override
public void close() throws IOException {
// do nothing
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void writeByte(byte b) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public void writeBytes(byte[] byteArray, int offset, int length) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getFilePointer() {
throw new UnsupportedOperationException();
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified.
* This method is not implemented as it is not directly used for file transfer to/from the remote store.
* However, the checksum is important for verifying data integrity, so implementing this method will be
* required for segment upload as well.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getChecksum() throws IOException {
throw new UnsupportedOperationException();
}

}
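Related to the getChecksum note above, a rough sketch of one possible upload-side integrity check (not part of this PR): Lucene segment files carry a CRC32 in their footer, so the expected value can be read from the local copy with CodecUtil and later compared against a checksum computed over the uploaded blob.

```java
// Sketch only: read the footer checksum of the local file around upload time;
// localDirectory is assumed to be in scope.
try (IndexInput local = localDirectory.openInput("_0.si", IOContext.READONCE)) {
    long expectedChecksum = CodecUtil.retrieveChecksum(local); // also validates footer structure
}
```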