diff --git a/src/main/java/org/gaul/s3proxy/S3ErrorCode.java b/src/main/java/org/gaul/s3proxy/S3ErrorCode.java index 1ca52640..cea11016 100644 --- a/src/main/java/org/gaul/s3proxy/S3ErrorCode.java +++ b/src/main/java/org/gaul/s3proxy/S3ErrorCode.java @@ -37,6 +37,10 @@ enum S3ErrorCode { "Your previous request to create the named bucket" + " succeeded and you already own it."), BUCKET_NOT_EMPTY(HttpServletResponse.SC_CONFLICT, "Conflict"), + ENTITY_TOO_SMALL(HttpServletResponse.SC_BAD_REQUEST, + "Your proposed upload is smaller than the minimum allowed object" + + " size. Each part must be at least 5 MB in size, except the last" + + " part."), INVALID_ACCESS_KEY_ID(HttpServletResponse.SC_FORBIDDEN, "Forbidden"), INVALID_ARGUMENT(HttpServletResponse.SC_BAD_REQUEST, "Bad Request"), INVALID_BUCKET_NAME(HttpServletResponse.SC_BAD_REQUEST, "Bad Request"), @@ -52,6 +56,7 @@ enum S3ErrorCode { "Length Required"), NO_SUCH_BUCKET(HttpServletResponse.SC_NOT_FOUND, "Not Found"), NO_SUCH_KEY(HttpServletResponse.SC_NOT_FOUND, "Not Found"), + NO_SUCH_UPLOAD(HttpServletResponse.SC_NOT_FOUND, "Not Found"), REQUEST_TIME_TOO_SKEWED(HttpServletResponse.SC_FORBIDDEN, "Forbidden"), REQUEST_TIMEOUT(HttpServletResponse.SC_BAD_REQUEST, "Bad Request"), SIGNATURE_DOES_NOT_MATCH(HttpServletResponse.SC_FORBIDDEN, "Forbidden"); diff --git a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java index f317ad7a..5cdf63f8 100644 --- a/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java +++ b/src/main/java/org/gaul/s3proxy/S3ProxyHandler.java @@ -33,10 +33,12 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; @@ -61,7 +63,9 @@ import com.google.common.collect.TreeMultimap; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; +import com.google.common.hash.HashingInputStream; import com.google.common.io.BaseEncoding; +import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.net.HostAndPort; import com.google.common.net.HttpHeaders; @@ -102,7 +106,16 @@ final class S3ProxyHandler extends AbstractHandler { "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"; private static final String FAKE_OWNER_DISPLAY_NAME = "CustomersName@amazon.com"; + private static final String FAKE_INITIATOR_ID = + "arn:aws:iam::111122223333:" + + "user/some-user-11116a31-17b5-4fb7-9df5-b288870f11xx"; + private static final String FAKE_INITIATOR_DISPLAY_NAME = + "umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx"; private static final String FAKE_REQUEST_ID = "4442587FB7D0A2F9"; + private static final String FAKE_UPLOAD_ID = + "EXAMPLEJZ6e0YupT2h66iePQCc9IEbYbDUy4RTpMeoSMLPRp8Z5o1u8feSRo" + + "npvnWsKKG35tI2LB9VDPiCgTy.Gq2VxQLYjrue4Nq.NBdqI-"; + private static final long MINIMUM_MULTIPART_PART_SIZE = 5 * 1024 * 1024; private static final Pattern VALID_BUCKET_PATTERN = Pattern.compile("[a-zA-Z0-9._-]+"); private static final Set SIGNED_SUBRESOURCES = ImmutableSet.of( @@ -119,8 +132,11 @@ final class S3ProxyHandler extends AbstractHandler { "location", "marker", "max-keys", + "partNumber", "prefix", - "Signature" + "Signature", + "uploadId", + "uploads" ); private static final Set CANNED_ACLS = ImmutableSet.of( "private", @@ -312,12 +328,18 @@ public void handle(String target, Request baseRequest, for (int i = 0; i < path.length; i++) { path[i] = URLDecoder.decode(path[i], "UTF-8"); } + String uploadId = request.getParameter("uploadId"); switch (method) { case "DELETE": if (path.length <= 2 || path[2].isEmpty()) { handleContainerDelete(response, path[1]); baseRequest.setHandled(true); return; + } else if (uploadId != null) { + handleAbortMultipartUpload(request, response, path[1], path[2], + uploadId); + baseRequest.setHandled(true); + return; } else { handleBlobRemove(response, path[1], path[2]); baseRequest.setHandled(true); @@ -337,6 +359,10 @@ public void handle(String target, Request baseRequest, handleContainerLocation(response, path[1]); baseRequest.setHandled(true); return; + } else if ("".equals(request.getParameter("uploads"))) { + handleListMultipartUploads(response, uploadId); + baseRequest.setHandled(true); + return; } handleBlobList(request, response, path[1]); baseRequest.setHandled(true); @@ -346,6 +372,11 @@ public void handle(String target, Request baseRequest, handleGetBlobAcl(response, path[1], path[2]); baseRequest.setHandled(true); return; + } else if (uploadId != null) { + handleListParts(request, response, path[1], path[2], + uploadId); + baseRequest.setHandled(true); + return; } handleGetBlob(request, response, path[1], path[2]); baseRequest.setHandled(true); @@ -366,6 +397,16 @@ public void handle(String target, Request baseRequest, handleMultiBlobRemove(request, response, path[1]); baseRequest.setHandled(true); return; + } else if ("".equals(request.getParameter("uploads"))) { + handleInitiateMultipartUpload(request, response, path[1], + path[2]); + baseRequest.setHandled(true); + return; + } else if (uploadId != null) { + handleCompleteMultipartUpload(request, response, path[1], + path[2], uploadId); + baseRequest.setHandled(true); + return; } break; case "PUT": @@ -378,6 +419,11 @@ public void handle(String target, Request baseRequest, handleContainerCreate(request, response, path[1]); baseRequest.setHandled(true); return; + } else if (uploadId != null) { + handleUploadPart(request, response, path[1], path[2], + uploadId); + baseRequest.setHandled(true); + return; } else if (request.getHeader("x-amz-copy-source") != null) { handleCopyBlob(request, response, path[1], path[2]); baseRequest.setHandled(true); @@ -685,6 +731,12 @@ private void handleContainerLocation(HttpServletResponse response, } } + private void handleListMultipartUploads(HttpServletResponse response, + String uploadId) throws IOException { + // TODO: list all blobs starting with uploadId + response.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED); + } + private void handleContainerExists(HttpServletResponse response, String containerName) throws IOException { if (!blobStore.containerExists(containerName)) { @@ -1249,6 +1301,330 @@ private void handlePutBlob(HttpServletRequest request, } } + private void handleInitiateMultipartUpload(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName) throws IOException { + String uploadId = FAKE_UPLOAD_ID + UUID.randomUUID().toString(); + ByteSource payload = ByteSource.empty(); + BlobBuilder.PayloadBlobBuilder builder = blobStore + .blobBuilder(uploadId) + .payload(payload); + addContentMetdataFromHttpRequest(builder, request); + builder.contentLength(payload.size()); + blobStore.putBlob(containerName, builder.build()); + + try (Writer writer = response.getWriter()) { + XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter( + writer); + xml.writeStartDocument(); + xml.writeStartElement("InitiateMultipartUploadResult"); + xml.writeDefaultNamespace(AWS_XMLNS); + + xml.writeStartElement("Bucket"); + xml.writeCharacters(containerName); + xml.writeEndElement(); + + xml.writeStartElement("Key"); + xml.writeCharacters(blobName); + xml.writeEndElement(); + + xml.writeStartElement("UploadId"); + xml.writeCharacters(uploadId); + xml.writeEndElement(); + + xml.writeEndElement(); + xml.flush(); + } catch (XMLStreamException xse) { + throw new IOException(xse); + } + } + + private void handleCompleteMultipartUpload(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName, String uploadId) throws IOException { + try (InputStream is = request.getInputStream(); + Writer writer = response.getWriter()) { + Collection partNames = new ArrayList<>(); + long totalContentLength = 0; + for (Iterator it = parseSimpleXmlElements(is, + "PartNumber").iterator(); it.hasNext();) { + String partName = uploadId + "." + it.next(); + partNames.add(partName); + BlobMetadata metadata = blobStore.blobMetadata(containerName, + partName); + long contentLength = + metadata.getContentMetadata().getContentLength(); + if (contentLength < MINIMUM_MULTIPART_PART_SIZE && + it.hasNext()) { + sendSimpleErrorResponse(response, + S3ErrorCode.ENTITY_TOO_SMALL); + return; + } + totalContentLength += contentLength; + } + + BlobMetadata blobMetadata = blobStore.blobMetadata( + containerName, uploadId); + ContentMetadata contentMetadata = + blobMetadata.getContentMetadata(); + BlobBuilder.PayloadBlobBuilder builder = blobStore + .blobBuilder(blobName) + .userMetadata(blobMetadata.getUserMetadata()) + .payload(new MultiBlobByteSource(blobStore, containerName, + partNames)) + .contentDisposition( + contentMetadata.getContentDisposition()) + .contentEncoding(contentMetadata.getContentEncoding()) + .contentLanguage(contentMetadata.getContentLanguage()) + .contentLength(totalContentLength) + .expires(contentMetadata.getExpires()); + String contentType = contentMetadata.getContentType(); + if (contentType != null) { + builder.contentType(contentType); + } + + // TODO: will the client time out here? + blobStore.putBlob(containerName, builder.build(), + new PutOptions().multipart(true)); + + blobStore.removeBlobs(containerName, partNames); + + XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter( + writer); + xml.writeStartDocument(); + xml.writeStartElement("CompleteMultipartUploadResult"); + xml.writeDefaultNamespace(AWS_XMLNS); + + xml.writeStartElement("Location"); + // TODO: bogus value + xml.writeCharacters("http://Example-Bucket.s3.amazonaws.com/" + + blobName); + xml.writeEndElement(); + + xml.writeStartElement("Bucket"); + xml.writeCharacters(containerName); + xml.writeEndElement(); + + xml.writeStartElement("Key"); + xml.writeCharacters(blobName); + xml.writeEndElement(); + + xml.writeStartElement("ETag"); + // TODO: bogus value + xml.writeCharacters("\"3858f62230ac3c915f300c664312c11f-9\""); + xml.writeEndElement(); + + xml.writeEndElement(); + xml.flush(); + } catch (XMLStreamException xse) { + throw new IOException(xse); + } + } + + private void handleAbortMultipartUpload(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName, String uploadId) throws IOException { + if (!blobStore.blobExists(containerName, uploadId)) { + sendSimpleErrorResponse(response, S3ErrorCode.NO_SUCH_UPLOAD); + return; + } + PageSet pageSet = blobStore.list( + containerName, + new ListContainerOptions().afterMarker(uploadId)); + for (StorageMetadata sm : pageSet) { + String partName = sm.getName(); + if (!partName.startsWith(uploadId + ".")) { + break; + } + blobStore.removeBlob(containerName, partName); + } + blobStore.removeBlob(containerName, uploadId); + response.sendError(HttpServletResponse.SC_NO_CONTENT); + } + + private void handleListParts(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName, String uploadId) throws IOException { + try (Writer writer = response.getWriter()) { + XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter( + writer); + xml.writeStartDocument(); + xml.writeStartElement("ListPartsResult"); + xml.writeDefaultNamespace(AWS_XMLNS); + + xml.writeStartElement("Bucket"); + xml.writeCharacters(containerName); + xml.writeEndElement(); + + xml.writeStartElement("Key"); + xml.writeCharacters(blobName); + xml.writeEndElement(); + + xml.writeStartElement("UploadId"); + xml.writeCharacters(uploadId); + xml.writeEndElement(); + + // TODO: bogus values + xml.writeStartElement("Initiator"); + + xml.writeStartElement("ID"); + xml.writeCharacters(FAKE_INITIATOR_ID); + xml.writeEndElement(); + + xml.writeStartElement("DisplayName"); + xml.writeCharacters(FAKE_INITIATOR_DISPLAY_NAME); + xml.writeEndElement(); + + xml.writeEndElement(); + + xml.writeStartElement("Owner"); + + xml.writeStartElement("ID"); + xml.writeCharacters(FAKE_OWNER_ID); + xml.writeEndElement(); + + xml.writeStartElement("DisplayName"); + xml.writeCharacters(FAKE_OWNER_DISPLAY_NAME); + xml.writeEndElement(); + + xml.writeEndElement(); + + xml.writeStartElement("StorageClass"); + xml.writeCharacters("STANDARD"); + xml.writeEndElement(); + + // TODO: pagination +/* + xml.writeStartElement("PartNumberMarker"); + xml.writeCharacters("1"); + xml.writeEndElement(); + + xml.writeStartElement("NextPartNumberMarker"); + xml.writeCharacters("3"); + xml.writeEndElement(); + + xml.writeStartElement("MaxParts"); + xml.writeCharacters("2"); + xml.writeEndElement(); + + xml.writeStartElement("IsTruncated"); + xml.writeCharacters("true"); + xml.writeEndElement(); +*/ + + PageSet pageSet = blobStore.list( + containerName, + new ListContainerOptions().afterMarker(uploadId)); + for (StorageMetadata sm : pageSet) { + String partName = sm.getName(); + if (!partName.startsWith(uploadId + ".")) { + break; + } + + BlobMetadata metadata = blobStore.blobMetadata(containerName, + partName); + xml.writeStartElement("Part"); + + xml.writeStartElement("PartNumber"); + xml.writeCharacters(partName.substring( + (uploadId + ".").length())); + xml.writeEndElement(); + + // TODO: bogus values + xml.writeStartElement("LastModified"); + xml.writeCharacters("2010-11-10T20:48:33.000Z"); + xml.writeEndElement(); + + xml.writeStartElement("ETag"); + xml.writeCharacters("\"aaaa18db4cc2f85cedef654fccc4a4x8\""); + xml.writeEndElement(); + + xml.writeStartElement("Size"); + xml.writeCharacters(String.valueOf( + metadata.getContentMetadata().getContentLength())); + xml.writeEndElement(); + + xml.writeEndElement(); + } + + xml.writeEndElement(); + xml.flush(); + } catch (XMLStreamException xse) { + throw new IOException(xse); + } + } + + private void handleUploadPart(HttpServletRequest request, + HttpServletResponse response, String containerName, + String blobName, String uploadId) throws IOException { + // TODO: duplicated from handlePutBlob + String contentLengthString = null; + String contentMD5String = null; + for (String headerName : Collections.list(request.getHeaderNames())) { + String headerValue = Strings.nullToEmpty(request.getHeader( + headerName)); + if (headerName.equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) { + contentLengthString = headerValue; + } else if (headerName.equalsIgnoreCase(HttpHeaders.CONTENT_MD5)) { + contentMD5String = headerValue; + } + } + + HashCode contentMD5 = null; + if (contentMD5String != null) { + try { + contentMD5 = HashCode.fromBytes( + BaseEncoding.base64().decode(contentMD5String)); + } catch (IllegalArgumentException iae) { + sendSimpleErrorResponse(response, S3ErrorCode.INVALID_DIGEST); + return; + } + if (contentMD5.bits() != Hashing.md5().bits()) { + sendSimpleErrorResponse(response, S3ErrorCode.INVALID_DIGEST); + return; + } + } + + if (contentLengthString == null) { + sendSimpleErrorResponse(response, + S3ErrorCode.MISSING_CONTENT_LENGTH); + return; + } + long contentLength; + try { + contentLength = Long.parseLong(contentLengthString); + } catch (NumberFormatException nfe) { + sendSimpleErrorResponse(response, S3ErrorCode.INVALID_ARGUMENT); + return; + } + if (contentLength < 0) { + sendSimpleErrorResponse(response, S3ErrorCode.INVALID_ARGUMENT); + return; + } + + String partNumber = request.getParameter("partNumber"); + // TODO: sanity checking + + try (HashingInputStream his = new HashingInputStream(Hashing.md5(), + request.getInputStream())) { + BlobBuilder.PayloadBlobBuilder builder = blobStore + .blobBuilder(uploadId + "." + partNumber) + .payload(his); + addContentMetdataFromHttpRequest(builder, request); + if (contentMD5 != null) { + builder = builder.contentMD5(contentMD5); + } + + blobStore.putBlob(containerName, builder.build()); + + // recalculate ETag since some object stores like Azure return + // non-hash + byte[] hashCode = his.hash().asBytes(); + response.addHeader(HttpHeaders.ETAG, "\"" + + BaseEncoding.base16().lowerCase().encode(hashCode) + "\""); + } + } + private static void addMetadataToResponse(HttpServletResponse response, BlobMetadata metadata) { ContentMetadata contentMetadata = @@ -1474,4 +1850,70 @@ private static void addContentMetdataFromHttpRequest( builder.expires(new Date(expires)); } } + + static final class MultiBlobByteSource extends ByteSource { + private final BlobStore blobStore; + private final String containerName; + private final Collection blobNames; + + MultiBlobByteSource(BlobStore blobStore, String containerName, + Collection blobNames) { + this.blobStore = checkNotNull(blobStore); + this.containerName = checkNotNull(containerName); + this.blobNames = checkNotNull(blobNames); + } + + @Override + public InputStream openStream() throws IOException { + return new MultiBlobInputStream(blobStore, containerName, + blobNames); + } + } + + static final class MultiBlobInputStream extends InputStream { + private final BlobStore blobStore; + private final String containerName; + private final Iterator blobNames; + private InputStream is; + + MultiBlobInputStream(BlobStore blobStore, String containerName, + Collection blobNames) throws IOException { + this.blobStore = checkNotNull(blobStore); + this.containerName = checkNotNull(containerName); + this.blobNames = blobNames.iterator(); + resetInputStream(); + } + + @Override + public int read() throws IOException { + int ch = is.read(); + if (ch != -1) { + return ch; + } else if (blobNames.hasNext()) { + resetInputStream(); + return is.read(); + } else { + return -1; + } + } + + @Override + public int read(byte[] array, int offset, int length) + throws IOException { + int ch = is.read(array, offset, length); + if (ch != -1) { + return ch; + } else if (blobNames.hasNext()) { + resetInputStream(); + return is.read(array, offset, length); + } else { + return -1; + } + } + + private void resetInputStream() throws IOException { + Blob blob = blobStore.getBlob(containerName, blobNames.next()); + is = blob.getPayload().openStream(); + } + } } diff --git a/src/test/java/org/gaul/s3proxy/S3ProxyTest.java b/src/test/java/org/gaul/s3proxy/S3ProxyTest.java index 82e2b41b..17833702 100644 --- a/src/test/java/org/gaul/s3proxy/S3ProxyTest.java +++ b/src/test/java/org/gaul/s3proxy/S3ProxyTest.java @@ -52,6 +52,7 @@ import org.jclouds.io.payloads.ByteSourcePayload; import org.jclouds.logging.slf4j.config.SLF4JLoggingModule; import org.jclouds.rest.HttpClient; +import org.jclouds.s3.S3Client; import org.jclouds.util.Throwables2; import org.junit.After; import org.junit.Before; @@ -376,7 +377,7 @@ public void testUrlSigning() throws Exception { } @Test - public void testUnknownParameter() throws Exception { + public void testMultipartUpload() throws Exception { String blobName = "blob"; int minMultipartSize = 32 * 1024 * 1024 + 1; ByteSource byteSource = ByteSource.wrap(new byte[minMultipartSize]); @@ -385,8 +386,14 @@ public void testUnknownParameter() throws Exception { .contentLength(byteSource.size()) .build(); PutOptions options = new PutOptions().multipart(true); + s3BlobStore.putBlob(containerName, blob, options); + } + + @Test + public void testUnknownParameter() throws Exception { + S3Client s3Client = s3Context.unwrapApi(S3Client.class); try { - s3BlobStore.putBlob(containerName, blob, options); + s3Client.disableBucketLogging(containerName); fail("Expected HttpResponseException"); } catch (RuntimeException re) { // TODO: why does jclouds wrap this in a