Skip to content

Commit

Permalink
Reduce memory consumption by staging the data locally.
Browse files Browse the repository at this point in the history
Fixes minio#730
  • Loading branch information
harshavardhana authored and minio-trusted committed Jun 30, 2017
1 parent 982f4fb commit ec6108f
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 9 deletions.
16 changes: 9 additions & 7 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,16 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Part number always starts with '1'.
partNumber := 1

// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)

// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

for partNumber <= totalPartsCount {
// Initialize a temporary buffer.
tmpBuffer, terr := newTempFile(objectName)
if terr != nil {
return 0, terr
}

// Choose hash algorithms to be calculated by hashCopyN, avoid sha256
// with non-v4 signature request or HTTPS connection
hashAlgos, hashSums := c.hashMaterials()
Expand All @@ -106,22 +109,21 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize)
if err != nil {
// Reset the temporary buffer upon any error.
tmpBuffer.Reset()
return totalUploadedSize, err
}

// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart

// Reset the temporary buffer.
tmpBuffer.Reset()

// Save successfully uploaded size.
totalUploadedSize += prtSize

// Increment part number.
partNumber++

// Close the temporary buffer.
tmpBuffer.Close()

// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if size < 0 && rErr == io.EOF {
Expand Down
4 changes: 3 additions & 1 deletion examples/s3/putobject-progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ func main() {
progress := pb.New64(objectInfo.Size)
progress.Start()

n, err := s3Client.PutObjectWithProgress("my-bucketname", "my-objectname-progress", reader, "application/octet-stream", progress)
n, err := s3Client.PutObjectWithProgress("my-bucketname", "my-objectname-progress", reader, map[string][]string{
"Content-Type": []string{"application/octet-stream"},
}, progress)
if err != nil {
log.Fatalln(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/s3/putobject-s3-accelerate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func main() {
}

// Enable S3 transfer accelerate endpoint.
s3Client.S3TransferAccelerate("s3-accelerate.amazonaws.com")
s3Client.SetS3TransferAccelerate("s3-accelerate.amazonaws.com")

object, err := os.Open("my-testfile")
if err != nil {
Expand Down
61 changes: 61 additions & 0 deletions tempfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage
* (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package minio

import (
"io/ioutil"
"os"
"sync"
)

// tempFile temporary file container.
type tempFile struct {
*os.File
mutex *sync.Mutex
}

// newTempFile returns a new temporary file, once closed it automatically deletes itself.
func newTempFile(prefix string) (*tempFile, error) {
// use platform specific temp directory.
file, err := ioutil.TempFile(os.TempDir(), prefix)
if err != nil {
return nil, err
}
return &tempFile{
File: file,
mutex: &sync.Mutex{},
}, nil
}

// Close closer wrapper to close and remove temporary file.
func (t *tempFile) Close() error {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.File != nil {
// Close the file.
if err := t.File.Close(); err != nil {
return err
}
// Remove file.
if err := os.Remove(t.File.Name()); err != nil {
return err
}
t.File = nil
}
return nil
}

0 comments on commit ec6108f

Please sign in to comment.