From caf3a80333c5beed2d8d52d29125de05aa6d3e89 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Tue, 25 Oct 2016 10:30:37 +0100 Subject: [PATCH] Fix race in parallel uploads in file/readat --- api-put-object-common.go | 9 ++++--- api-put-object-file.go | 35 +++++++++++++++++---------- api-put-object-multipart.go | 6 +++-- api-put-object-readat.go | 47 +++++++++++++++++++++++-------------- 4 files changed, 60 insertions(+), 37 deletions(-) diff --git a/api-put-object-common.go b/api-put-object-common.go index 2eaef2e30..cf18cd8df 100644 --- a/api-put-object-common.go +++ b/api-put-object-common.go @@ -44,18 +44,17 @@ func isReadAt(reader io.Reader) (ok bool) { } // shouldUploadPart - verify if part should be uploaded. -func shouldUploadPart(objPart objectPart, objectParts map[int]objectPart) bool { +func shouldUploadPart(objPart objectPart, uploadReq uploadPartReq) bool { // If part not found should upload the part. - uploadedPart, found := objectParts[objPart.PartNumber] - if !found { + if uploadReq.Part == nil { return true } // if size mismatches should upload the part. - if objPart.Size != uploadedPart.Size { + if objPart.Size != uploadReq.Part.Size { return true } // if md5sum mismatches should upload the part. - if objPart.ETag != uploadedPart.ETag { + if objPart.ETag != uploadReq.Part.ETag { return true } return false diff --git a/api-put-object-file.go b/api-put-object-file.go index 920322437..6b2436b5b 100644 --- a/api-put-object-file.go +++ b/api-put-object-file.go @@ -178,14 +178,19 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe // Create a channel to communicate which part to upload. // Buffer this to 10000, the maximum number of parts allowed by S3. - uploadPartsCh := make(chan int, 10000) + uploadPartsCh := make(chan uploadPartReq, 10000) // Just for readability. lastPartNumber := totalPartsCount // Send each part through the partUploadCh to be uploaded. for p := 1; p <= totalPartsCount; p++ { - uploadPartsCh <- p + part, ok := partsInfo[p] + if ok { + uploadPartsCh <- uploadPartReq{PartNum: p, Part: &part} + } else { + uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil} + } } close(uploadPartsCh) @@ -193,7 +198,7 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe for w := 1; w <= 3; w++ { go func() { // Deal with each part as it comes through the channel. - for partNumber := range uploadPartsCh { + for uploadReq := range uploadPartsCh { // Add hash algorithms that need to be calculated by computeHash() // In case of a non-v4 signature or https connection, sha256 is not needed. hashAlgos := make(map[string]hash.Hash) @@ -206,12 +211,12 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe // If partNumber was not uploaded we calculate the missing // part offset and size. For all other part numbers we // calculate offset based on multiples of partSize. - readOffset := int64(partNumber-1) * partSize + readOffset := int64(uploadReq.PartNum-1) * partSize missingPartSize := partSize // As a special case if partNumber is lastPartNumber, we // calculate the offset based on the last part size. - if partNumber == lastPartNumber { + if uploadReq.PartNum == lastPartNumber { readOffset = (fileSize - lastPartSize) missingPartSize = lastPartSize } @@ -219,6 +224,8 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe // Get a section reader on a particular offset. sectionReader := io.NewSectionReader(fileReader, readOffset, missingPartSize) var prtSize int64 + var err error + prtSize, err = computeHash(hashAlgos, hashSums, sectionReader) if err != nil { uploadedPartsCh <- uploadedPartRes{ @@ -231,19 +238,20 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe // Create the part to be uploaded. verifyObjPart := objectPart{ ETag: hex.EncodeToString(hashSums["md5"]), - PartNumber: partNumber, + PartNumber: uploadReq.PartNum, Size: partSize, } + // If this is the last part do not give it the full part size. - if partNumber == lastPartNumber { + if uploadReq.PartNum == lastPartNumber { verifyObjPart.Size = lastPartSize } // Verify if part should be uploaded. - if shouldUploadPart(verifyObjPart, partsInfo) { + if shouldUploadPart(verifyObjPart, uploadReq) { // Proceed to upload the part. var objPart objectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, sectionReader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize) + objPart, err = c.uploadPart(bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, hashSums["md5"], hashSums["sha256"], prtSize) if err != nil { uploadedPartsCh <- uploadedPartRes{ Error: err, @@ -252,12 +260,13 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe return } // Save successfully uploaded part metadata. - partsInfo[partNumber] = objPart + uploadReq.Part = &objPart } // Return through the channel the part size. uploadedPartsCh <- uploadedPartRes{ Size: verifyObjPart.Size, - PartNum: partNumber, + PartNum: uploadReq.PartNum, + Part: uploadReq.Part, Error: nil, } } @@ -271,8 +280,8 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe return totalUploadedSize, uploadRes.Error } // Retrieve each uploaded part and store it to be completed. - part, ok := partsInfo[uploadRes.PartNum] - if !ok { + part := uploadRes.Part + if part == nil { return totalUploadedSize, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", uploadRes.PartNum)) } // Update the total uploaded size. diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go index cdd3f53c2..aabeba434 100644 --- a/api-put-object-multipart.go +++ b/api-put-object-multipart.go @@ -139,12 +139,14 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i // as we read from the source. reader = newHook(tmpBuffer, progress) + part, ok := partsInfo[partNumber] + // Verify if part should be uploaded. - if shouldUploadPart(objectPart{ + if ok && shouldUploadPart(objectPart{ ETag: hex.EncodeToString(hashSums["md5"]), PartNumber: partNumber, Size: prtSize, - }, partsInfo) { + }, uploadPartReq{PartNum: partNumber, Part: &part}) { // Proceed to upload the part. var objPart objectPart objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize) diff --git a/api-put-object-readat.go b/api-put-object-readat.go index 4b1d0ebe3..8ed87609b 100644 --- a/api-put-object-readat.go +++ b/api-put-object-readat.go @@ -32,17 +32,22 @@ type uploadedPartRes struct { Error error // Any error encountered while uploading the part. PartNum int // Number of the part uploaded. Size int64 // Size of the part uploaded. + Part *objectPart +} + +type uploadPartReq struct { + PartNum int // Number of the part uploaded. + Part *objectPart // Size of the part uploaded. } // shouldUploadPartReadAt - verify if part should be uploaded. -func shouldUploadPartReadAt(objPart objectPart, objectParts map[int]objectPart) bool { +func shouldUploadPartReadAt(objPart objectPart, uploadReq uploadPartReq) bool { // If part not found part should be uploaded. - uploadedPart, found := objectParts[objPart.PartNumber] - if !found { + if uploadReq.Part == nil { return true } // if size mismatches part should be uploaded. - if uploadedPart.Size != objPart.Size { + if uploadReq.Part.Size != objPart.Size { return true } return false @@ -103,7 +108,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read // Declare a channel that sends the next part number to be uploaded. // Buffered to 10000 because thats the maximum number of parts allowed // by S3. - uploadPartsCh := make(chan int, 10000) + uploadPartsCh := make(chan uploadPartReq, 10000) // Declare a channel that sends back the response of a part upload. // Buffered to 10000 because thats the maximum number of parts allowed @@ -112,7 +117,12 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read // Send each part number to the channel to be processed. for p := 1; p <= totalPartsCount; p++ { - uploadPartsCh <- p + part, ok := partsInfo[p] + if ok { + uploadPartsCh <- uploadPartReq{PartNum: p, Part: &part} + } else { + uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil} + } } close(uploadPartsCh) @@ -123,19 +133,19 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read readAtBuffer := make([]byte, optimalReadBufferSize) // Each worker will draw from the part channel and upload in parallel. - for partNumber := range uploadPartsCh { + for uploadReq := range uploadPartsCh { // Declare a new tmpBuffer. tmpBuffer := new(bytes.Buffer) // If partNumber was not uploaded we calculate the missing // part offset and size. For all other part numbers we // calculate offset based on multiples of partSize. - readOffset := int64(partNumber-1) * partSize + readOffset := int64(uploadReq.PartNum-1) * partSize missingPartSize := partSize // As a special case if partNumber is lastPartNumber, we // calculate the offset based on the last part size. - if partNumber == lastPartNumber { + if uploadReq.PartNum == lastPartNumber { readOffset = (size - lastPartSize) missingPartSize = lastPartSize } @@ -153,6 +163,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read } var prtSize int64 + var err error prtSize, err = hashCopyBuffer(hashAlgos, hashSums, tmpBuffer, sectionReader, readAtBuffer) if err != nil { // Send the error back through the channel. @@ -166,21 +177,21 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read // Verify object if its uploaded. verifyObjPart := objectPart{ - PartNumber: partNumber, + PartNumber: uploadReq.PartNum, Size: partSize, } // Special case if we see a last part number, save last part // size as the proper part size. - if partNumber == lastPartNumber { + if uploadReq.PartNum == lastPartNumber { verifyObjPart.Size = lastPartSize } // Only upload the necessary parts. Otherwise return size through channel // to update any progress bar. - if shouldUploadPartReadAt(verifyObjPart, partsInfo) { + if shouldUploadPartReadAt(verifyObjPart, uploadReq) { // Proceed to upload the part. var objPart objectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, tmpBuffer, partNumber, hashSums["md5"], hashSums["sha256"], prtSize) + objPart, err = c.uploadPart(bucketName, objectName, uploadID, tmpBuffer, uploadReq.PartNum, hashSums["md5"], hashSums["sha256"], prtSize) if err != nil { uploadedPartsCh <- uploadedPartRes{ Size: 0, @@ -190,12 +201,13 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read return } // Save successfully uploaded part metadata. - partsInfo[partNumber] = objPart + uploadReq.Part = &objPart } // Send successful part info through the channel. uploadedPartsCh <- uploadedPartRes{ Size: verifyObjPart.Size, - PartNum: partNumber, + PartNum: uploadReq.PartNum, + Part: uploadReq.Part, Error: nil, } } @@ -210,8 +222,9 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read return totalUploadedSize, uploadRes.Error } // Retrieve each uploaded part and store it to be completed. - part, ok := partsInfo[uploadRes.PartNum] - if !ok { + // part, ok := partsInfo[uploadRes.PartNum] + part := uploadRes.Part + if part == nil { return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", uploadRes.PartNum)) } // Update the totalUploadedSize.