From c82825a00c0da2536ebe46a204f2437c98088863 Mon Sep 17 00:00:00 2001 From: Andrew Gaul Date: Sun, 21 Dec 2014 16:59:02 -0800 Subject: [PATCH] Emulate multipart upload with single-part uploads This approach requires three times as many operations as the optimal approach. Implementing this correctly requires exposing the underlying multipart operations in jclouds. Most s3-tests pass but two still fail: test_multipart_upload_size_too_small test_list_multipart_upload References #2. --- README.md | 3 +- .../java/org/gaul/s3proxy/S3ErrorCode.java | 1 + .../java/org/gaul/s3proxy/S3ProxyHandler.java | 357 ++++++++++++++++++ 3 files changed, 359 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6aeaac74..42ccc025 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Features -------- * create, remove, and list buckets (including user-specified regions) * put, get, delete, and list objects -* copy objects and delete multiple objects (emulated operations) +* copy objects, delete multiple objects, and multi-part uploads (emulated operations) * store and retrieve object metadata, including user metadata * authorization via AWS signature v2 (including pre-signed URLs) or anonymous access * listen on HTTP or HTTPS @@ -95,7 +95,6 @@ Limitations S3Proxy does not support: * single-part uploads larger than 2 GB ([jclouds issue](https://issues.apache.org/jira/browse/JCLOUDS-264)) -* multi-part uploads * POST uploads * bucket ACLs ([jclouds issue](https://issues.apache.org/jira/browse/JCLOUDS-660)) * object ACLS ([jclouds issue](https://issues.apache.org/jira/browse/JCLOUDS-732)) diff --git a/src/main/java/org/gaul/s3proxy/S3ErrorCode.java b/src/main/java/org/gaul/s3proxy/S3ErrorCode.java index 710ed695..e17fa813 100644 --- a/src/main/java/org/gaul/s3proxy/S3ErrorCode.java +++ b/src/main/java/org/gaul/s3proxy/S3ErrorCode.java @@ -51,6 +51,7 @@ enum S3ErrorCode { "Length Required"), NO_SUCH_BUCKET(HttpServletResponse.SC_NOT_FOUND, "Not Found"), NO_SUCH_KEY(HttpServletResponse.SC_NOT_FOUND, "Not Found"), + NO_SUCH_UPLOAD(HttpServletResponse.SC_NOT_FOUND, "Not Found"), REQUEST_TIME_TOO_SKEWED(HttpServletResponse.SC_FORBIDDEN, "Forbidden"), REQUEST_TIMEOUT(HttpServletResponse.SC_BAD_REQUEST, "Bad Request"), SIGNATURE_DOES_NOT_MATCH(HttpServletResponse.SC_FORBIDDEN, "Forbidden"); diff --git a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java index 2abe0448..b917d1d5 100644 --- a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java +++ b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java @@ -21,6 +21,7 @@ import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PushbackInputStream; +import java.io.SequenceInputStream; import java.io.Writer; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; @@ -34,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; @@ -53,6 +55,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.SortedSetMultimap; @@ -60,7 +63,9 @@ import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; +import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; +import com.google.common.io.Closer; import com.google.common.net.HostAndPort; import com.google.common.net.HttpHeaders; @@ -98,7 +103,15 @@ final class S3ProxyHandler extends AbstractHandler { "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"; private static final String FAKE_OWNER_DISPLAY_NAME = "CustomersName@amazon.com"; + private static final String FAKE_INITIATOR_ID = + "arn:aws:iam::111122223333:" + + "user/some-user-11116a31-17b5-4fb7-9df5-b288870f11xx"; + private static final String FAKE_INITIATOR_DISPLAY_NAME = + "umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx"; private static final String FAKE_REQUEST_ID = "4442587FB7D0A2F9"; + private static final String FAKE_UPLOAD_ID = + "EXAMPLEJZ6e0YupT2h66iePQCc9IEbYbDUy4RTpMeoSMLPRp8Z5o1u8feSRo" + + "npvnWsKKG35tI2LB9VDPiCgTy.Gq2VxQLYjrue4Nq.NBdqI-"; private static final Pattern VALID_BUCKET_PATTERN = Pattern.compile("[a-zA-Z0-9._-]+"); private static final Set SIGNED_SUBRESOURCES = ImmutableSet.of( @@ -272,12 +285,18 @@ public void handle(String target, Request baseRequest, for (int i = 0; i < path.length; i++) { path[i] = URLDecoder.decode(path[i], "UTF-8"); } + String uploadId = request.getParameter("uploadId"); switch (method) { case "DELETE": if (path.length <= 2 || path[2].isEmpty()) { handleContainerDelete(response, path[1]); baseRequest.setHandled(true); return; + } else if (uploadId != null) { + handleAbortMultipartUpload(request, response, path[1], path[2], + uploadId); + baseRequest.setHandled(true); + return; } else { handleBlobRemove(response, path[1], path[2]); baseRequest.setHandled(true); @@ -297,6 +316,10 @@ public void handle(String target, Request baseRequest, handleContainerLocation(response, path[1]); baseRequest.setHandled(true); return; + } else if ("".equals(request.getParameter("uploads"))) { + handleListMultipartUploads(response); + baseRequest.setHandled(true); + return; } handleBlobList(request, response, path[1]); baseRequest.setHandled(true); @@ -306,6 +329,11 @@ public void handle(String target, Request baseRequest, handleContainerOrBlobAcl(response, path[1], path[2]); baseRequest.setHandled(true); return; + } else if (uploadId != null) { + handleListParts(request, response, path[1], path[2], + uploadId); + baseRequest.setHandled(true); + return; } handleGetBlob(request, response, path[1], path[2]); baseRequest.setHandled(true); @@ -326,6 +354,16 @@ public void handle(String target, Request baseRequest, handleMultiBlobRemove(request, response, path[1]); baseRequest.setHandled(true); return; + } else if ("".equals(request.getParameter("uploads"))) { + handleInitiateMultipartUpload(request, response, path[1], + path[2]); + baseRequest.setHandled(true); + return; + } else if (uploadId != null) { + handleCompleteMultipartUpload(request, response, path[1], + path[2], uploadId); + baseRequest.setHandled(true); + return; } break; case "PUT": @@ -338,6 +376,11 @@ public void handle(String target, Request baseRequest, handleContainerCreate(request, response, path[1]); baseRequest.setHandled(true); return; + } else if (uploadId != null) { + handleUploadPart(request, response, path[1], path[2], + uploadId); + baseRequest.setHandled(true); + return; } else if (request.getHeader("x-amz-copy-source") != null) { handleCopyBlob(request, response, path[1], path[2]); baseRequest.setHandled(true); @@ -477,6 +520,12 @@ private void handleContainerLocation(HttpServletResponse response, } } + private void handleListMultipartUploads(HttpServletResponse response) + throws IOException { + // TODO: list all blobs starting with FAKE_UPLOAD_ID + response.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED); + } + private void handleContainerExists(HttpServletResponse response, String containerName) throws IOException { if (!blobStore.containerExists(containerName)) { @@ -1022,6 +1071,314 @@ private void handlePutBlob(HttpServletRequest request, } } + private void handleInitiateMultipartUpload(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName) throws IOException { + String uploadId = FAKE_UPLOAD_ID + UUID.randomUUID().toString(); + ByteSource payload = ByteSource.empty(); + BlobBuilder.PayloadBlobBuilder builder = blobStore + .blobBuilder(uploadId) + .payload(payload); + addContentMetdataFromHttpRequest(builder, request); + builder.contentLength(payload.size()); + blobStore.putBlob(containerName, builder.build()); + + try (Writer writer = response.getWriter()) { + XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter( + writer); + xml.writeStartDocument(); + xml.writeStartElement("InitiateMultipartUploadResult"); + xml.writeDefaultNamespace(AWS_XMLNS); + + xml.writeStartElement("Bucket"); + xml.writeCharacters(containerName); + xml.writeEndElement(); + + xml.writeStartElement("Key"); + xml.writeCharacters(blobName); + xml.writeEndElement(); + + xml.writeStartElement("UploadId"); + xml.writeCharacters(uploadId); + xml.writeEndElement(); + + xml.writeEndElement(); + xml.flush(); + } catch (XMLStreamException xse) { + throw new IOException(xse); + } + } + + private void handleCompleteMultipartUpload(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName, String uploadId) throws IOException { + try (InputStream is = request.getInputStream(); + Closer closer = Closer.create()) { + ImmutableList.Builder iss = ImmutableList.builder(); + long contentLength = 0; + for (String partNumber : parseSimpleXmlElements(is, "PartNumber")) { + Blob blob = blobStore.getBlob(containerName, uploadId + "." + + partNumber); + contentLength += blob.getMetadata().getContentMetadata() + .getContentLength(); + iss.add(closer.register(blob.getPayload().openStream())); + } + try (InputStream sis = new SequenceInputStream( + Collections.enumeration(iss.build()))) { + BlobMetadata blobMetadata = blobStore.blobMetadata( + containerName, uploadId); + ContentMetadata contentMetadata = + blobMetadata.getContentMetadata(); + BlobBuilder.PayloadBlobBuilder builder = blobStore + .blobBuilder(blobName) + .userMetadata(blobMetadata.getUserMetadata()) + .payload(sis) + .contentDisposition( + contentMetadata.getContentDisposition()) + .contentEncoding(contentMetadata.getContentEncoding()) + .contentLanguage(contentMetadata.getContentLanguage()) + .contentLength(contentLength); + String contentType = contentMetadata.getContentType(); + if (contentType != null) { + builder.contentType(contentType); + } + // TODO: expires? + + blobStore.putBlob(containerName, builder.build(), + new PutOptions().multipart(true)); + blobStore.removeBlob(containerName, uploadId); + } + } + + try (Writer writer = response.getWriter()) { + XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter( + writer); + xml.writeStartDocument(); + xml.writeStartElement("CompleteMultipartUploadResult"); + xml.writeDefaultNamespace(AWS_XMLNS); + + xml.writeStartElement("Location"); + // TODO: bogus value + xml.writeCharacters("http://Example-Bucket.s3.amazonaws.com/" + + blobName); + xml.writeEndElement(); + + xml.writeStartElement("Bucket"); + xml.writeCharacters(containerName); + xml.writeEndElement(); + + xml.writeStartElement("Key"); + xml.writeCharacters(blobName); + xml.writeEndElement(); + + xml.writeStartElement("ETag"); + // TODO: bogus value + xml.writeCharacters("\"3858f62230ac3c915f300c664312c11f-9\""); + xml.writeEndElement(); + + xml.writeEndElement(); + xml.flush(); + } catch (XMLStreamException xse) { + throw new IOException(xse); + } + } + + private void handleAbortMultipartUpload(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName, String uploadId) throws IOException { + if (!blobStore.blobExists(containerName, uploadId)) { + sendSimpleErrorResponse(response, S3ErrorCode.NO_SUCH_UPLOAD); + } + PageSet pageSet = blobStore.list( + containerName, + new ListContainerOptions().afterMarker(uploadId)); + for (StorageMetadata sm : pageSet) { + String partName = sm.getName(); + if (!partName.startsWith(uploadId + ".")) { + break; + } + blobStore.removeBlob(containerName, partName); + } + blobStore.removeBlob(containerName, uploadId); + response.sendError(HttpServletResponse.SC_NO_CONTENT); + } + + private void handleListParts(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName, String uploadId) throws IOException { + try (Writer writer = response.getWriter()) { + XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter( + writer); + xml.writeStartDocument(); + xml.writeStartElement("ListPartsResult"); + xml.writeDefaultNamespace(AWS_XMLNS); + + xml.writeStartElement("Bucket"); + xml.writeCharacters(containerName); + xml.writeEndElement(); + + xml.writeStartElement("Key"); + xml.writeCharacters(blobName); + xml.writeEndElement(); + + xml.writeStartElement("UploadId"); + xml.writeCharacters(uploadId); + xml.writeEndElement(); + + // TODO: bogus values + xml.writeStartElement("Initiator"); + + xml.writeStartElement("ID"); + xml.writeCharacters(FAKE_INITIATOR_ID); + xml.writeEndElement(); + + xml.writeStartElement("DisplayName"); + xml.writeCharacters(FAKE_INITIATOR_DISPLAY_NAME); + xml.writeEndElement(); + + xml.writeEndElement(); + + xml.writeStartElement("Owner"); + + xml.writeStartElement("ID"); + xml.writeCharacters(FAKE_OWNER_ID); + xml.writeEndElement(); + + xml.writeStartElement("DisplayName"); + xml.writeCharacters(FAKE_OWNER_DISPLAY_NAME); + xml.writeEndElement(); + + xml.writeEndElement(); + + xml.writeStartElement("StorageClass"); + xml.writeCharacters("STANDARD"); + xml.writeEndElement(); + + // TODO: pagination +/* + xml.writeStartElement("PartNumberMarker"); + xml.writeCharacters("1"); + xml.writeEndElement(); + + xml.writeStartElement("NextPartNumberMarker"); + xml.writeCharacters("3"); + xml.writeEndElement(); + + xml.writeStartElement("MaxParts"); + xml.writeCharacters("2"); + xml.writeEndElement(); + + xml.writeStartElement("IsTruncated"); + xml.writeCharacters("true"); + xml.writeEndElement(); +*/ + + PageSet pageSet = blobStore.list( + containerName, + new ListContainerOptions().afterMarker(uploadId)); + for (StorageMetadata sm : pageSet) { + String partName = sm.getName(); + if (!partName.startsWith(uploadId + ".")) { + break; + } + + BlobMetadata metadata = blobStore.blobMetadata(containerName, + partName); + xml.writeStartElement("Part"); + + xml.writeStartElement("PartNumber"); + xml.writeCharacters(partName.substring( + (uploadId + ".").length())); + xml.writeEndElement(); + + // TODO: bogus values + xml.writeStartElement("LastModified"); + xml.writeCharacters("2010-11-10T20:48:33.000Z"); + xml.writeEndElement(); + + xml.writeStartElement("ETag"); + xml.writeCharacters("\"aaaa18db4cc2f85cedef654fccc4a4x8\""); + xml.writeEndElement(); + + xml.writeStartElement("Size"); + xml.writeCharacters(String.valueOf( + metadata.getContentMetadata().getContentLength())); + xml.writeEndElement(); + + xml.writeEndElement(); + } + + xml.writeEndElement(); + xml.flush(); + } catch (XMLStreamException xse) { + throw new IOException(xse); + } + } + + private void handleUploadPart(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName, String uploadId) throws IOException { + // TODO: duplicated from handlePutBlob + String contentLengthString = null; + String contentMD5String = null; + for (String headerName : Collections.list(request.getHeaderNames())) { + String headerValue = Strings.nullToEmpty(request.getHeader( + headerName)); + if (headerName.equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) { + contentLengthString = headerValue; + } else if (headerName.equalsIgnoreCase(HttpHeaders.CONTENT_MD5)) { + contentMD5String = headerValue; + } + } + + HashCode contentMD5 = null; + if (contentMD5String != null) { + try { + contentMD5 = HashCode.fromBytes( + BaseEncoding.base64().decode(contentMD5String)); + } catch (IllegalArgumentException iae) { + sendSimpleErrorResponse(response, S3ErrorCode.INVALID_DIGEST); + return; + } + if (contentMD5.bits() != Hashing.md5().bits()) { + sendSimpleErrorResponse(response, S3ErrorCode.INVALID_DIGEST); + return; + } + } + + if (contentLengthString == null) { + sendSimpleErrorResponse(response, + S3ErrorCode.MISSING_CONTENT_LENGTH); + return; + } + long contentLength; + try { + contentLength = Long.parseLong(contentLengthString); + } catch (NumberFormatException nfe) { + sendSimpleErrorResponse(response, S3ErrorCode.INVALID_ARGUMENT); + return; + } + if (contentLength < 0) { + sendSimpleErrorResponse(response, S3ErrorCode.INVALID_ARGUMENT); + return; + } + + String partNumber = request.getParameter("partNumber"); + // TODO: sanity checking + + try (InputStream is = request.getInputStream()) { + BlobBuilder.PayloadBlobBuilder builder = blobStore + .blobBuilder(uploadId + "." + partNumber) + .payload(is); + addContentMetdataFromHttpRequest(builder, request); + if (contentMD5 != null) { + builder = builder.contentMD5(contentMD5); + } + + blobStore.putBlob(containerName, builder.build()); + } + } + private static void addMetadataToResponse(HttpServletResponse response, BlobMetadata metadata) { ContentMetadata contentMetadata =