Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffered upload no longer requires length in sync client #22218

Merged
merged 9 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -526,11 +526,20 @@ buffers is not a common scenario for async like it is in sync (and we already bu
(stream, length) -> uploadFullBlob(blockBlobAsyncClient, stream, length, parallelTransferOptions,
headers, metadata, tags, tier, requestConditions, computeMd5);

Flux<ByteBuffer> data = options.getDataFlux() == null ? Utility.convertStreamToByteBuffer(
options.getDataStream(), options.getLength(),
Flux<ByteBuffer> data = options.getDataFlux();
// no specified length: use azure.core's converter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we have markable stream - is SDK smart enough to detect it and don't buffer at all?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately not. Also, though, that is only applicable when maxConcurrency = 1, so we aren't missing out on the biggest optimization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this PR somewhat impairs ability to introduce such optimization?
We have customers asking about memory overhead. We should offer a way to do unbuffered upload of markable stream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this PR introduces any new hurdles for such a feature that didn't already exist. I did the work for that optimization in .NET and the integration aspect was fairly simple.

if (data == null && options.getOptionalLength() == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
(int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong()), false)
: options.getDataFlux();
int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a note here that this is fine since buffered upload does not require a replayable flux?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not on this exact line but it's in the method on decision-making that this is buffered

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This chunkSize isn't related to block size right? It's just for the purpose of stream->flux conversion.
I wonder if we should be using block size here.
few things to consider (and please correct me if this is stupid but I have a feeling that his might be reason we've seen higher than expected demand for memory in perf tests):

  • if this results in double buffering, i.e. stream->flux needs memory and then buffered upload needs it as well then in worst case "uploader" might be busy working on uploading last chunk (list of bytebuffers emitted from converter) and converter trying to supply next block - both will hold memory. I believe reactor will make sure at least one next element is getting prepared while recent block is being uploaded. @gapra-msft do you think this makes sense?
  • allocating smaller chunks to feed Flux might be a bit friendlier for allocator and GC. It will have some overhead but on the other hand it's easier to fit multiple smaller arrays into heap rather than finding place for Integer.Max_Value array. The later might trigger GC/heap defragmentation efforts. We shouldn't go crazy small though here but I think int.max is too big. A lot of built in components (like bufferedstreams, http stacks) default to 8KB buffer, so that's the lower boundary, not sure how high we should allow it to be - maybe 64kb? or maybe we need another knob.

Copy link
Member Author

@jaschrep-msft jaschrep-msft Jun 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you file an issue for this and assign it to me, or does it really need to be addressed now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this can be converted to an issue and addressed outside of this PR. Would you mind doing this?

data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
// specified length (legacy requirement): use custom converter. no marking because we buffer anyway.
} else if (data == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
data = Utility.convertStreamToByteBuffer(
options.getDataStream(), options.getOptionalLength(), chunkSize, false);
}

return UploadUtils.uploadFullOrChunked(data, ModelHelper.wrapBlobOptions(parallelTransferOptions),
uploadInChunksFunction, uploadFullBlobFunction);
} catch (RuntimeException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class BlobParallelUploadOptions {
private final Flux<ByteBuffer> dataFlux;
private final InputStream dataStream;
private final long length;
private final Long length;
private ParallelTransferOptions parallelTransferOptions;
private BlobHttpHeaders headers;
private Map<String, String> metadata;
Expand All @@ -37,7 +37,7 @@ public class BlobParallelUploadOptions {
private Duration timeout;

/**
* Constructs a new {@code BlobParallelUploadOptions}.
* Constructs a new {@link BlobParallelUploadOptions}.
*
* @param dataFlux The data to write to the blob. Unlike other upload methods, this method does not require that
* the {@code Flux} be replayable. In other words, it does not have to support multiple subscribers and is not
Expand All @@ -47,25 +47,48 @@ public BlobParallelUploadOptions(Flux<ByteBuffer> dataFlux) {
StorageImplUtils.assertNotNull("dataFlux", dataFlux);
this.dataFlux = dataFlux;
this.dataStream = null;
this.length = -1;
this.length = null;
}

/**
* Constructs a new {@code BlobParalleUploadOptions}.
* Constructs a new {@link BlobParallelUploadOptions}.
*
* Use {@link #BlobParallelUploadOptions(InputStream)} instead to supply an InputStream without knowing the exact
* length beforehand.
*
* @param dataStream The data to write to the blob. The data must be markable. This is in order to support retries.
* If the data is not markable, consider opening a {@link com.azure.storage.blob.specialized.BlobOutputStream} and
* writing to the returned stream. Alternatively, consider wrapping your data source in a
* {@link java.io.BufferedInputStream} to add mark support.
* @param dataStream The data to write to the blob.
* @param length The exact length of the data. It is important that this value match precisely the length of the
* data provided in the {@link InputStream}.
* @deprecated length is no longer necessary; use {@link #BlobParallelUploadOptions(InputStream)} instead.
*/
@Deprecated
public BlobParallelUploadOptions(InputStream dataStream, long length) {
this(dataStream, Long.valueOf(length));
}

/**
* Constructs a new {@link BlobParallelUploadOptions}.
*
* @param dataStream The data to write to the blob.
*/
public BlobParallelUploadOptions(InputStream dataStream) {
this(dataStream, null);
}

/**
* Common constructor for building options from InputStream.
*
* @param dataStream The data to write to the blob.
* @param length Optional known length of the data, affects reactive behavior for backwards compatibility.
*/
private BlobParallelUploadOptions(InputStream dataStream, Long length) {
StorageImplUtils.assertNotNull("dataStream", dataStream);
StorageImplUtils.assertInBounds("length", length, 0, Long.MAX_VALUE);
if (length != null) {
StorageImplUtils.assertInBounds("length", length, 0, Long.MAX_VALUE);
}
this.dataStream = dataStream;
this.length = length;
this.dataFlux = null;
this.length = length;
}

/**
Expand All @@ -77,7 +100,7 @@ public BlobParallelUploadOptions(BinaryData data) {
StorageImplUtils.assertNotNull("data", data);
this.dataFlux = Flux.just(data.toByteBuffer());
this.dataStream = null;
this.length = -1;
this.length = null;
}

/**
Expand All @@ -103,11 +126,23 @@ public InputStream getDataStream() {
*
* @return The exact length of the data. It is important that this value match precisely the length of the
* data provided in the {@link InputStream}.
* @deprecated use {@link #getOptionalLength()} to have safe access to a length that will not always exist.
*/
@Deprecated
public long getLength() {
return length;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it worth saving the customer of a NPE here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or if length is null should we throw a meaningful error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine, it won't throw if they haven't switched to new code path.

}

/**
* Gets the length of the data.
*
* @return The exact length of the data. It is important that this value match precisely the length of the
* data provided in the {@link InputStream}.
*/
public Long getOptionalLength() {
return length;
}

/**
* Gets the {@link ParallelTransferOptions}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,45 @@ class BlobAPITest extends APISpec {
.getValue().getETag() != null
}

def "Upload InputStream no length"() {
when:
bc.uploadWithResponse(new BlobParallelUploadOptions(data.defaultInputStream), null, null)

then:
notThrown(Exception)
bc.downloadContent().toBytes() == data.defaultBytes
}

def "Upload InputStream bad length"() {
when:
bc.uploadWithResponse(new BlobParallelUploadOptions(data.defaultInputStream, length), null, null)

then:
thrown(Exception)

where:
_ | length
_ | 0
_ | -100
_ | data.defaultDataSize - 1
_ | data.defaultDataSize + 1
}

def "Upload successful retry"() {
given:
def clientWithFailure = getBlobClient(
env.primaryAccount.credential,
bc.getBlobUrl(),
new TransientFailureInjectingHttpPipelinePolicy())

when:
clientWithFailure.uploadWithResponse(new BlobParallelUploadOptions(data.defaultInputStream), null, null)

then:
notThrown(Exception)
bc.downloadContent().toBytes() == data.defaultBytes
}

@LiveOnly
// Reading from recordings will not allow for the timing of the test to work correctly.
def "Upload timeout"() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"networkCallRecords" : [ {
"Method" : "PUT",
"Uri" : "https://REDACTED.blob.core.windows.net/7f5018f307f5018f3aec15131ec2a5fb9dde640e685e?restype=container",
"Headers" : {
"x-ms-version" : "2020-08-04",
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
"x-ms-client-request-id" : "a3d9e138-90e1-4885-8cd7-544bce3e1fca"
},
"Response" : {
"Transfer-Encoding" : "chunked",
"x-ms-version" : "2020-08-04",
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
"eTag" : "0x8D92C58EF0BCAB0",
"Last-Modified" : "Thu, 10 Jun 2021 21:44:32 GMT",
"retry-after" : "0",
"StatusCode" : "201",
"x-ms-request-id" : "69ef6576-b01e-0019-0641-5e7a16000000",
"x-ms-client-request-id" : "a3d9e138-90e1-4885-8cd7-544bce3e1fca",
"Date" : "Thu, 10 Jun 2021 21:44:32 GMT"
},
"Exception" : null
}, {
"Method" : "PUT",
"Uri" : "https://REDACTED.blob.core.windows.net/7f5018f307f5018f3aec15131ec2a5fb9dde640e685e/7f5018f317f5018f3aec68648158188ad9aa3434bb9a",
"Headers" : {
"x-ms-version" : "2020-08-04",
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
"x-ms-client-request-id" : "9fe09bbf-3b98-4f30-b335-6ae82cb45011",
"Content-Type" : "application/octet-stream"
},
"Response" : {
"Transfer-Encoding" : "chunked",
"x-ms-version" : "2020-08-04",
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
"x-ms-content-crc64" : "6RYQPwaVsyQ=",
"Last-Modified" : "Thu, 10 Jun 2021 21:44:33 GMT",
"retry-after" : "0",
"StatusCode" : "201",
"x-ms-request-server-encrypted" : "true",
"Date" : "Thu, 10 Jun 2021 21:44:32 GMT",
"Content-MD5" : "wh+Wm18D0z1D4E+PE252gg==",
"eTag" : "0x8D92C58EF38659F",
"x-ms-request-id" : "69ef6580-b01e-0019-0b41-5e7a16000000",
"x-ms-client-request-id" : "9fe09bbf-3b98-4f30-b335-6ae82cb45011"
},
"Exception" : null
} ],
"variables" : [ "7f5018f307f5018f3aec15131ec2a5fb9dde640e685e", "7f5018f317f5018f3aec68648158188ad9aa3434bb9a" ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"networkCallRecords" : [ {
"Method" : "PUT",
"Uri" : "https://REDACTED.blob.core.windows.net/664b29b20664b29b2b8a39735445a8518b952493b840?restype=container",
"Headers" : {
"x-ms-version" : "2020-08-04",
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
"x-ms-client-request-id" : "0a78fd58-091c-48dd-8e06-460e6d3b7b2d"
},
"Response" : {
"Transfer-Encoding" : "chunked",
"x-ms-version" : "2020-08-04",
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
"eTag" : "0x8D92C58EF7CCB72",
"Last-Modified" : "Thu, 10 Jun 2021 21:44:33 GMT",
"retry-after" : "0",
"StatusCode" : "201",
"x-ms-request-id" : "69ef6587-b01e-0019-1241-5e7a16000000",
"x-ms-client-request-id" : "0a78fd58-091c-48dd-8e06-460e6d3b7b2d",
"Date" : "Thu, 10 Jun 2021 21:44:32 GMT"
},
"Exception" : null
}, {
"Method" : "PUT",
"Uri" : "https://REDACTED.blob.core.windows.net/664b29b20664b29b2b8a39735445a8518b952493b840/664b29b21664b29b2b8a255541c44a05dfa854d5d864",
"Headers" : {
"x-ms-version" : "2020-08-04",
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
"x-ms-client-request-id" : "8f4270ac-e7b8-4aba-8fc1-067461d7a2f2",
"Content-Type" : "application/octet-stream"
},
"Response" : {
"Transfer-Encoding" : "chunked",
"x-ms-version" : "2020-08-04",
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
"x-ms-content-crc64" : "6RYQPwaVsyQ=",
"Last-Modified" : "Thu, 10 Jun 2021 21:44:33 GMT",
"retry-after" : "0",
"StatusCode" : "201",
"x-ms-request-server-encrypted" : "true",
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT",
"Content-MD5" : "wh+Wm18D0z1D4E+PE252gg==",
"eTag" : "0x8D92C58EF8BA9BB",
"x-ms-request-id" : "69ef658a-b01e-0019-1441-5e7a16000000",
"x-ms-client-request-id" : "8f4270ac-e7b8-4aba-8fc1-067461d7a2f2"
},
"Exception" : null
} ],
"variables" : [ "664b29b20664b29b2b8a39735445a8518b952493b840", "664b29b21664b29b2b8a255541c44a05dfa854d5d864" ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"networkCallRecords" : [ {
"Method" : "PUT",
"Uri" : "https://REDACTED.blob.core.windows.net/4d667a7104d667a71bea6591957c250d6618b480daf9?restype=container",
"Headers" : {
"x-ms-version" : "2020-08-04",
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
"x-ms-client-request-id" : "c79f5d3b-8274-4c59-ada0-c8d13da599cf"
},
"Response" : {
"Transfer-Encoding" : "chunked",
"x-ms-version" : "2020-08-04",
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
"eTag" : "0x8D92C58EFB6BEAF",
"Last-Modified" : "Thu, 10 Jun 2021 21:44:34 GMT",
"retry-after" : "0",
"StatusCode" : "201",
"x-ms-request-id" : "69ef6590-b01e-0019-1a41-5e7a16000000",
"x-ms-client-request-id" : "c79f5d3b-8274-4c59-ada0-c8d13da599cf",
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT"
},
"Exception" : null
}, {
"Method" : "PUT",
"Uri" : "https://REDACTED.blob.core.windows.net/4d667a7104d667a71bea6591957c250d6618b480daf9/4d667a7114d667a71bea705084c8cea16df3a446680d",
"Headers" : {
"x-ms-version" : "2020-08-04",
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
"x-ms-client-request-id" : "4054dcf0-fb43-40f1-8f39-d47e1a4e5500",
"Content-Type" : "application/octet-stream"
},
"Response" : {
"Transfer-Encoding" : "chunked",
"x-ms-version" : "2020-08-04",
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
"x-ms-content-crc64" : "6RYQPwaVsyQ=",
"Last-Modified" : "Thu, 10 Jun 2021 21:44:34 GMT",
"retry-after" : "0",
"StatusCode" : "201",
"x-ms-request-server-encrypted" : "true",
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT",
"Content-MD5" : "wh+Wm18D0z1D4E+PE252gg==",
"eTag" : "0x8D92C58EFC500F4",
"x-ms-request-id" : "69ef6593-b01e-0019-1c41-5e7a16000000",
"x-ms-client-request-id" : "4054dcf0-fb43-40f1-8f39-d47e1a4e5500"
},
"Exception" : null
} ],
"variables" : [ "4d667a7104d667a71bea6591957c250d6618b480daf9", "4d667a7114d667a71bea705084c8cea16df3a446680d" ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"networkCallRecords" : [ {
"Method" : "PUT",
"Uri" : "https://REDACTED.blob.core.windows.net/547d4b300547d4b30f11300636363b37d2e8646149bf?restype=container",
"Headers" : {
"x-ms-version" : "2020-08-04",
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
"x-ms-client-request-id" : "d7c66faa-0162-4603-98c4-6fa0af046d3d"
},
"Response" : {
"Transfer-Encoding" : "chunked",
"x-ms-version" : "2020-08-04",
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
"eTag" : "0x8D92C58EFEF5292",
"Last-Modified" : "Thu, 10 Jun 2021 21:44:34 GMT",
"retry-after" : "0",
"StatusCode" : "201",
"x-ms-request-id" : "69ef659a-b01e-0019-2241-5e7a16000000",
"x-ms-client-request-id" : "d7c66faa-0162-4603-98c4-6fa0af046d3d",
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT"
},
"Exception" : null
}, {
"Method" : "PUT",
"Uri" : "https://REDACTED.blob.core.windows.net/547d4b300547d4b30f11300636363b37d2e8646149bf/547d4b301547d4b30f118395439a3b957136742218ef",
"Headers" : {
"x-ms-version" : "2020-08-04",
"User-Agent" : "azsdk-java-azure-storage-blob/12.13.0-beta.1 (11.0.9; Windows 10; 10.0)",
"x-ms-client-request-id" : "96a79c9b-ca06-4499-9825-9a7d0ba61b79",
"Content-Type" : "application/octet-stream"
},
"Response" : {
"Transfer-Encoding" : "chunked",
"x-ms-version" : "2020-08-04",
"Server" : "Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0",
"x-ms-content-crc64" : "6RYQPwaVsyQ=",
"Last-Modified" : "Thu, 10 Jun 2021 21:44:34 GMT",
"retry-after" : "0",
"StatusCode" : "201",
"x-ms-request-server-encrypted" : "true",
"Date" : "Thu, 10 Jun 2021 21:44:33 GMT",
"Content-MD5" : "wh+Wm18D0z1D4E+PE252gg==",
"eTag" : "0x8D92C58EFFD6DF1",
"x-ms-request-id" : "69ef659c-b01e-0019-2341-5e7a16000000",
"x-ms-client-request-id" : "96a79c9b-ca06-4499-9825-9a7d0ba61b79"
},
"Exception" : null
} ],
"variables" : [ "547d4b300547d4b30f11300636363b37d2e8646149bf", "547d4b301547d4b30f118395439a3b957136742218ef" ]
}
Loading