Skip to content

Commit

Permalink
Introduce interface changes to read/write blob with object metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
  • Loading branch information
Sandeep Kumawat committed Apr 2, 2024
1 parent a103b84 commit 08c9432
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ public interface BlobContainer {
*/
InputStream readBlob(String blobName) throws IOException;

/**
* Creates a new {@link BlobDownloadResponse} for the given blob name.
*
* @param blobName
* The name of the blob to get an {@link InputStream} for.
* @return The {@code InputStream} to read the blob.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob can not be read.
*/
default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
return null;
};

/**
* Creates a new {@link InputStream} that can be used to read the given blob starting from
* a specific {@code position} in the blob. The {@code length} is an indication of the
Expand Down Expand Up @@ -128,6 +141,33 @@ default long readBlobPreferredLength() {
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata.
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param metadata
* The metadata to be associate with the blob upload.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {};

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it.
Expand All @@ -149,6 +189,35 @@ default long readBlobPreferredLength() {
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,and metadata
* using an atomic write operation if the implementation supports it.
* <p>
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param metadata
* The metadata to be associate with the blob upload.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlobAtomicWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {};

/**
* Deletes this container and all its contents from the repository.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore;

import java.io.InputStream;
import java.util.Map;

/**
* A class for blob download response
*
* @opensearch.internal
*/
public class BlobDownloadResponse {

/**
* Downloaded blob InputStream
*/
private InputStream inputStream;

/**
* Metadata of the downloaded blob
*/
private Map<String, String> metadata;

public InputStream getInputStream() {
return inputStream;
}

public Map<String, String> getMetadata() {
return metadata;
}

public BlobDownloadResponse(InputStream inputStream, Map<String, String> metadata) {
this.inputStream = inputStream;
this.metadata = metadata;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
Expand Down Expand Up @@ -164,6 +165,11 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws I
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
}

@Override
public BlobDownloadResponse downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException {
return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName);
}

@Override
public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException {
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.translog.transfer;

import org.opensearch.common.blobstore.BlobDownloadResponse;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
Expand Down Expand Up @@ -125,6 +126,15 @@ void uploadBlobs(
*/
InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;

/**
*
* @param path the remote path from where download should be made
* @param fileName the name of the file
* @return {@link BlobDownloadResponse} of the remote file
* @throws IOException the exception while reading the data
*/
BlobDownloadResponse downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException;

void listAllInSortedOrder(Iterable<String> path, String filenamePrefix, int limit, ActionListener<List<BlobMetadata>> listener);

void listAllInSortedOrderAsync(
Expand Down

0 comments on commit 08c9432

Please sign in to comment.