api: Optimize multipart upload memory usage for unknown sized stream
Fixes #730
harshavardhana authored and minio-trusted committed Jul 31, 2017
1 parent 7f5b94f commit 6a63221
Showing 5 changed files with 155 additions and 27 deletions.
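The substance of the change: instead of filling a fresh bytes.Buffer for every uploaded part, the client now borrows a preallocated, part-sized byte slice from a sync.Pool, fills it through a new cappedWriter type, uploads from it, and returns it to the pool, so steady-state memory stays at roughly one part buffer instead of growing per part. A minimal, runnable sketch of that pooling pattern (the 64 MiB constant is illustrative only; the commit derives the real size from optimalPartInfo(-1)):

package main

import (
    "fmt"
    "sync"
)

// Illustrative only: the commit sizes buffers via optimalPartInfo(-1).
const partSize = 64 << 20 // 64 MiB

// bufPool hands out reusable part-sized buffers instead of allocating
// a fresh one for every uploaded part.
var bufPool = sync.Pool{
    New: func() interface{} {
        b := make([]byte, partSize)
        return &b // a pointer avoids copying the slice header on Put/Get
    },
}

func main() {
    bufp := bufPool.Get().(*[]byte) // borrow (allocates only when the pool is empty)
    fmt.Println(len(*bufp))         // 67108864
    bufPool.Put(bufp)               // return for reuse by the next part
}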
2 changes: 1 addition & 1 deletion api-put-object-encrypted.go
@@ -42,5 +42,5 @@ func (c Client) PutEncryptedObject(bucketName, objectName string, reader io.Read
     metadata[amzHeaderKey] = []string{encryptMaterials.GetKey()}
     metadata[amzHeaderMatDesc] = []string{encryptMaterials.GetDesc()}
 
-    return c.putObjectMultipart(bucketName, objectName, encryptMaterials, -1, metadata, progress)
+    return c.putObjectMultipartStreamNoLength(bucketName, objectName, encryptMaterials, -1, metadata, progress)
 }
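The size argument here is -1, presumably because an encrypting reader produces ciphertext whose final length is not known up front; encrypted uploads are therefore treated as unknown-sized streams and routed through the new pooled-buffer path.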
39 changes: 26 additions & 13 deletions api-put-object-multipart.go
@@ -27,6 +27,7 @@ import (
     "sort"
     "strconv"
     "strings"
+    "sync"
 
     "github.com/minio/minio-go/pkg/s3utils"
 )
@@ -50,6 +51,16 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
     return n, err
 }
 
+// Pool to manage re-usable memory for upload objects
+// with streams with unknown size.
+var bufPool = sync.Pool{
+    New: func() interface{} {
+        _, partSize, _, _ := optimalPartInfo(-1)
+        b := make([]byte, partSize)
+        return &b
+    },
+}
+
 func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, size int64,
     metadata map[string][]string, progress io.Reader) (n int64, err error) {
     // Input validation.
@@ -88,9 +99,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
     // Part number always starts with '1'.
     partNumber := 1
 
-    // Initialize a temporary buffer.
-    tmpBuffer := new(bytes.Buffer)
-
     // Initialize parts uploaded map.
     partsInfo := make(map[int]ObjectPart)
 
@@ -100,39 +108,44 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
         // HTTPS connection.
         hashAlgos, hashSums := c.hashMaterials()
 
-        // Calculates hash sums while copying partSize bytes into tmpBuffer.
-        prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize)
+        bufp := bufPool.Get().(*[]byte)
+        cw := &cappedWriter{
+            buffer: *bufp,
+            cap:    int64(cap(*bufp)),
+        }
+
+        // Calculates hash sums while copying partSize bytes into cw.
+        prtSize, rErr := hashCopyN(hashAlgos, hashSums, cw, reader, partSize)
         if rErr != nil && rErr != io.EOF {
+            bufPool.Put(bufp)
             return 0, rErr
         }
 
-        var reader io.Reader
         // Update progress reader appropriately to the latest offset
         // as we read from the source.
-        reader = newHook(tmpBuffer, progress)
+        rd := newHook(bytes.NewReader(cw.GetBytes(prtSize)), progress)
 
         // Proceed to upload the part.
         var objPart ObjectPart
-        objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber,
+        objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
             hashSums["md5"], hashSums["sha256"], prtSize, metadata)
         if err != nil {
-            // Reset the temporary buffer upon any error.
-            tmpBuffer.Reset()
+            bufPool.Put(bufp)
             return totalUploadedSize, err
         }
 
         // Save successfully uploaded part metadata.
         partsInfo[partNumber] = objPart
 
-        // Reset the temporary buffer.
-        tmpBuffer.Reset()
-
        // Save successfully uploaded size.
         totalUploadedSize += prtSize
 
         // Increment part number.
         partNumber++
 
+        // Put back data into bufpool.
+        bufPool.Put(bufp)
+
         // For unknown size, Read EOF we break away.
         // We do not have to upload till totalPartsCount.
         if size < 0 && rErr == io.EOF {
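Note the buffer lifecycle in the loop above: each iteration borrows a part-sized buffer with bufPool.Get, wraps it in a cappedWriter, and returns it with bufPool.Put on every exit path (read error, failed upload, or normal completion). A stripped-down sketch of the same borrow/fill/return cycle, assuming the bufPool declared in this commit and a hypothetical process consumer:

// readParts is a sketch only: "process" is a hypothetical consumer and
// bufPool is the part-sized *[]byte pool introduced by this commit.
func readParts(src io.Reader, process func([]byte)) error {
    for {
        bufp := bufPool.Get().(*[]byte) // borrow a part-sized buffer
        n, err := io.ReadFull(src, *bufp)
        if n > 0 {
            process((*bufp)[:n]) // hand off only the filled prefix
        }
        bufPool.Put(bufp) // always return the buffer, on every path
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            return nil // stream exhausted
        }
        if err != nil {
            return err
        }
    }
}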
29 changes: 16 additions & 13 deletions api-put-object.go
@@ -180,6 +180,7 @@ func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.R
     if err != nil {
         return 0, err
     }
+
     return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress)
 }
 
@@ -252,46 +253,48 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
     // Part number always starts with '1'.
     partNumber := 1
 
-    // Initialize a temporary buffer.
-    tmpBuffer := new(bytes.Buffer)
-
     // Initialize parts uploaded map.
     partsInfo := make(map[int]ObjectPart)
 
     for partNumber <= totalPartsCount {
-        // Calculates hash sums while copying partSize bytes into tmpBuffer.
-        prtSize, rErr := io.CopyN(tmpBuffer, reader, partSize)
+        bufp := bufPool.Get().(*[]byte)
+        cw := &cappedWriter{
+            buffer: *bufp,
+            cap:    int64(cap(*bufp)),
+        }
+
+        // Copies partSize bytes into cw.
+        prtSize, rErr := io.CopyN(cw, reader, partSize)
         if rErr != nil && rErr != io.EOF {
+            bufPool.Put(bufp)
             return 0, rErr
         }
 
-        var reader io.Reader
         // Update progress reader appropriately to the latest offset
         // as we read from the source.
-        reader = newHook(tmpBuffer, progress)
+        rd := newHook(bytes.NewReader(cw.GetBytes(prtSize)), progress)
 
         // Proceed to upload the part.
         var objPart ObjectPart
-        objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber,
+        objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
             nil, nil, prtSize, metadata)
         if err != nil {
-            // Reset the temporary buffer upon any error.
-            tmpBuffer.Reset()
+            bufPool.Put(bufp)
             return totalUploadedSize, err
         }
 
         // Save successfully uploaded part metadata.
         partsInfo[partNumber] = objPart
 
-        // Reset the temporary buffer.
-        tmpBuffer.Reset()
-
         // Save successfully uploaded size.
         totalUploadedSize += prtSize
 
         // Increment part number.
         partNumber++
 
+        // Put back data into bufpool.
+        bufPool.Put(bufp)
+
         // For unknown size, Read EOF we break away.
         // We do not have to upload till totalPartsCount.
         if size < 0 && rErr == io.EOF {
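This is the same transformation as in putObjectMultipartNoStream above, with one difference visible in the diff: this path passes nil MD5/SHA256 sums to uploadPart instead of computing per-part checksums via hashCopyN, so a plain io.CopyN fills the pooled buffer.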
49 changes: 49 additions & 0 deletions capped-writer.go
@@ -0,0 +1,49 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package minio
+
+import (
+    "errors"
+)
+
+// cappedWriter presents a fixed-size, preallocated buffer as an io.Writer;
+// it is used to fill pooled part buffers without reallocating them.
+type cappedWriter struct {
+    offset int64
+    cap    int64
+    buffer []byte
+}
+
+// Write implements a limited writer; it returns an error
+// if a write would go beyond the allocated size.
+func (c *cappedWriter) Write(b []byte) (n int, err error) {
+    if c.offset+int64(len(b)) > c.cap {
+        return 0, errors.New("excess data")
+    }
+    n = copy(c.buffer[int(c.offset):int(c.offset)+len(b)], b)
+    c.offset = c.offset + int64(n)
+    return n, nil
+}
+
+func (c *cappedWriter) Len() int {
+    return len(c.buffer)
+}
+
+func (c *cappedWriter) GetBytes(offset int64) []byte {
+    return c.buffer[:offset]
+}
63 changes: 63 additions & 0 deletions functional_tests.go
@@ -3609,6 +3609,68 @@ func testUserMetadataCopyingV2()
     testUserMetadataCopyingWrapper(c)
 }
 
+// Test put object with unknown content length (size == -1).
+func testPutObjectNoLengthV2() {
+    logger().Info()
+
+    // Seed random based on current time.
+    rand.Seed(time.Now().Unix())
+
+    // Instantiate new minio client object.
+    c, err := minio.NewV2(
+        os.Getenv(serverEndpoint),
+        os.Getenv(accessKey),
+        os.Getenv(secretKey),
+        mustParseBool(os.Getenv(enableHTTPS)),
+    )
+    if err != nil {
+        log.Fatal("Error:", err)
+    }
+
+    // Enable tracing, write to stderr.
+    // c.TraceOn(os.Stderr)
+
+    // Set user agent.
+    c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
+
+    // Generate a new random bucket name.
+    bucketName := randString(60, rand.NewSource(time.Now().UnixNano()),
+        "minio-go-test")
+
+    // Make a new bucket.
+    err = c.MakeBucket(bucketName, "us-east-1")
+    if err != nil {
+        log.Fatal("Error:", err, bucketName)
+    }
+
+    objectName := bucketName + "unique"
+
+    // Generate 65MiB of data; with an unknown size the stream is read
+    // into pooled part-sized buffers and uploaded as multipart.
+    var buf = getDataBuffer("datafile-65-MB", MinPartSize)
+
+    // Upload the object without declaring its size.
+    n, err := c.PutObjectWithSize(bucketName, objectName, bytes.NewReader(buf), -1, nil, nil)
+    if err != nil {
+        log.Fatalf("Error: %v %s %s", err, bucketName, objectName)
+    }
+    if n != int64(len(buf)) {
+        log.Error(fmt.Errorf("Expected upload object size %d but got %d", len(buf), n))
+    }
+
+    // Remove the object.
+    err = c.RemoveObject(bucketName, objectName)
+    if err != nil {
+        log.Fatal("Error:", err)
+    }
+
+    // Remove the bucket.
+    err = c.RemoveBucket(bucketName)
+    if err != nil {
+        log.Fatal("Error:", err)
+    }
+}
+
 // Test put object with 0 byte object.
 func testPutObject0ByteV2() {
     logger().Info()
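The new test drives the same path an application would: passing -1 as the size forces the pooled multipart streaming code. A caller-side sketch under stated assumptions (endpoint, credentials, bucket, and object names are placeholders; PutObjectWithSize is used as in the test above), streaming data whose length is unknown in advance through an io.Pipe:

// uploadUnknownLength is a sketch only; "my-bucket"/"my-object" are
// placeholders and the producer size is arbitrary.
func uploadUnknownLength(c *minio.Client) (int64, error) {
    pr, pw := io.Pipe()
    go func() {
        // Hypothetical producer: write ~70 MiB, then signal EOF.
        pw.Write(bytes.Repeat([]byte("x"), 70*1024*1024))
        pw.Close()
    }()
    // size == -1 selects the unknown-length multipart path added here.
    return c.PutObjectWithSize("my-bucket", "my-object", pr, -1, nil, nil)
}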
@@ -4023,6 +4085,7 @@ func main() {
     testEncryptedCopyObjectV2()
     testUserMetadataCopyingV2()
     testPutObject0ByteV2()
+    testPutObjectNoLengthV2()
     testMakeBucketError()
     testMakeBucketRegions()
     testPutObjectWithMetadata()
