diff --git a/README.md b/README.md index 1e0085a52..f4530e167 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,7 @@ The full API Reference is available here. * [`GetObject`](https://docs.minio.io/docs/golang-client-api-reference#GetObject) * [`PutObject`](https://docs.minio.io/docs/golang-client-api-reference#PutObject) +* [`PutObjectStreaming`](https://docs.minio.io/docs/golang-client-api-reference#PutObjectStreaming) * [`StatObject`](https://docs.minio.io/docs/golang-client-api-reference#StatObject) * [`CopyObject`](https://docs.minio.io/docs/golang-client-api-reference#CopyObject) * [`RemoveObject`](https://docs.minio.io/docs/golang-client-api-reference#RemoveObject) diff --git a/api-put-object-file.go b/api-put-object-file.go index 740cd55e7..09fec769d 100644 --- a/api-put-object-file.go +++ b/api-put-object-file.go @@ -173,7 +173,7 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe close(uploadPartsCh) // Use three 'workers' to upload parts in parallel. - for w := 1; w <= 3; w++ { + for w := 1; w <= totalWorkers; w++ { go func() { // Deal with each part as it comes through the channel. for uploadReq := range uploadPartsCh { diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go index 3d75e2d17..3a299f65b 100644 --- a/api-put-object-multipart.go +++ b/api-put-object-multipart.go @@ -67,8 +67,8 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read } // putObjectMultipartStreamNoChecksum - upload a large object using -// multipart upload and streaming signature for signing payload. N B -// We don't resume an incomplete multipart upload, we overwrite +// multipart upload and streaming signature for signing payload. +// N B We don't resume an incomplete multipart upload, we overwrite // existing parts of an incomplete upload. func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string, reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (int64, error) { @@ -82,10 +82,17 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string } // Get the upload id of a previously partially uploaded object or initiate a new multipart upload - uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metadata) + uploadID, err := c.findUploadID(bucketName, objectName) if err != nil { return 0, err } + if uploadID == "" { + // Initiates a new multipart request + uploadID, err = c.newUploadID(bucketName, objectName, metadata) + if err != nil { + return 0, err + } + } // Calculate the optimal parts info for a given size. totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size) @@ -96,17 +103,21 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string // Total data read and written to server. should be equal to 'size' at the end of the call. var totalUploadedSize int64 + // Initialize parts uploaded map. + partsInfo := make(map[int]ObjectPart) + // Part number always starts with '1'. var partNumber int for partNumber = 1; partNumber <= totalPartsCount; partNumber++ { // Update progress reader appropriately to the latest offset // as we read from the source. - hookReader := newHook(reader, nil) + hookReader := newHook(reader, progress) // Proceed to upload the part. if partNumber == totalPartsCount { partSize = lastPartSize } + var objPart ObjectPart objPart, err = c.uploadPart(bucketName, objectName, uploadID, io.LimitReader(hookReader, partSize), partNumber, nil, nil, partSize) diff --git a/api-put-object-progress.go b/api-put-object-progress.go index 82af399ec..f3844127e 100644 --- a/api-put-object-progress.go +++ b/api-put-object-progress.go @@ -123,17 +123,18 @@ func (c Client) PutObjectWithMetadata(bucketName, objectName string, reader io.R } // PutObjectStreaming using AWS streaming signature V4 -func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Reader, size int64) (n int64, err error) { - if size < 0 { - return 0, ErrInvalidArgument("Size can't be negative.") - } +func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Reader) (n int64, err error) { + return c.PutObjectStreamingWithProgress(bucketName, objectName, reader, nil, nil) +} - // Check for largest object size allowed. - if size > int64(maxMultipartPutObjectSize) { - return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName) - } +// PutObjectStreamingWithMetadata using AWS streaming signature V4 +func (c Client) PutObjectStreamingWithMetadata(bucketName, objectName string, reader io.Reader, metadata map[string][]string) (n int64, err error) { + return c.PutObjectStreamingWithProgress(bucketName, objectName, reader, metadata, nil) +} - // Note: Streaming signature is not supported by GCS. +// PutObjectStreamingWithProgress using AWS streaming signature V4 +func (c Client) PutObjectStreamingWithProgress(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) { + // NOTE: Streaming signature is not supported by GCS. if s3utils.IsGoogleEndpoint(c.endpointURL) { return 0, ErrorResponse{ Code: "NotImplemented", @@ -142,16 +143,45 @@ func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Read BucketName: bucketName, } } + // This method should return error with signature v2 minioClient. + if c.signature.isV2() { + return 0, ErrorResponse{ + Code: "NotImplemented", + Message: "AWS streaming signature v4 is not supported with minio client initialized for AWS signature v2", + Key: objectName, + BucketName: bucketName, + } + } + + // Size of the object. + var size int64 + + // Get reader size. + size, err = getReaderSize(reader) + if err != nil { + return 0, err + } + + // Check for largest object size allowed. + if size > int64(maxMultipartPutObjectSize) { + return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName) + } + + // If size cannot be found on a stream, it is not possible + // to upload using streaming signature, fall back to multipart. + if size < 0 { + return c.putObjectMultipartStream(bucketName, objectName, reader, size, metadata, progress) + } // Set signature type to streaming signature v4. - c.signature = ChunkedV4 + c.signature = SignatureV4Streaming if size < minPartSize && size >= 0 { - return c.putObjectNoChecksum(bucketName, objectName, reader, size, nil, nil) + return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) } - // For all sizes greater than 5MiB do multipart. - n, err = c.putObjectMultipartStreamNoChecksum(bucketName, objectName, reader, size, nil, nil) + // For all sizes greater than 64MiB do multipart. + n, err = c.putObjectMultipartStreamNoChecksum(bucketName, objectName, reader, size, metadata, progress) if err != nil { errResp := ToErrorResponse(err) // Verify if multipart functionality is not available, if not @@ -162,7 +192,7 @@ func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Read return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) } // Fall back to uploading as single PutObject operation. - return c.putObjectNoChecksum(bucketName, objectName, reader, size, nil, nil) + return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) } return n, err } diff --git a/api-put-object-readat.go b/api-put-object-readat.go index c2cf56e0d..ebf422638 100644 --- a/api-put-object-readat.go +++ b/api-put-object-readat.go @@ -115,7 +115,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read close(uploadPartsCh) // Receive each part number from the channel allowing three parallel uploads. - for w := 1; w <= 3; w++ { + for w := 1; w <= totalWorkers; w++ { go func() { // Read defaults to reading at 5MiB buffer. readAtBuffer := make([]byte, optimalReadBufferSize) diff --git a/api.go b/api.go index fdbbb27e6..bf36ff3ea 100644 --- a/api.go +++ b/api.go @@ -654,7 +654,7 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R if c.signature.isV2() { // Add signature version '2' authorization header. req = s3signer.SignV2(*req, c.accessKeyID, c.secretAccessKey) - } else if c.signature.isV4() || c.signature.isChunkedV4() && + } else if c.signature.isV4() || c.signature.isStreamingV4() && method != "PUT" { // Set sha256 sum for signature calculation only with signature version '4'. shaHeader := unsignedPayload @@ -669,9 +669,9 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R // Add signature version '4' authorization header. req = s3signer.SignV4(*req, c.accessKeyID, c.secretAccessKey, location) - } else if c.signature.isChunkedV4() { - req = s3signer.NewStreamingSignV4(req, c.accessKeyID, - c.secretAccessKey, c.region, metadata.contentLength, time.Now().UTC()) + } else if c.signature.isStreamingV4() { + req = s3signer.StreamingSignV4(req, c.accessKeyID, + c.secretAccessKey, location, metadata.contentLength, time.Now().UTC()) } // Return request. diff --git a/api_functional_v4_test.go b/api_functional_v4_test.go index c93ac8e2f..dacbba821 100644 --- a/api_functional_v4_test.go +++ b/api_functional_v4_test.go @@ -375,6 +375,9 @@ func TestPutObjectStreaming(t *testing.T) { t.Fatal("Error:", err) } + // Enable tracing, write to stderr. + // c.TraceOn(os.Stderr) + // Set user agent. c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0") @@ -393,7 +396,7 @@ func TestPutObjectStreaming(t *testing.T) { objectName := "test-object" for i, size := range sizes { data := bytes.Repeat([]byte("a"), int(size)) - n, err := c.PutObjectStreaming(bucketName, objectName, bytes.NewReader(data), size) + n, err := c.PutObjectStreaming(bucketName, objectName, bytes.NewReader(data)) if err != nil { t.Fatalf("Test %d Error: %v %s %s", i+1, err, bucketName, objectName) } diff --git a/constants.go b/constants.go index 888b82575..6055bfdad 100644 --- a/constants.go +++ b/constants.go @@ -45,6 +45,9 @@ const optimalReadBufferSize = 1024 * 1024 * 5 // we don't want to sign the request payload const unsignedPayload = "UNSIGNED-PAYLOAD" +// Total number of parallel workers used for multipart operation. +var totalWorkers = 3 + // Signature related constants. const ( signV4Algorithm = "AWS4-HMAC-SHA256" diff --git a/docs/API.md b/docs/API.md index ecd7aea53..02ca48d71 100644 --- a/docs/API.md +++ b/docs/API.md @@ -470,8 +470,8 @@ if err != nil { } ``` - -### PutObjectStreaming(bucketName, objectName string, reader io.Reader, size int64) (n int, err error) + +### PutObjectStreaming(bucketName, objectName string, reader io.Reader) (n int, err error) Uploads an object as multiple chunks keeping memory consumption constant. It is similar to PutObject in how objects are broken into multiple parts. Each part in turn is transferred as multiple chunks with constant memory usage. However resuming previously failed uploads from where it was left is not supported. @@ -484,8 +484,6 @@ __Parameters__ |`bucketName` | _string_ |Name of the bucket | |`objectName` | _string_ |Name of the object | |`reader` | _io.Reader_ |Any Go type that implements io.Reader | -|`size` | _int64_ |Size of the object | - __Example__ @@ -498,13 +496,7 @@ if err != nil { } defer file.Close() -st, err := os.Stat() -if err != nil { - fmt.Println(err) - return -} - -n, err := minioClient.PutObjectStreaming("mybucket", "myobject", file, st.Size()) +n, err := minioClient.PutObjectStreaming("mybucket", "myobject", file) if err != nil { fmt.Println(err) return diff --git a/examples/s3/putobject-streaming.go b/examples/s3/putobject-streaming.go index 489d5bde5..d10407dbd 100644 --- a/examples/s3/putobject-streaming.go +++ b/examples/s3/putobject-streaming.go @@ -45,7 +45,7 @@ func main() { } defer object.Close() - n, err := s3Client.PutObjectStreaming("my-bucketname", "my-objectname", object, size) + n, err := s3Client.PutObjectStreaming("my-bucketname", "my-objectname", object) if err != nil { log.Fatalln(err) } diff --git a/pkg/s3signer/request-signature-streaming.go b/pkg/s3signer/request-signature-streaming.go index 45da4408f..755fd1ac5 100644 --- a/pkg/s3signer/request-signature-streaming.go +++ b/pkg/s3signer/request-signature-streaming.go @@ -99,15 +99,8 @@ func prepareStreamingRequest(req *http.Request, dataLen int64, timestamp time.Ti req.Header.Set("X-Amz-Date", timestamp.Format(iso8601DateFormat)) // Set content length with streaming signature for each chunk included. - streamContentLen := getStreamLength(dataLen, int64(payloadChunkSize)) + req.ContentLength = getStreamLength(dataLen, int64(payloadChunkSize)) req.Header.Set("x-amz-decoded-content-length", strconv.FormatInt(dataLen, 10)) - if streamContentLen > 0 { - req.Header.Set("Content-Length", strconv.FormatInt(streamContentLen, 10)) - req.ContentLength = streamContentLen - } else { - req.Header.Set("Content-Length", "-1") - req.ContentLength = -1 - } } // buildChunkHeader - returns the chunk header. @@ -200,9 +193,9 @@ func (s *StreamingReader) setStreamingAuthHeader(req *http.Request) { req.Header.Set("Authorization", auth) } -// NewStreamingSignV4 - provides chunked upload signatureV4 support by +// StreamingSignV4 - provides chunked upload signatureV4 support by // implementing io.Reader. -func NewStreamingSignV4(req *http.Request, accessKeyID, secretAccessKey, +func StreamingSignV4(req *http.Request, accessKeyID, secretAccessKey, region string, dataLen int64, reqTime time.Time) *http.Request { // Set headers needed for streaming signature. diff --git a/pkg/s3signer/request-signature-streaming_test.go b/pkg/s3signer/request-signature-streaming_test.go index cd83e652e..084a0dbab 100644 --- a/pkg/s3signer/request-signature-streaming_test.go +++ b/pkg/s3signer/request-signature-streaming_test.go @@ -19,7 +19,6 @@ package s3signer import ( "bytes" "io/ioutil" - "strconv" "testing" "time" ) @@ -40,10 +39,10 @@ func TestGetSeedSignature(t *testing.T) { t.Fatalf("Failed to parse time - %v", err) } - req = NewStreamingSignV4(req, accessKeyID, secretAccessKeyID, "us-east-1", int64(dataLen), reqTime) + req = StreamingSignV4(req, accessKeyID, secretAccessKeyID, "us-east-1", int64(dataLen), reqTime) actualSeedSignature := req.Body.(*StreamingReader).seedSignature - expectedSeedSignature := "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9" + expectedSeedSignature := "007480502de61457e955731b0f5d191f7e6f54a8a0f6cc7974a5ebd887965686" if actualSeedSignature != expectedSeedSignature { t.Errorf("Expected %s but received %s", expectedSeedSignature, actualSeedSignature) } @@ -73,9 +72,9 @@ func TestSetStreamingAuthorization(t *testing.T) { dataLen := int64(65 * 1024) reqTime, _ := time.Parse(iso8601DateFormat, "20130524T000000Z") - req = NewStreamingSignV4(req, accessKeyID, secretAccessKeyID, location, dataLen, reqTime) + req = StreamingSignV4(req, accessKeyID, secretAccessKeyID, location, dataLen, reqTime) - expectedAuthorization := "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20130524/us-east-1/s3/aws4_request,SignedHeaders=content-encoding;content-length;host;x-amz-content-sha256;x-amz-date;x-amz-decoded-content-length;x-amz-storage-class,Signature=4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9" + expectedAuthorization := "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20130524/us-east-1/s3/aws4_request,SignedHeaders=content-encoding;host;x-amz-content-sha256;x-amz-date;x-amz-decoded-content-length;x-amz-storage-class,Signature=007480502de61457e955731b0f5d191f7e6f54a8a0f6cc7974a5ebd887965686" actualAuthorization := req.Header.Get("Authorization") if actualAuthorization != expectedAuthorization { @@ -92,12 +91,12 @@ func TestStreamingReader(t *testing.T) { req := NewRequest("PUT", "/examplebucket/chunkObject.txt", nil) req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY") - req.Header.Set("Content-Length", strconv.FormatInt(65*1024, 10)) + req.ContentLength = 65 * 1024 req.URL.Host = "s3.amazonaws.com" baseReader := ioutil.NopCloser(bytes.NewReader(bytes.Repeat([]byte("a"), 65*1024))) req.Body = baseReader - req = NewStreamingSignV4(req, accessKeyID, secretAccessKeyID, location, dataLen, reqTime) + req = StreamingSignV4(req, accessKeyID, secretAccessKeyID, location, dataLen, reqTime) b, err := ioutil.ReadAll(req.Body) if err != nil { diff --git a/signature-type.go b/signature-type.go index 04edad452..36e999a26 100644 --- a/signature-type.go +++ b/signature-type.go @@ -24,7 +24,7 @@ const ( Latest SignatureType = iota SignatureV4 SignatureV2 - ChunkedV4 + SignatureV4Streaming ) // isV2 - is signature SignatureV2? @@ -37,6 +37,7 @@ func (s SignatureType) isV4() bool { return s == SignatureV4 || s == Latest } -func (s SignatureType) isChunkedV4() bool { - return s == ChunkedV4 +// isStreamingV4 - is signature SignatureV4Streaming? +func (s SignatureType) isStreamingV4() bool { + return s == SignatureV4Streaming }