From 1c9985892ae59b870d69a5bdebdfadf25dfcd474 Mon Sep 17 00:00:00 2001
From: Andrew Gaul
Date: Mon, 27 Apr 2015 17:55:06 -0700
Subject: [PATCH] Native multipart upload support

Fixes #2.  Do not commit; requires:

https://github.com/jclouds/jclouds/pull/737
---
 pom.xml                                       |   2 +-
 .../java/org/gaul/s3proxy/S3ProxyHandler.java | 277 ++++++++----------
 .../java/org/gaul/s3proxy/S3ProxyTest.java    |  54 +++-
 3 files changed, 160 insertions(+), 173 deletions(-)

diff --git a/pom.xml b/pom.xml
index d7923e65..6d6233a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,7 +235,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <jclouds.version>1.9.1-SNAPSHOT</jclouds.version>
+        <jclouds.version>2.0.0-SNAPSHOT</jclouds.version>
diff --git a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java
index 22a896eb..6dc5dffd 100644
--- a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java
+++ b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java
@@ -34,12 +34,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
@@ -58,6 +58,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -65,7 +66,6 @@ import com.google.common.collect.TreeMultimap;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
-import com.google.common.hash.HashingInputStream;
 import com.google.common.io.BaseEncoding;
 import com.google.common.io.ByteSource;
 import com.google.common.io.ByteStreams;
@@ -82,6 +82,8 @@ import org.jclouds.blobstore.domain.BlobBuilder;
 import org.jclouds.blobstore.domain.BlobMetadata;
 import org.jclouds.blobstore.domain.ContainerAccess;
+import org.jclouds.blobstore.domain.MultipartPart;
+import org.jclouds.blobstore.domain.MultipartUpload;
 import org.jclouds.blobstore.domain.PageSet;
 import org.jclouds.blobstore.domain.StorageMetadata;
 import org.jclouds.blobstore.options.CopyOptions;
@@ -94,6 +96,8 @@ import org.jclouds.http.HttpResponseException;
 import org.jclouds.io.ContentMetadata;
 import org.jclouds.io.ContentMetadataBuilder;
+import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
 import org.jclouds.rest.AuthorizationException;
 import org.jclouds.util.Throwables2;
 import org.slf4j.Logger;
@@ -117,10 +121,6 @@ final class S3ProxyHandler extends AbstractHandler {
     private static final String FAKE_INITIATOR_DISPLAY_NAME =
             "umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx";
     private static final String FAKE_REQUEST_ID = "4442587FB7D0A2F9";
-    private static final String FAKE_UPLOAD_ID =
-            "EXAMPLEJZ6e0YupT2h66iePQCc9IEbYbDUy4RTpMeoSMLPRp8Z5o1u8feSRo" +
-            "npvnWsKKG35tI2LB9VDPiCgTy.Gq2VxQLYjrue4Nq.NBdqI-";
-    private static final long MINIMUM_MULTIPART_PART_SIZE = 5 * 1024 * 1024;
     private static final Pattern VALID_BUCKET_PATTERN =
             Pattern.compile("[a-zA-Z0-9._-]+");
     private static final Set<String> SIGNED_SUBRESOURCES = ImmutableSet.of(
@@ -1202,14 +1202,14 @@ private void handlePutBlob(HttpServletRequest request,
     private void handleInitiateMultipartUpload(HttpServletRequest request,
             HttpServletResponse response, BlobStore blobStore,
             String containerName, String blobName)
            throws IOException {
-        String uploadId = FAKE_UPLOAD_ID + UUID.randomUUID().toString();
         ByteSource payload = ByteSource.empty();
         BlobBuilder.PayloadBlobBuilder builder = blobStore
-                .blobBuilder(uploadId)
+                .blobBuilder(blobName)
                 .payload(payload);
         addContentMetdataFromHttpRequest(builder, request);
         builder.contentLength(payload.size());
-        blobStore.putBlob(containerName, builder.build());
+        MultipartUpload mpu = blobStore.initiateMultipartUpload(containerName,
+                builder.build().getMetadata());

         try (Writer writer = response.getWriter()) {
             XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(
@@ -1220,7 +1220,7 @@ private void handleInitiateMultipartUpload(HttpServletRequest request,

             writeSimpleElement(xml, "Bucket", containerName);
             writeSimpleElement(xml, "Key", blobName);
-            writeSimpleElement(xml, "UploadId", uploadId);
+            writeSimpleElement(xml, "UploadId", mpu.id());

             xml.writeEndElement();
             xml.flush();
@@ -1233,57 +1233,35 @@ private void handleCompleteMultipartUpload(HttpServletRequest request,
             HttpServletResponse response, BlobStore blobStore,
             String containerName, String blobName, String uploadId)
             throws IOException, S3Exception {
-        Collection<String> partNames = new ArrayList<>();
-        long totalContentLength = 0;
+        ImmutableList.Builder<MultipartPart> parts = ImmutableList.builder();
         try (InputStream is = request.getInputStream()) {
-            for (Iterator<String> it = parseSimpleXmlElements(is,
-                    "PartNumber").iterator(); it.hasNext();) {
-                String partName = uploadId + "." + it.next();
-                partNames.add(partName);
-                BlobMetadata metadata = blobStore.blobMetadata(containerName,
-                        partName);
-                long contentLength =
-                        metadata.getContentMetadata().getContentLength();
-                if (contentLength < MINIMUM_MULTIPART_PART_SIZE &&
+            for (Map.Entry<Integer, String> entry :
+                    parseCompleteMultipartUpload(is).entrySet()) {
+                // TODO: how to replicate this check?
+/*
+                if (contentLength < blobStore.getMinimumMultipartPartSize() &&
                         it.hasNext()) {
                     throw new S3Exception(S3ErrorCode.ENTITY_TOO_SMALL);
                 }
-                totalContentLength += contentLength;
-            }
-
-            if (partNames.isEmpty()) {
-                // Amazon requires at least one part
-                throw new S3Exception(S3ErrorCode.MALFORMED_X_M_L);
+*/
+                // TODO: how we will discover this for Swift?  list parts?
+                long partSize = -1;
+                parts.add(MultipartPart.create(entry.getKey(),
+                        partSize, entry.getValue()));
             }
         }

         try (Writer writer = response.getWriter()) {
-            BlobMetadata blobMetadata = blobStore.blobMetadata(
-                    containerName, uploadId);
-            ContentMetadata contentMetadata =
-                    blobMetadata.getContentMetadata();
-            BlobBuilder.PayloadBlobBuilder builder = blobStore
-                    .blobBuilder(blobName)
-                    .userMetadata(blobMetadata.getUserMetadata())
-                    .payload(new MultiBlobByteSource(blobStore, containerName,
-                            partNames))
-                    .contentDisposition(
-                            contentMetadata.getContentDisposition())
-                    .contentEncoding(contentMetadata.getContentEncoding())
-                    .contentLanguage(contentMetadata.getContentLanguage())
-                    .contentLength(totalContentLength)
-                    .expires(contentMetadata.getExpires());
-            String contentType = contentMetadata.getContentType();
-            if (contentType != null) {
-                builder.contentType(contentType);
-            }
-
-            // TODO: will the client time out here?
-            String eTag = blobStore.putBlob(containerName, builder.build(),
-                    new PutOptions().multipart(true));
-
-            blobStore.removeBlobs(containerName, partNames);
-            blobStore.removeBlob(containerName, uploadId);
+            // TODO: how to reconstruct original mpu?  Azure and Swift need the
+            // content and user metadata from initiateMultipartUpload.  We
+            // could store these in a fake blob.
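+            // (A sketch of that stub-blob approach, with invented helper
+            // names, is appended after this patch.)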
+            BlobMetadata blobMetadata = blobStore.blobBuilder(blobName)
+                    .payload(new byte[0])
+                    .build()
+                    .getMetadata();
+            MultipartUpload mpu = MultipartUpload.create(containerName,
+                    blobName, uploadId, blobMetadata);
+            String eTag = blobStore.completeMultipartUpload(mpu,
+                    parts.build());

             XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(
                     writer);
@@ -1318,24 +1296,24 @@ private void handleAbortMultipartUpload(HttpServletRequest request,
             HttpServletResponse response, BlobStore blobStore,
             String containerName, String blobName, String uploadId)
             throws IOException, S3Exception {
+        // TODO: how to handle this?
+/*
         if (!blobStore.blobExists(containerName, uploadId)) {
             throw new S3Exception(S3ErrorCode.NO_SUCH_UPLOAD);
         }
-
-        PageSet<? extends StorageMetadata> pageSet = blobStore.list(
-                containerName,
-                new ListContainerOptions().afterMarker(uploadId));
-        for (StorageMetadata sm : pageSet) {
-            String partName = sm.getName();
-            if (!partName.startsWith(uploadId + ".")) {
-                break;
-            }
-            // TODO: call removeBlobs
-            blobStore.removeBlob(containerName, partName);
-        }
-        blobStore.removeBlob(containerName, uploadId);
+*/
+        // TODO: how to reconstruct original mpu?
+        BlobMetadata blobMetadata = blobStore.blobBuilder(blobName)
+                .payload(new byte[0])
+                .build()
+                .getMetadata();
+        MultipartUpload mpu = MultipartUpload.create(containerName,
+                blobName, uploadId, blobMetadata);
+        blobStore.abortMultipartUpload(mpu);
         response.sendError(HttpServletResponse.SC_NO_CONTENT);
     }

+    // TODO:
     private void handleListParts(HttpServletRequest request,
             HttpServletResponse response, BlobStore blobStore,
             String containerName, String blobName, String uploadId)
@@ -1372,30 +1350,29 @@ private void handleListParts(HttpServletRequest request,
             writeSimpleElement(xml, "IsTruncated", "true");
 */

-        PageSet<? extends StorageMetadata> pageSet = blobStore.list(
-                containerName,
-                new ListContainerOptions().afterMarker(uploadId));
-        for (StorageMetadata sm : pageSet) {
-            String partName = sm.getName();
-            if (!partName.startsWith(uploadId + ".")) {
-                break;
-            }
+        // TODO: how to reconstruct original mpu?
+        BlobMetadata blobMetadata = blobStore.blobBuilder(blobName)
+                .payload(new byte[0])
+                .build()
+                .getMetadata();
+        MultipartUpload mpu = MultipartUpload.create(containerName,
+                blobName, uploadId, blobMetadata);

-            BlobMetadata metadata = blobStore.blobMetadata(containerName,
-                    partName);
+        List<MultipartPart> parts = blobStore.listMultipartUpload(mpu);
+        for (MultipartPart part : parts) {
             xml.writeStartElement("Part");

-            writeSimpleElement(xml, "PartNumber",
-                    partName.substring((uploadId + ".").length()));
+            writeSimpleElement(xml, "PartNumber", String.valueOf(
+                    part.partNumber()));

-            Date lastModified = sm.getLastModified();
+            Date lastModified = null;  // TODO:
             if (lastModified != null) {
                 writeSimpleElement(xml, "LastModified",
                         blobStore.getContext().utils().date()
                                 .iso8601DateFormat(lastModified));
             }

-            String eTag = sm.getETag();
+            String eTag = part.partETag();
             if (eTag != null) {
                 String blobStoreType = getBlobStoreType(blobStore);
                 if (blobStoreType.equals("google-cloud-storage")) {
@@ -1406,7 +1383,7 @@ private void handleListParts(HttpServletRequest request,
             }

             writeSimpleElement(xml, "Size", String.valueOf(
-                    metadata.getContentMetadata().getContentLength()));
+                    part.partSize()));

             xml.writeEndElement();
         }
@@ -1461,27 +1438,30 @@ private void handleUploadPart(HttpServletRequest request,
             throw new S3Exception(S3ErrorCode.INVALID_ARGUMENT);
         }

-        String partNumber = request.getParameter("partNumber");
+        String partNumberString = request.getParameter("partNumber");
+        int partNumber = Integer.parseInt(partNumberString);
         // TODO: sanity checking

-        try (HashingInputStream his = new HashingInputStream(Hashing.md5(),
-                request.getInputStream())) {
-            BlobBuilder.PayloadBlobBuilder builder = blobStore
-                    .blobBuilder(uploadId + "." + partNumber)
-                    .payload(his)
-                    .contentLength(request.getContentLength());
-            addContentMetdataFromHttpRequest(builder, request);
+        // TODO: how to reconstruct original mpu?
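+        // (same question as in handleCompleteMultipartUpload; see the
+        // stub-blob sketch appended after this patch)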
+        BlobMetadata blobMetadata = blobStore.blobBuilder(blobName)
+                .payload(new byte[0])
+                .contentLength(contentLength)
+                .build()
+                .getMetadata();
+        MultipartUpload mpu = MultipartUpload.create(containerName,
+                blobName, uploadId, blobMetadata);
+
+        MultipartPart part;
+        try (InputStream is = request.getInputStream()) {
+            Payload payload = Payloads.newInputStreamPayload(is);
+            payload.getContentMetadata().setContentLength(contentLength);
             if (contentMD5 != null) {
-                builder = builder.contentMD5(contentMD5);
+                payload.getContentMetadata().setContentMD5(contentMD5);
             }

-            blobStore.putBlob(containerName, builder.build());
+            part = blobStore.uploadMultipartPart(mpu, partNumber, payload);

-            // recalculate ETag since some object stores like Azure return
-            // non-hash
-            byte[] hashCode = his.hash().asBytes();
-            response.addHeader(HttpHeaders.ETAG, "\"" +
-                    BaseEncoding.base16().lowerCase().encode(hashCode) + "\"");
+            response.addHeader(HttpHeaders.ETAG, part.partETag());
         }
     }

@@ -1694,6 +1674,45 @@ private Collection<String> parseSimpleXmlElements(InputStream is,
         return elements;
     }

+    private SortedMap<Integer, String> parseCompleteMultipartUpload(
+            InputStream is) throws IOException {
+        SortedMap<Integer, String> parts = new TreeMap<>();
+        try {
+            XMLStreamReader reader = xmlInputFactory.createXMLStreamReader(is);
+            int partNumber = -1;
+            String eTag = null;
+            StringBuilder characters = new StringBuilder();
+
+            while (reader.hasNext()) {
+                switch (reader.getEventType()) {
+                case XMLStreamConstants.CHARACTERS:
+                    characters.append(reader.getTextCharacters(),
+                            reader.getTextStart(), reader.getTextLength());
+                    break;
+                case XMLStreamConstants.END_ELEMENT:
+                    String tag = reader.getLocalName();
+                    if (tag.equalsIgnoreCase("PartNumber")) {
+                        partNumber = Integer.parseInt(characters.toString());
+                    } else if (tag.equalsIgnoreCase("ETag")) {
+                        eTag = characters.toString();
+                    } else if (tag.equalsIgnoreCase("Part")) {
+                        parts.put(partNumber, eTag);
+                        partNumber = -1;
+                        eTag = null;
+                    }
+                    characters.setLength(0);
+                    break;
+                default:
+                    break;
+                }
+                reader.next();
+            }
+        } catch (XMLStreamException xse) {
+            throw new IOException(xse);
+        }
+        return parts;
+    }
+
     private static void addContentMetdataFromHttpRequest(
             BlobBuilder.PayloadBlobBuilder builder,
             HttpServletRequest request) {
@@ -1740,70 +1759,4 @@ private static void writeSimpleElement(XMLStreamWriter xml,
         xml.writeCharacters(characters);
         xml.writeEndElement();
     }
-
-    static final class MultiBlobByteSource extends ByteSource {
-        private final BlobStore blobStore;
-        private final String containerName;
-        private final Collection<String> blobNames;
-
-        MultiBlobByteSource(BlobStore blobStore, String containerName,
-                Collection<String> blobNames) {
-            this.blobStore = requireNonNull(blobStore);
-            this.containerName = requireNonNull(containerName);
-            this.blobNames = requireNonNull(blobNames);
-        }
-
-        @Override
-        public InputStream openStream() throws IOException {
-            return new MultiBlobInputStream(blobStore, containerName,
-                    blobNames);
-        }
-    }
-
-    static final class MultiBlobInputStream extends InputStream {
-        private final BlobStore blobStore;
-        private final String containerName;
-        private final Iterator<String> blobNames;
-        private InputStream is;
-
-        MultiBlobInputStream(BlobStore blobStore, String containerName,
-                Collection<String> blobNames) throws IOException {
-            this.blobStore = requireNonNull(blobStore);
-            this.containerName = requireNonNull(containerName);
-            this.blobNames = blobNames.iterator();
-            resetInputStream();
-        }
-
-        @Override
-        public int read() throws IOException {
-            int ch = is.read();
-            if (ch != -1) {
-                return ch;
-            } else if (blobNames.hasNext()) {
-                resetInputStream();
-                return is.read();
-            } else {
-                return -1;
-            }
-        }
-
-        @Override
-        public int read(byte[] array, int offset, int length)
-                throws IOException {
-            int ch = is.read(array, offset, length);
-            if (ch != -1) {
-                return ch;
-            } else if (blobNames.hasNext()) {
-                resetInputStream();
-                return is.read(array, offset, length);
-            } else {
-                return -1;
-            }
-        }
-
-        private void resetInputStream() throws IOException {
-            Blob blob = blobStore.getBlob(containerName, blobNames.next());
-            is = blob.getPayload().openStream();
-        }
-    }
 }
diff --git a/src/test/java/org/gaul/s3proxy/S3ProxyTest.java b/src/test/java/org/gaul/s3proxy/S3ProxyTest.java
index 9300f881..68887c80 100644
--- a/src/test/java/org/gaul/s3proxy/S3ProxyTest.java
+++ b/src/test/java/org/gaul/s3proxy/S3ProxyTest.java
@@ -45,17 +45,20 @@ import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.BlobBuilder.PayloadBlobBuilder;
 import org.jclouds.blobstore.domain.BlobMetadata;
+import org.jclouds.blobstore.domain.MultipartPart;
+import org.jclouds.blobstore.domain.MultipartUpload;
 import org.jclouds.blobstore.domain.PageSet;
 import org.jclouds.blobstore.domain.StorageMetadata;
 import org.jclouds.blobstore.options.CopyOptions;
 import org.jclouds.blobstore.options.ListContainerOptions;
-import org.jclouds.blobstore.options.PutOptions;
 import org.jclouds.http.HttpRequest;
 import org.jclouds.http.HttpResponse;
 import org.jclouds.io.ContentMetadata;
 import org.jclouds.io.ContentMetadataBuilder;
 import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
 import org.jclouds.io.payloads.ByteSourcePayload;
 import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
 import org.jclouds.rest.HttpClient;
@@ -71,6 +74,7 @@ public final class S3ProxyTest {
     private URI s3Endpoint;
     private S3Proxy s3Proxy;
     private BlobStoreContext context;
+    private BlobStore blobStore;
     private BlobStoreContext s3Context;
     private BlobStore s3BlobStore;
     private String containerName;
@@ -113,7 +117,7 @@ public void setUp() throws Exception {
             builder.endpoint(endpoint);
         }
         context = builder.build(BlobStoreContext.class);
-        BlobStore blobStore = context.getBlobStore();
+        blobStore = context.getBlobStore();
         containerName = createRandomContainerName();
         blobStore.createContainerInLocation(null, containerName);

@@ -371,17 +375,47 @@ public void testUrlSigning() throws Exception {
                 .isEqualTo(HttpServletResponse.SC_OK);
     }

+    // TODO: fails for Azure (user metadata not set)
+    // TODO: fails for local blobstores (jclouds not implemented)
+    // TODO: fails for GCS (jclouds not implemented)
+    // TODO: fails for Swift (needs part size)
     @Test
     public void testMultipartUpload() throws Exception {
         String blobName = "blob";
-        int minMultipartSize = 32 * 1024 * 1024 + 1;
-        ByteSource byteSource = ByteSource.wrap(new byte[minMultipartSize]);
-        Blob blob = s3BlobStore.blobBuilder(blobName)
-                .payload(byteSource)
-                .contentLength(byteSource.size())
-                .build();
-        s3BlobStore.putBlob(containerName, blob,
-                new PutOptions().multipart(true));
+        PayloadBlobBuilder blobBuilder = s3BlobStore.blobBuilder(blobName)
+                .userMetadata(ImmutableMap.of(
+                        "key1", "value1",
+                        "key2", "value2"))
+                // TODO: fake payload to add content metadata
+                .payload(new byte[0]);
+        // TODO: content metadata
+        //addContentMetadata(blobBuilder);
+        Blob blob = blobBuilder.build();
+        MultipartUpload mpu = s3BlobStore.initiateMultipartUpload(
+                containerName,
+                blob.getMetadata());
+
+        ByteSource byteSource = ByteSource.wrap(
+                new byte[(int) blobStore.getMinimumMultipartPartSize() + 1]);
+        ByteSource byteSource1 = byteSource.slice(
+                0, blobStore.getMinimumMultipartPartSize());
+        ByteSource byteSource2 = byteSource.slice(
+                blobStore.getMinimumMultipartPartSize(), 1);
+        Payload payload1 = Payloads.newByteSourcePayload(byteSource1);
+        Payload payload2 = Payloads.newByteSourcePayload(byteSource2);
+        payload1.getContentMetadata().setContentLength(byteSource1.size());
+        payload2.getContentMetadata().setContentLength(byteSource2.size());
+        MultipartPart part1 = s3BlobStore.uploadMultipartPart(mpu, 1,
+                payload1);
+        MultipartPart part2 = s3BlobStore.uploadMultipartPart(mpu, 2,
+                payload2);
+
+        s3BlobStore.completeMultipartUpload(mpu, ImmutableList.of(part1,
+                part2));
+
+        Blob newBlob = s3BlobStore.getBlob(containerName, blobName);
+        try (InputStream expected = newBlob.getPayload().openStream();
+                InputStream actual = byteSource.openStream()) {
+            assertThat(expected).hasContentEqualTo(actual);
+        }
+        //checkContentMetadata(newBlob);
+        assertThat(newBlob.getMetadata().getUserMetadata()).isEqualTo(
+                blob.getMetadata().getUserMetadata());
     }

     @Test
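
Note (outside the patch): the recurring "how to reconstruct original mpu?"
TODOs above could be answered by persisting the initiate-time metadata in a
zero-byte stub blob keyed by the upload id, then reading it back in
handleUploadPart, handleListParts, handleCompleteMultipartUpload, and
handleAbortMultipartUpload.  Below is a minimal sketch against the jclouds
2.0.0-SNAPSHOT portable BlobStore API used in this patch; the STUB_SUFFIX
constant and both helper names are invented for illustration, and content
metadata could be stashed the same way as user metadata.

    // Sketch only -- hypothetical helpers, not part of this patch.
    private static final String STUB_SUFFIX = ".s3proxy-mpu-stub";

    // Called from handleInitiateMultipartUpload after
    // blobStore.initiateMultipartUpload: writes a zero-byte stub blob
    // carrying the initiate-time user metadata.
    private void stashMultipartMetadata(BlobStore blobStore,
            String containerName, String uploadId, BlobMetadata metadata) {
        Blob stub = blobStore.blobBuilder(uploadId + STUB_SUFFIX)
                .userMetadata(metadata.getUserMetadata())
                .payload(new byte[0])
                .contentLength(0)
                .build();
        blobStore.putBlob(containerName, stub);
    }

    // Called from the other multipart handlers: re-reads the stub and
    // rebuilds the MultipartUpload that the jclouds calls expect.
    private MultipartUpload rebuildMultipartUpload(BlobStore blobStore,
            String containerName, String blobName, String uploadId)
            throws S3Exception {
        BlobMetadata stub = blobStore.blobMetadata(containerName,
                uploadId + STUB_SUFFIX);
        if (stub == null) {
            // also restores the NoSuchUpload check that
            // handleAbortMultipartUpload comments out above
            throw new S3Exception(S3ErrorCode.NO_SUCH_UPLOAD);
        }
        BlobMetadata blobMetadata = blobStore.blobBuilder(blobName)
                .userMetadata(stub.getUserMetadata())
                .payload(new byte[0])
                .build()
                .getMetadata();
        return MultipartUpload.create(containerName, blobName, uploadId,
                blobMetadata);
    }

On complete and abort the stub would be removed with
blobStore.removeBlob(containerName, uploadId + STUB_SUFFIX) once the jclouds
call succeeds.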