Add API to compose objects through server-side copying
The new ComposeObject API provides a way to create objects by
concatenating existing objects. It takes a list of source objects
along with optional start-end range specifications, and concatenates
them into a new object.

The API supports:

* Creating an object from up to 10000 existing objects.
* Creating objects up to 5 TiB in size, from source objects of any size.
* Copy-conditions on each source object, specified separately.
* SSE-C (i.e. Server-Side Encryption with a Customer-provided key) for
  both encryption of the destination object and decryption of the
  source objects.
* Setting/replacing custom metadata on the destination object.

This API has been used to refactor the CopyObject API - that API now
supports source objects of any size, SSE-C for both source and
destination, and setting custom metadata.
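
For illustration, a minimal sketch of composing three objects with the
new API (the bucket and object names are hypothetical, and client is
assumed to be an initialized *minio.Client):

    src1 := minio.NewSourceInfo("input-bucket", "part-one", nil)
    src2 := minio.NewSourceInfo("input-bucket", "part-two", nil)
    src3 := minio.NewSourceInfo("input-bucket", "part-three", nil)

    // Optionally restrict a source to a byte range (inclusive).
    if err := src2.SetRange(0, 1024*1024-1); err != nil {
        log.Fatalln(err)
    }

    // nil encryption and nil metadata: no SSE-C, and no explicit
    // replacement of custom metadata.
    dst := minio.NewDestinationInfo("output-bucket", "joined-object", nil, nil)

    if err := client.ComposeObject(dst, []minio.SourceInfo{src1, src2, src3}); err != nil {
        log.Fatalln(err)
    }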
donatello committed Jun 21, 2017
1 parent 4dde80b commit daa0d07
Showing 9 changed files with 697 additions and 84 deletions.
399 changes: 399 additions & 0 deletions api-compose-object.go
@@ -0,0 +1,399 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package minio

import (
"crypto/md5"
"encoding/base64"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/minio/minio-go/pkg/s3utils"
)

// SSEInfo - represents Server-Side-Encryption parameters specified by
// a user.
type SSEInfo struct {
key []byte
algo string
}

// NewSSEInfo - specifies a raw (binary, un-encoded) encryption key and
// an algorithm name. If algo is empty, it defaults to "AES256".
func NewSSEInfo(key []byte, algo string) SSEInfo {
if algo == "" {
algo = "AES256"
}
return SSEInfo{key, algo}
}
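
// For illustration, constructing SSE-C parameters from a raw 256-bit
// key (the key bytes below are hypothetical and must be 32 bytes long):
//
//	key := []byte("32byteslongsecretkeymustbegiven1")
//	sse := NewSSEInfo(key, "") // algo defaults to "AES256"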

// internal method to output the generated SSE headers.
func (e *SSEInfo) getSSEHeaders(isCopySource bool) map[string]string {
if e == nil {
return nil
}

cs := ""
if isCopySource {
cs = "copy-source-"
}
hsum := md5.Sum(e.key)
return map[string]string{
"x-amz-" + cs + "server-side-encryption-customer-algorithm": e.algo,
"x-amz-" + cs + "server-side-encryption-customer-key": base64.StdEncoding.EncodeToString(e.key),
"x-amz-" + cs + "server-side-encryption-customer-key-MD5": base64.StdEncoding.EncodeToString(hsum[:]),
}
}
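
// For illustration, with isCopySource enabled the generated headers
// take the form:
//
//	x-amz-copy-source-server-side-encryption-customer-algorithm: AES256
//	x-amz-copy-source-server-side-encryption-customer-key: <base64 of key>
//	x-amz-copy-source-server-side-encryption-customer-key-MD5: <base64 of MD5(key)>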

// DestinationInfo - type with information about the object to be
// created via server-side copy requests, using the Compose API.
type DestinationInfo struct {
bucket, object string
encryption *SSEInfo
customMetadata map[string]string
}

// NewDestinationInfo - creates a DestinationInfo instance -
// encryptionInfo may be nil to indicate that no encryption is to be
// performed, and metadata may be nil to indicate that the source
// object's metadata is to be copied to the destination.
func NewDestinationInfo(bucket, object string, encryptionInfo *SSEInfo,
metadata map[string]string) DestinationInfo {

return DestinationInfo{
bucket: bucket,
object: object,
encryption: encryptionInfo,
customMetadata: metadata,
}
}

// getCustomHeadersMap - construct appropriate key-value pairs to send
// as headers from the metadata map to pass into a copy-object
// request. For a single-part copy-object request (i.e. a non-multipart
// copy), enable withCopyDirectiveHeader to set the
// `x-amz-metadata-directive` to `REPLACE`, so that metadata headers
// from the source are not copied over.
func (d *DestinationInfo) getCustomHeadersMap(withCopyDirectiveHeader bool) map[string]string {
if d == nil || d.customMetadata == nil {
return nil
}
r := make(map[string]string)
if withCopyDirectiveHeader {
r["x-amz-metadata-directive"] = "REPLACE"
}
for k, v := range d.customMetadata {
if !strings.HasPrefix(k, "x-amz-meta-") {
k = "x-amz-meta-" + k
}
r[k] = v
}
return r
}
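
// For illustration, a customMetadata map of {"color": "blue"} with
// withCopyDirectiveHeader enabled produces the headers:
//
//	x-amz-metadata-directive: REPLACE
//	x-amz-meta-color: blue
//
// Keys already prefixed with "x-amz-meta-" are passed through as-is.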

// SourceInfo - represents a source object to be copied, using
// server-side copying APIs.
type SourceInfo struct {
bucket, object string

start, end int64

// collection of headers to send with every upload-part-copy
// request involving this source object.
Headers http.Header
}

// NewSourceInfo - creates a new SourceInfo object. decryptionInfo may
// be nil if the source object is not encrypted.
func NewSourceInfo(bucket, object string, decryptionInfo *SSEInfo) SourceInfo {
r := SourceInfo{
bucket: bucket,
object: object,
start: -1,
Headers: make(http.Header),
}

// Set the source header
r.Headers.Set("x-amz-copy-source", s3utils.EncodePath(fmt.Sprintf("/%s/%s", bucket, object)))

// Add decryption headers for source
for k, v := range decryptionInfo.getSSEHeaders(true) {
r.Headers.Set(k, v)
}

return r
}

// SetRange - Set the start and end offset of the source object to be
// copied. Both offsets are inclusive byte positions. If this method is
// not called, the whole source object is copied.
func (s *SourceInfo) SetRange(start, end int64) error {
if start > end || start < 0 {
return ErrInvalidArgument("start must be non-negative, and start must be at most end.")
}
s.start, s.end = start, end
return nil
}

// SetMatchETagCond - Set ETag match condition. The object is copied
// only if the etag of the source matches the value given here.
func (s *SourceInfo) SetMatchETagCond(etag string) error {
if etag == "" {
return ErrInvalidArgument("ETag cannot be empty.")
}
s.Headers.Set("x-amz-copy-source-if-match", etag)
return nil
}

// SetMatchETagExceptCond - Set the ETag match exception
// condition. The object is copied only if the etag of the source is
// not the value given here.
func (s *SourceInfo) SetMatchETagExceptCond(etag string) error {
if etag == "" {
return ErrInvalidArgument("ETag cannot be empty.")
}
s.Headers.Set("x-amz-copy-source-if-none-match", etag)
return nil
}

// SetModifiedSinceCond - Set the modified since condition.
func (s *SourceInfo) SetModifiedSinceCond(modTime time.Time) error {
if modTime.IsZero() {
return ErrInvalidArgument("Input time cannot be 0.")
}
s.Headers.Set("x-amz-copy-source-if-modified-since", modTime.Format(http.TimeFormat))
return nil
}

// SetUnmodifiedSinceCond - Set the unmodified since condition.
func (s *SourceInfo) SetUnmodifiedSinceCond(modTime time.Time) error {
if modTime.IsZero() {
return ErrInvalidArgument("Input time cannot be 0.")
}
s.Headers.Set("x-amz-copy-source-if-unmodified-since", modTime.Format(http.TimeFormat))
return nil
}
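
// For illustration, setting copy-conditions on a source (the ETag and
// timestamp values below are hypothetical):
//
//	src := NewSourceInfo("bucket", "object", nil)
//	src.SetMatchETagCond("bd891862ea3e22c93ed53a098218791d")
//	src.SetUnmodifiedSinceCond(time.Date(2017, 5, 26, 0, 0, 0, 0, time.UTC))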

// uploadPartCopy - helper function to perform an upload-part-copy
// request.
func (c Client) uploadPartCopy(bucket, object, uploadID string, partNumber int,
headers http.Header) (p CompletePart, err error) {

// Build query parameters
urlValues := make(url.Values)
urlValues.Set("partNumber", strconv.Itoa(partNumber))
urlValues.Set("uploadId", uploadID)

// Send upload-part-copy request
resp, err := c.executeMethod("PUT", requestMetadata{
bucketName: bucket,
objectName: object,
customHeader: headers,
queryValues: urlValues,
})
defer closeResponse(resp)
if err != nil {
return p, err
}

// Check if we got an error response.
if resp.StatusCode != http.StatusOK {
return p, httpRespToErrorResponse(resp, bucket, object)
}

// Decode copy-part response on success.
cpObjRes := copyObjectResult{}
err = xmlDecoder(resp.Body, &cpObjRes)
if err != nil {
return p, err
}
p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
return p, nil
}

func (c Client) getSourceProps(bucket, object string) (size int64, etag string, err error) {
// Get object info - need size and etag here.
var objInfo ObjectInfo
objInfo, err = c.StatObject(bucket, object)
if err != nil {
err = fmt.Errorf("Could not stat object - %s/%s: %v", bucket, object, err)
} else {
size = objInfo.Size
etag = objInfo.ETag
}
return
}

// ComposeObject - creates an object using server-side copying of
// existing objects. It takes a list of source objects (with optional
// offsets) and concatenates them into a new object using only
// server-side copying operations.
func (c Client) ComposeObject(dst DestinationInfo, srcs []SourceInfo) error {
// Input validation.
if err := s3utils.CheckValidBucketName(dst.bucket); err != nil {
return err
}
if err := s3utils.CheckValidObjectName(dst.object); err != nil {
return err
}
if len(srcs) < 1 || len(srcs) > maxPartsCount {
return ErrInvalidArgument("There must be at least one and at most 10000 source objects.")
}

// Query and collect source size, and construct
// upload-part-copy arguments.
type PartInfo struct {
start, end int64
idx int
}
var partInfo []PartInfo
var totalSize int64
for i, src := range srcs {
size, etag, err := c.getSourceProps(src.bucket, src.object)
if err != nil {
return fmt.Errorf("Could not get source props for %s/%s: %v", src.bucket, src.object, err)
}

// Since we did a HEAD to get size, we use the ETag
// value to make sure the object has not changed by
// the time we perform the copy. This is done only if
// the user has not set their own ETag match
// condition.
if src.Headers.Get("x-amz-copy-source-if-match") == "" {
src.SetMatchETagCond(etag)
}

// Check if a segment is specified, and if so, is the
// segment within object bounds? When a valid segment is
// given, only the segment's length counts towards the
// size to be copied.
segmentStart := int64(0)
if src.start != -1 {
if src.end >= size {
return ErrInvalidArgument(
fmt.Sprintf("SourceInfo %d has invalid segment-to-copy [%d, %d] (size is %d)",
i, src.start, src.end, size))
}
segmentStart = src.start
size = src.end - src.start + 1
}

// Is data to copy too large?
totalSize += size
if totalSize > maxMultipartPutObjectSize {
return ErrInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
}

// Append copy-part requests info for the current source
// object - parts cover at most copyPartSize bytes each, and
// are offset by the segment start when a range was given.
var currOffset, partSize int64
for currOffset < size {
if currOffset+copyPartSize < size {
partSize = copyPartSize
} else {
partSize = size - currOffset
}
offsetEnd := currOffset + partSize - 1
partInfo = append(partInfo, PartInfo{segmentStart + currOffset, segmentStart + offsetEnd, i})
if len(partInfo) > maxPartsCount {
return ErrInvalidArgument("Cannot compose an object with more than 10000 parts.")
}
currOffset += partSize
}
}

// Single source object case (i.e. when only one source is
// involved, and it is being copied wholly).
if len(partInfo) == 1 && srcs[0].start == -1 {
h := srcs[0].Headers
// Add destination encryption headers
for k, v := range dst.encryption.getSSEHeaders(false) {
h.Set(k, v)
}
// Add custom metadata headers
for k, v := range dst.getCustomHeadersMap(true) {
h.Set(k, v)
}
// Send copy request
resp, err := c.executeMethod("PUT", requestMetadata{
bucketName: dst.bucket,
objectName: dst.object,
customHeader: h,
})
defer closeResponse(resp)
if err != nil {
return err
}
// Check if we got an error response.
if resp.StatusCode != http.StatusOK {
return httpRespToErrorResponse(resp, dst.bucket, dst.object)
}
// Decode copy response on success.
cpObjRes := copyObjectResult{}
err = xmlDecoder(resp.Body, &cpObjRes)
if err != nil {
return fmt.Errorf("Copy-object error: %v", err)
}

// Return nil on success.
return nil
}

// Now, handle multipart-copy cases.

// Initiate a new multipart upload.
metaHeaders := make(map[string][]string)
for k, v := range dst.getCustomHeadersMap(false) {
metaHeaders[k] = []string{v}
}
uid, err := c.newUploadID(dst.bucket, dst.object, metaHeaders)
if err != nil {
return fmt.Errorf("Error creating new upload: %v", err)
}

// Perform the upload-part-copy requests. Note that partInfo
// records the index of its source object, so headers are taken
// from srcs[p.idx], not srcs[i].
objParts := []CompletePart{}
for i, p := range partInfo {
h := srcs[p.idx].Headers
// Add destination encryption headers
for k, v := range dst.encryption.getSSEHeaders(false) {
h.Set(k, v)
}
// Add custom metadata headers
for k, v := range dst.getCustomHeadersMap(true) {
h.Set(k, v)
}
// Set the byte range to copy for this part - parts always
// carry an explicit (inclusive) range.
h.Set("x-amz-copy-source-range",
fmt.Sprintf("bytes=%d-%d", p.start, p.end))
complPart, err := c.uploadPartCopy(dst.bucket, dst.object, uid, i+1, h)
if err != nil {
return fmt.Errorf("Error in upload-part-copy - %v", err)
}

objParts = append(objParts, complPart)
}

// Make final complete-multipart request.
_, err = c.completeMultipartUpload(dst.bucket, dst.object, uid,
completeMultipartUpload{Parts: objParts})
if err != nil {
err = fmt.Errorf("Error in complete-multipart request - %v", err)
}
return err
}
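
Taken together, a hedged sketch exercising the SSE-C and
custom-metadata support described in the commit message (the keys,
names and metadata below are hypothetical; client is assumed to be an
initialized *minio.Client):

    // Both keys must be exactly 32 bytes for AES256.
    srcSSE := minio.NewSSEInfo([]byte("32byteslongsecretkeymustbegiven1"), "")
    dstSSE := minio.NewSSEInfo([]byte("32byteslongsecretkeymustbegiven2"), "")

    src := minio.NewSourceInfo("enc-bucket", "enc-object", &srcSSE)

    // Replace custom metadata on the destination; the key is
    // prefixed with "x-amz-meta-" automatically.
    dst := minio.NewDestinationInfo("enc-bucket", "enc-copy", &dstSSE,
        map[string]string{"origin": "compose-example"})

    if err := client.ComposeObject(dst, []minio.SourceInfo{src}); err != nil {
        log.Fatalln(err)
    }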