-
Notifications
You must be signed in to change notification settings - Fork 647
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add large-object support for CopyObject API #644
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,12 +17,16 @@ | |
package minio | ||
|
||
import ( | ||
"fmt" | ||
"net/http" | ||
"net/url" | ||
"strings" | ||
|
||
"github.com/minio/minio-go/pkg/s3utils" | ||
) | ||
|
||
// CopyObject - copy a source object into a new object with the provided name in the provided bucket | ||
// CopyObject - copy a source object into a new object with the | ||
// provided name in the provided bucket | ||
func (c Client) CopyObject(bucketName string, objectName string, objectSource string, cpCond CopyConditions) error { | ||
// Input validation. | ||
if err := isValidBucketName(bucketName); err != nil { | ||
|
@@ -31,19 +35,47 @@ func (c Client) CopyObject(bucketName string, objectName string, objectSource st | |
if err := isValidObjectName(objectName); err != nil { | ||
return err | ||
} | ||
if objectSource == "" { | ||
return ErrInvalidArgument("Object source cannot be empty.") | ||
srcBucket, srcObject, err := getObjectSource(objectSource) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Get info about the source object | ||
srcInfo, err := c.StatObject(srcBucket, srcObject) | ||
if err != nil { | ||
return err | ||
} | ||
srcByteRangeSize := cpCond.getRangeSize() | ||
if srcByteRangeSize > srcInfo.Size || | ||
(srcByteRangeSize > 0 && cpCond.byteRangeEnd >= srcInfo.Size) { | ||
return ErrInvalidArgument(fmt.Sprintf( | ||
"Specified byte range (%d, %d) does not fit within source object (size = %d)", | ||
cpCond.byteRangeStart, cpCond.byteRangeEnd, srcInfo.Size)) | ||
} | ||
|
||
copySize := srcByteRangeSize | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A comment explaining how an empty range is interpreted as the entire object would provide context for future readers. |
||
if copySize == 0 { | ||
copySize = srcInfo.Size | ||
} | ||
|
||
// customHeaders apply headers. | ||
customHeaders := make(http.Header) | ||
for _, cond := range cpCond.conditions { | ||
customHeaders.Set(cond.key, cond.value) | ||
for key, value := range cpCond.conditions { | ||
customHeaders.Set(key, value) | ||
} | ||
|
||
// Set copy source. | ||
customHeaders.Set("x-amz-copy-source", s3utils.EncodePath(objectSource)) | ||
|
||
// Check if single part copy suffices. Multipart is required when: | ||
// 1. source-range-offset does not refer to full source object, or | ||
// 2. size of copied object > 5gb | ||
if copySize > maxPartSize || | ||
(srcByteRangeSize > 0 && srcByteRangeSize != srcInfo.Size) { | ||
return c.multipartCopyObject(bucketName, objectName, | ||
objectSource, cpCond, customHeaders, copySize) | ||
} | ||
|
||
// Execute PUT on objectName. | ||
resp, err := c.executeMethod("PUT", requestMetadata{ | ||
bucketName: bucketName, | ||
|
@@ -70,3 +102,91 @@ func (c Client) CopyObject(bucketName string, objectName string, objectSource st | |
// Return nil on success. | ||
return nil | ||
} | ||
|
||
func getObjectSource(src string) (bucket string, object string, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment necessary.. and a test case.. |
||
parts := strings.Split(src, "/") | ||
if len(parts) != 3 || parts[0] != "" || parts[1] == "" || parts[2] == "" { | ||
return "", "", ErrInvalidArgument("Object source should be formatted as '/bucketName/objectName'") | ||
} | ||
return parts[1], parts[2], nil | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A doc comment is necessary here. |
||
func (c Client) multipartCopyObject(bucketName string, objectName string, | ||
objectSource string, cpCond CopyConditions, headers http.Header, | ||
copySize int64) error { | ||
|
||
// Compute split sizes for multipart copy. | ||
partsCount, partSize, lastPartSize, err := optimalPartInfo(copySize) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// It is not possible to resume a multipart copy object | ||
// operation, so we just create new uploadID and proceed with | ||
// the copy operations. | ||
uid, err := c.newUploadID(bucketName, objectSource, nil) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
queryParams := url.Values{} | ||
queryParams.Set("uploadId", uid) | ||
|
||
var complMultipartUpload completeMultipartUpload | ||
|
||
// Initiate copy object operations. | ||
for partNumber := 1; partNumber <= partsCount; partNumber++ { | ||
pCond := cpCond.duplicate() | ||
pCond.byteRangeStart = partSize * (int64(partNumber) - 1) | ||
if partNumber < partsCount { | ||
pCond.byteRangeEnd = pCond.byteRangeStart + partSize - 1 | ||
} else { | ||
pCond.byteRangeEnd = pCond.byteRangeStart + lastPartSize - 1 | ||
} | ||
|
||
// Update the source range header value. | ||
headers.Set("x-amz-copy-source-range", | ||
fmt.Sprintf("bytes:%d-%d", pCond.byteRangeStart, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is wrong btw - it should be |
||
pCond.byteRangeEnd)) | ||
|
||
// Update part number in the query parameters. | ||
queryParams.Set("partNumber", fmt.Sprintf("%d", partNumber)) | ||
|
||
// Perform part-copy. | ||
resp, err := c.executeMethod("PUT", requestMetadata{ | ||
bucketName: bucketName, | ||
objectName: objectName, | ||
customHeader: headers, | ||
queryValues: queryParams, | ||
}) | ||
defer closeResponse(resp) | ||
if err != nil { | ||
return err | ||
} | ||
if resp != nil { | ||
if resp.StatusCode != http.StatusOK { | ||
return httpRespToErrorResponse(resp, bucketName, | ||
objectName) | ||
} | ||
} | ||
|
||
// Decode copy response on success. | ||
cpObjRes := copyObjectResult{} | ||
err = xmlDecoder(resp.Body, &cpObjRes) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// append part info for complete multipart request | ||
complMultipartUpload.Parts = append(complMultipartUpload.Parts, | ||
CompletePart{ | ||
PartNumber: partNumber, | ||
ETag: cpObjRes.ETag, | ||
}) | ||
} | ||
|
||
// Complete the multipart upload. | ||
_, err = c.completeMultipartUpload(bucketName, objectName, uid, | ||
complMultipartUpload) | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,44 +21,34 @@ import ( | |
"time" | ||
) | ||
|
||
// copyCondition explanation: | ||
// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html | ||
// | ||
// Example: | ||
// | ||
// copyCondition { | ||
// key: "x-amz-copy-if-modified-since", | ||
// value: "Tue, 15 Nov 1994 12:45:26 GMT", | ||
// } | ||
// | ||
type copyCondition struct { | ||
key string | ||
value string | ||
} | ||
|
||
// CopyConditions - copy conditions.
//
// It holds the conditional-copy request headers and an optional byte
// range of the source object. Prefer NewCopyConditions over a bare
// CopyConditions{}: in the zero value the map is nil and the byte range
// is 0-0, which getRangeSize reports as a one-byte range.
type CopyConditions struct {
	// conditions maps a request header name (for example
	// "x-amz-copy-source-if-match") to its value. Setting the same
	// condition twice overwrites the earlier value.
	conditions map[string]string
	// start and end offset (inclusive) of source object to be
	// copied. A negative start means no range was set (see
	// getRangeSize).
	byteRangeStart int64
	byteRangeEnd   int64
}
|
||
// NewCopyConditions - Instantiate new list of conditions. This | ||
// function is left behind for backward compatibility. The idiomatic | ||
// way to set an empty set of copy conditions is, | ||
// ``copyConditions := CopyConditions{}``. | ||
// NewCopyConditions - Instantiate new list of conditions. Prefer to | ||
// use this function as it initializes byte-range. | ||
// | ||
func NewCopyConditions() CopyConditions { | ||
return CopyConditions{} | ||
return CopyConditions{ | ||
conditions: make(map[string]string), | ||
// default values for byte-range indicating that they | ||
// are not provided by the user | ||
byteRangeStart: -1, | ||
byteRangeEnd: -1, | ||
} | ||
} | ||
|
||
// SetMatchETag - set match etag. | ||
func (c *CopyConditions) SetMatchETag(etag string) error { | ||
if etag == "" { | ||
return ErrInvalidArgument("ETag cannot be empty.") | ||
} | ||
c.conditions = append(c.conditions, copyCondition{ | ||
key: "x-amz-copy-source-if-match", | ||
value: etag, | ||
}) | ||
c.conditions["x-amz-copy-source-if-match"] = etag | ||
return nil | ||
} | ||
|
||
|
@@ -67,10 +57,7 @@ func (c *CopyConditions) SetMatchETagExcept(etag string) error { | |
if etag == "" { | ||
return ErrInvalidArgument("ETag cannot be empty.") | ||
} | ||
c.conditions = append(c.conditions, copyCondition{ | ||
key: "x-amz-copy-source-if-none-match", | ||
value: etag, | ||
}) | ||
c.conditions["x-amz-copy-source-if-none-match"] = etag | ||
return nil | ||
} | ||
|
||
|
@@ -79,10 +66,7 @@ func (c *CopyConditions) SetUnmodified(modTime time.Time) error { | |
if modTime.IsZero() { | ||
return ErrInvalidArgument("Modified since cannot be empty.") | ||
} | ||
c.conditions = append(c.conditions, copyCondition{ | ||
key: "x-amz-copy-source-if-unmodified-since", | ||
value: modTime.Format(http.TimeFormat), | ||
}) | ||
c.conditions["x-amz-copy-source-if-unmodified-since"] = modTime.Format(http.TimeFormat) | ||
return nil | ||
} | ||
|
||
|
@@ -91,9 +75,37 @@ func (c *CopyConditions) SetModified(modTime time.Time) error { | |
if modTime.IsZero() { | ||
return ErrInvalidArgument("Modified since cannot be empty.") | ||
} | ||
c.conditions = append(c.conditions, copyCondition{ | ||
key: "x-amz-copy-source-if-modified-since", | ||
value: modTime.Format(http.TimeFormat), | ||
}) | ||
c.conditions["x-amz-copy-source-if-modified-since"] = modTime.Format(http.TimeFormat) | ||
return nil | ||
} | ||
|
||
// SetByteRange - set the start and end of the source object to be | ||
// copied. | ||
func (c *CopyConditions) SetByteRange(start, end int64) error { | ||
if start < 0 || end < start { | ||
return ErrInvalidArgument("Range start less than 0 or range end less than range start.") | ||
} | ||
if end-start+1 < 1 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this condition is useless, Here we are already sure that
so we will never have |
||
return ErrInvalidArgument("Offset must refer to a non-zero range length.") | ||
} | ||
c.byteRangeEnd = end | ||
c.byteRangeStart = start | ||
return nil | ||
} | ||
|
||
func (c *CopyConditions) getRangeSize() int64 { | ||
if c.byteRangeStart < 0 { | ||
// only happens if byte-range was not set by user | ||
return 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably it is better to set this to -1 to say unknown. Probably zero is a valid value. |
||
} | ||
return c.byteRangeEnd - c.byteRangeStart + 1 | ||
} | ||
|
||
func (c *CopyConditions) duplicate() *CopyConditions { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. call this |
||
r := NewCopyConditions() | ||
for k, v := range c.conditions { | ||
r.conditions[k] = v | ||
} | ||
r.byteRangeEnd, r.byteRangeStart = c.byteRangeEnd, c.byteRangeStart | ||
return &r | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stray newline?