Fix #730 by replacing the buffer pool with a per-call buffer.
runtime/debug.FreeOSMemory() is deferred so that when a write completes,
the buffer's memory is immediately released back to the OS and subsequent
operations do not run out of memory.
sb10 committed Aug 10, 2017
1 parent 1a09415 commit c1e1e54
Showing 3 changed files with 89 additions and 30 deletions.
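As a sketch of the pattern this commit adopts: allocate one buffer per call and defer runtime/debug.FreeOSMemory() so the buffer's memory goes back to the OS as soon as the call returns. The uploadAll helper below is illustrative only, standing in for putObjectMultipartNoStream and discarding bytes instead of calling uploadPart; it is not the library's API.

package main

import (
	"bytes"
	"fmt"
	"io"
	"runtime/debug"
	"strings"
)

// uploadAll mimics the shape of putObjectMultipartNoStream after this
// change: a single per-call buffer instead of a shared sync.Pool.
// (Hypothetical helper, for illustration only.)
func uploadAll(reader io.Reader, partSize int64) (total int64, err error) {
	// Fresh buffer for this call only; it is collectible once we return.
	buf := make([]byte, partSize)
	// When the call finishes, hand freed memory back to the OS right away
	// so sequential large uploads do not accumulate retained heap.
	defer debug.FreeOSMemory()

	for {
		n, rErr := io.ReadFull(reader, buf)
		if rErr == io.EOF { // nothing left to read
			break
		}
		if rErr != nil && rErr != io.ErrUnexpectedEOF {
			return total, rErr
		}
		// Stand-in for uploadPart: just drain the part's bytes.
		if _, cErr := io.Copy(io.Discard, bytes.NewReader(buf[:n])); cErr != nil {
			return total, cErr
		}
		total += int64(n)
		if rErr == io.ErrUnexpectedEOF { // short final part
			break
		}
	}
	return total, nil
}

func main() {
	n, err := uploadAll(strings.NewReader("test"), 5)
	fmt.Println(n, err) // prints: 4 <nil>
}

The part size chosen for unknown-length streams is large (on the order of hundreds of MB), so a pool that retains such buffers across sequential calls can exhaust memory on small machines, which is what the new functional test below exercises.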
30 changes: 9 additions & 21 deletions api-put-object-multipart.go
@@ -24,10 +24,10 @@ import (
 	"io/ioutil"
 	"net/http"
 	"net/url"
+	"runtime/debug"
 	"sort"
 	"strconv"
 	"strings"
-	"sync"
 
 	"github.com/minio/minio-go/pkg/s3utils"
 )
@@ -51,16 +51,6 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
 	return n, err
 }
 
-// Pool to manage re-usable memory for upload objects
-// with streams with unknown size.
-var bufPool = sync.Pool{
-	New: func() interface{} {
-		_, partSize, _, _ := optimalPartInfo(-1)
-		b := make([]byte, partSize)
-		return &b
-	},
-}
-
 func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
 	// Input validation.
 	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
@@ -78,7 +68,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
 	var complMultipartUpload completeMultipartUpload
 
 	// Calculate the optimal parts info for a given size.
-	totalPartsCount, _, _, err := optimalPartInfo(-1)
+	totalPartsCount, partSize, _, err := optimalPartInfo(-1)
 	if err != nil {
 		return 0, err
 	}
@@ -101,38 +91,39 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
 	// Initialize parts uploaded map.
 	partsInfo := make(map[int]ObjectPart)
 
+	// Create a buffer.
+	buf := make([]byte, partSize)
+	defer debug.FreeOSMemory()
+
 	for partNumber <= totalPartsCount {
 		// Choose hash algorithms to be calculated by hashCopyN,
 		// avoid sha256 with non-v4 signature request or
 		// HTTPS connection.
 		hashAlgos, hashSums := c.hashMaterials()
 
-		bufp := bufPool.Get().(*[]byte)
-		length, rErr := io.ReadFull(reader, *bufp)
+		length, rErr := io.ReadFull(reader, buf)
 		if rErr == io.EOF {
 			break
 		}
 		if rErr != nil && rErr != io.ErrUnexpectedEOF {
-			bufPool.Put(bufp)
 			return 0, rErr
 		}
 
 		// Calculates hash sums while copying partSize bytes into cw.
 		for k, v := range hashAlgos {
-			v.Write((*bufp)[:length])
+			v.Write(buf[:length])
 			hashSums[k] = v.Sum(nil)
 		}
 
 		// Update progress reader appropriately to the latest offset
 		// as we read from the source.
-		rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
+		rd := newHook(bytes.NewReader(buf[:length]), progress)
 
 		// Proceed to upload the part.
 		var objPart ObjectPart
 		objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
 			hashSums["md5"], hashSums["sha256"], int64(length), metadata)
 		if err != nil {
-			bufPool.Put(bufp)
 			return totalUploadedSize, err
 		}
 
@@ -145,9 +136,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
 		// Increment part number.
 		partNumber++
 
-		// Put back data into bufpool.
-		bufPool.Put(bufp)
-
 		// For unknown size, Read EOF we break away.
 		// We do not have to upload till totalPartsCount.
 		if rErr == io.EOF {
17 changes: 8 additions & 9 deletions api-put-object.go
@@ -23,6 +23,7 @@ import (
 	"os"
 	"reflect"
 	"runtime"
+	"runtime/debug"
 	"sort"
 	"strings"
 
@@ -233,7 +234,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
 	var complMultipartUpload completeMultipartUpload
 
 	// Calculate the optimal parts info for a given size.
-	totalPartsCount, _, _, err := optimalPartInfo(-1)
+	totalPartsCount, partSize, _, err := optimalPartInfo(-1)
 	if err != nil {
 		return 0, err
 	}
@@ -256,27 +257,28 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
 	// Initialize parts uploaded map.
 	partsInfo := make(map[int]ObjectPart)
 
+	// Create a buffer.
+	buf := make([]byte, partSize)
+	defer debug.FreeOSMemory()
+
 	for partNumber <= totalPartsCount {
-		bufp := bufPool.Get().(*[]byte)
-		length, rErr := io.ReadFull(reader, *bufp)
+		length, rErr := io.ReadFull(reader, buf)
 		if rErr == io.EOF {
 			break
 		}
 		if rErr != nil && rErr != io.ErrUnexpectedEOF {
-			bufPool.Put(bufp)
 			return 0, rErr
 		}
 
 		// Update progress reader appropriately to the latest offset
 		// as we read from the source.
-		rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
+		rd := newHook(bytes.NewReader(buf[:length]), progress)
 
 		// Proceed to upload the part.
 		var objPart ObjectPart
 		objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
 			nil, nil, int64(length), metadata)
 		if err != nil {
-			bufPool.Put(bufp)
 			return totalUploadedSize, err
 		}
 
@@ -289,9 +291,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
 		// Increment part number.
 		partNumber++
 
-		// Put back data into bufpool.
-		bufPool.Put(bufp)
-
 		// For unknown size, Read EOF we break away.
 		// We do not have to upload till totalPartsCount.
 		if rErr == io.EOF {
72 changes: 72 additions & 0 deletions functional_tests.go
@@ -3671,6 +3671,77 @@ func testPutObjectNoLengthV2() {
 	}
 }
 
+// Test put objects of unknown size.
+func testPutObjectsUnknownV2() {
+	logger().Info()
+
+	// Seed random based on current time.
+	rand.Seed(time.Now().Unix())
+
+	// Instantiate new minio client object.
+	c, err := minio.NewV2(
+		os.Getenv(serverEndpoint),
+		os.Getenv(accessKey),
+		os.Getenv(secretKey),
+		mustParseBool(os.Getenv(enableHTTPS)),
+	)
+	if err != nil {
+		log.Fatal("Error:", err)
+	}
+
+	// Enable tracing, write to stderr.
+	// c.TraceOn(os.Stderr)
+
+	// Set user agent.
+	c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")
+
+	// Generate a new random bucket name.
+	bucketName := randString(60, rand.NewSource(time.Now().UnixNano()),
+		"minio-go-test")
+
+	// Make a new bucket.
+	err = c.MakeBucket(bucketName, "us-east-1")
+	if err != nil {
+		log.Fatal("Error:", err, bucketName)
+	}
+
+	// Issues are revealed by trying to upload multiple files of unknown size
+	// sequentially (on 4GB machines)
+	for i := 1; i <= 4; i++ {
+		// Simulate that we could be receiving byte slices of data that we want
+		// to upload as a file
+		rpipe, wpipe := io.Pipe()
+		defer rpipe.Close()
+		go func() {
+			b := []byte("test")
+			wpipe.Write(b)
+			wpipe.Close()
+		}()
+
+		// Upload the object.
+		objectName := fmt.Sprintf("%sunique%d", bucketName, i)
+		n, err := c.PutObjectStreaming(bucketName, objectName, rpipe)
+		if err != nil {
+			log.Fatalf("Error: %v %s %s", err, bucketName, objectName)
+		}
+		if n != int64(4) {
+			log.Error(fmt.Errorf("Expected upload object size %d but got %d", 4, n))
+		}
+
+		// Remove the object.
+		err = c.RemoveObject(bucketName, objectName)
+		if err != nil {
+			log.Fatal("Error:", err)
+		}
+	}
+
+	// Remove the bucket.
+	err = c.RemoveBucket(bucketName)
+	if err != nil {
+		log.Fatal("Error:", err)
+	}
+}
+
 // Test put object with 0 byte object.
 func testPutObject0ByteV2() {
 	logger().Info()
@@ -4086,6 +4157,7 @@ func main() {
 	testUserMetadataCopyingV2()
 	testPutObject0ByteV2()
 	testPutObjectNoLengthV2()
+	testPutObjectsUnknownV2()
 	testMakeBucketError()
 	testMakeBucketRegions()
 	testPutObjectWithMetadata()
