Skip to content

Commit

Permalink
chore: add StorageInternal#{delete(BlobId),internalDirectUpload} (#2221)
Browse files Browse the repository at this point in the history
* chore: add StorageInternal#{delete(BlobId),internalDirectUpload} (#2221)

_Pre-Work_

Add two new method to StorageInternal for access by new BlobWriteSession implementations
1. `#delete(BlobId)`
2. `#internalDirectUpload(BlobInfo, Opts<ObjectTargetOpt>, ByteBuffer)`

* chore: add ParallelCompositeUpload related config (#2229)

_Pre-work_

Add skeleton of config parameters available for Parallel Composite Uploads
  • Loading branch information
BenWhitehead authored Sep 29, 2023
1 parent c4e56fb commit 83e2d34
Show file tree
Hide file tree
Showing 7 changed files with 541 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,35 +244,9 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option
@Override
public Blob create(
BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options) {
requireNonNull(blobInfo, "blobInfo must be non null");
requireNonNull(content, "content must be non null");
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = Hasher.enabled();
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> {
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge))
.setByteStringStrategy(ByteStringStrategy.noCopy())
.setHasher(hasher)
.direct()
.unbuffered()
.setRequest(req)
.build();

try (UnbufferedWritableByteChannel c = session.open()) {
c.write(ByteBuffer.wrap(content, offset, length));
}
return session.getResult();
},
this::getBlob);
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
return internalDirectUpload(blobInfo, opts, ByteBuffer.wrap(content, offset, length))
.asBlob(this);
}

@Override
Expand Down Expand Up @@ -749,6 +723,42 @@ public GrpcBlobWriteChannel internalWriter(BlobInfo info, Opts<ObjectTargetOpt>
hasher);
}

@Override
public BlobInfo internalDirectUpload(
BlobInfo blobInfo, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
requireNonNull(blobInfo, "blobInfo must be non null");
requireNonNull(buf, "content must be non null");
Opts<ObjectTargetOpt> optsWithDefaults = opts.prepend(defaultOpts);
GrpcCallContext grpcCallContext =
optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, optsWithDefaults);
Hasher hasher = Hasher.enabled();
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
RewindableContent content = RewindableContent.of(buf);
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> {
content.rewindTo(0);
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge))
.setByteStringStrategy(ByteStringStrategy.noCopy())
.setHasher(hasher)
.direct()
.unbuffered()
.setRequest(req)
.build();

try (UnbufferedWritableByteChannel c = session.open()) {
content.writeTo(c);
}
return session.getResult();
},
this::getBlob);
}

@Override
public WriteChannel writer(URL signedURL) {
return throwHttpJsonOnly(fmtMethodName("writer", URL.class));
Expand Down
Loading

0 comments on commit 83e2d34

Please sign in to comment.