Skip to content

Commit

Permalink
Use byte buffer for input stream
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Jun 8, 2024
1 parent b28618d commit 0cfe278
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.gateway.remote.model;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntity;
Expand Down Expand Up @@ -54,15 +55,20 @@ public void writeAsync(final U entity, final ActionListener<Void> listener) {
try (InputStream inputStream = entity.serialize()) {
BlobPath blobPath = getBlobPathForUpload(entity);
entity.setFullBlobName(blobPath);
// TODO uncomment below logic after merging PR https://github.com/opensearch-project/OpenSearch/pull/13836
// transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT,
// listener);
transferService.uploadBlob(
inputStream,
getBlobPathForUpload(entity),
entity.getBlobFileName(),
WritePriority.URGENT,
listener
);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public T read(final U entity) throws IOException {
// TODO Add timing logs and tracing
assert entity.getFullBlobName() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
Expand All @@ -24,8 +27,10 @@
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.io.Streams;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.index.store.exception.ChecksumCombinationException;
import org.opensearch.index.translog.ChannelFactory;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
Expand All @@ -36,6 +41,7 @@
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -113,24 +119,30 @@ public void uploadBlobs(
}

@Override
public void uploadBlobAsync(
public void uploadBlob(
InputStream inputStream,
Iterable<String> remotePath,
String blobName,
String fileName,
WritePriority writePriority,
ActionListener<Void> listener
) throws IOException {
assert remotePath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remotePath;
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
blobContainer.writeBlob(blobName, inputStream, inputStream.available(), false);
blobContainer.writeBlob(fileName, inputStream, inputStream.available(), false);
listener.onResponse(null);
return;
}
final String resourceDescription = "BlobStoreTransferService.uploadBlob(blob=\"" + blobName + "\")";
byte[] bytes = inputStream.readAllBytes();
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) {
final String resourceDescription = "BlobStoreTransferService.uploadBlob(blob=\"" + fileName + "\")";
try (
IndexInput input = inputStream.available() > 0
? new ByteBuffersIndexInput(
new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(Streams.readFully(inputStream)))),
resourceDescription
)
: new ByteArrayIndexInput(resourceDescription, BytesRef.EMPTY_BYTES)
) {
long expectedChecksum;
try {
expectedChecksum = checksumOfChecksum(input.clone(), 8);
Expand All @@ -143,16 +155,16 @@ public void uploadBlobAsync(
);
}

asyncBlobUpload(
blobName,
blobName,
bytes.length,
uploadBlobAsyncInternal(
fileName,
fileName,
inputStream.available(),
blobPath,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
null,
listener
listener,
null
);
}
}
Expand Down Expand Up @@ -205,16 +217,16 @@ private void uploadBlob(
});

Objects.requireNonNull(fileSnapshot.getChecksum());
asyncBlobUpload(
uploadBlobAsyncInternal(
fileSnapshot.getName(),
fileSnapshot.getName(),
contentLength,
blobPath,
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
fileSnapshot.getChecksum(),
metadata,
completionListener
completionListener,
metadata
);

} catch (Exception e) {
Expand All @@ -230,16 +242,16 @@ private void uploadBlob(

}

private void asyncBlobUpload(
private void uploadBlobAsyncInternal(
String fileName,
String remoteFileName,
long contentLength,
BlobPath blobPath,
WritePriority writePriority,
RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier,
long expectedChecksum,
Map<String, String> metadata,
ActionListener<Void> completionListener
ActionListener<Void> completionListener,
Map<String, String> metadata
) throws IOException {
BlobContainer blobContainer = blobStore.blobContainer(blobPath);
assert blobContainer instanceof AsyncMultiStreamBlobContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void uploadBlobs(
* @param listener the callback to be invoked once uploads complete successfully/fail
* @throws IOException the exception thrown while uploading
*/
void uploadBlobAsync(
void uploadBlob(
InputStream inputStream,
Iterable<String> remotePath,
String blobName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void onFailure(Exception e) {
resp -> listener.onResponse(testObject),
ex -> listener.onFailure(ex)
);
transferService.uploadBlobAsync(inputStream, repository.basePath(), "test-object", WritePriority.URGENT, completionListener);
transferService.uploadBlob(inputStream, repository.basePath(), "test-object", WritePriority.URGENT, completionListener);
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(succeeded.get());
}
Expand Down

0 comments on commit 0cfe278

Please sign in to comment.