Skip to content

Commit

Permalink
Make some changes to PutObjectStreaming() API
Browse files Browse the repository at this point in the history
- Detect size automatically like other PutObject() operations.
- Allow progress bar to be passed into PutObjectStreaming().
- Allow also metadata to be passed into PutObjectStreaming().
- Rename NewStreamingV4 to just StreamingV4(). Keeping it
  consistent with other signature methods.
  • Loading branch information
harshavardhana authored and minio-trusted committed Apr 23, 2017
1 parent de5a907 commit 16dda0e
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 53 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ The full API Reference is available here.
* [`GetObject`](https://docs.minio.io/docs/golang-client-api-reference#GetObject)
* [`PutObject`](https://docs.minio.io/docs/golang-client-api-reference#PutObject)
* [`PutObjectStreaming`](https://docs.minio.io/docs/golang-client-api-reference#PutObjectStreaming)
* [`StatObject`](https://docs.minio.io/docs/golang-client-api-reference#StatObject)
* [`CopyObject`](https://docs.minio.io/docs/golang-client-api-reference#CopyObject)
* [`RemoveObject`](https://docs.minio.io/docs/golang-client-api-reference#RemoveObject)
Expand Down
15 changes: 13 additions & 2 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,17 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
}

// Get the upload id of a previously partially uploaded object or initiate a new multipart upload
uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metadata)
uploadID, err := c.findUploadID(bucketName, objectName)
if err != nil {
return 0, err
}
if uploadID == "" {
// Initiates a new multipart request
uploadID, err = c.newUploadID(bucketName, objectName, metadata)
if err != nil {
return 0, err
}
}

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size)
Expand All @@ -96,17 +103,21 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64

// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

// Part number always starts with '1'.
var partNumber int
for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
// Update progress reader appropriately to the latest offset
// as we read from the source.
hookReader := newHook(reader, nil)
hookReader := newHook(reader, progress)

// Proceed to upload the part.
if partNumber == totalPartsCount {
partSize = lastPartSize
}

var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID,
io.LimitReader(hookReader, partSize), partNumber, nil, nil, partSize)
Expand Down
57 changes: 43 additions & 14 deletions api-put-object-progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,18 @@ func (c Client) PutObjectWithMetadata(bucketName, objectName string, reader io.R
}

// PutObjectStreaming using AWS streaming signature V4
func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Reader, size int64) (n int64, err error) {
if size < 0 {
return 0, ErrInvalidArgument("Size can't be negative.")
}
func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Reader) (n int64, err error) {
return c.PutObjectStreamingWithProgress(bucketName, objectName, reader, nil, nil)
}

// Check for largest object size allowed.
if size > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
}
// PutObjectStreamingWithMetadata using AWS streaming signature V4
func (c Client) PutObjectStreamingWithMetadata(bucketName, objectName string, reader io.Reader, metadata map[string][]string) (n int64, err error) {
return c.PutObjectStreamingWithProgress(bucketName, objectName, reader, metadata, nil)
}

// Note: Streaming signature is not supported by GCS.
// PutObjectStreamingWithProgress using AWS streaming signature V4
func (c Client) PutObjectStreamingWithProgress(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
// NOTE: Streaming signature is not supported by GCS.
if s3utils.IsGoogleEndpoint(c.endpointURL) {
return 0, ErrorResponse{
Code: "NotImplemented",
Expand All @@ -142,16 +143,44 @@ func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Read
BucketName: bucketName,
}
}
// This method should return error with signature v2 minioClient.
if c.signature.isV2() {
return 0, ErrorResponse{
Code: "NotImplemented",
Message: "AWS streaming signature v4 is not supported with minio client initialized for AWS signature v2",
Key: objectName,
BucketName: bucketName,
}
}

// Size of the object.
var size int64

// Get reader size.
size, err = getReaderSize(reader)
if err != nil {
return 0, err
}

// Check for largest object size allowed.
if size > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
}

// Negative size treat it as maximum allowed object size.
if size < 0 {
size = maxMultipartPutObjectSize
}

// Set signature type to streaming signature v4.
c.signature = ChunkedV4
c.signature = SignatureV4Streaming

if size < minPartSize && size >= 0 {
return c.putObjectNoChecksum(bucketName, objectName, reader, size, nil, nil)
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
}

// For all sizes greater than 5MiB do multipart.
n, err = c.putObjectMultipartStreamNoChecksum(bucketName, objectName, reader, size, nil, nil)
// For all sizes greater than 64MiB do multipart.
n, err = c.putObjectMultipartStreamNoChecksum(bucketName, objectName, reader, size, metadata, progress)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
Expand All @@ -162,7 +191,7 @@ func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Read
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, nil, nil)
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress)
}
return n, err
}
Expand Down
8 changes: 4 additions & 4 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R
if c.signature.isV2() {
// Add signature version '2' authorization header.
req = s3signer.SignV2(*req, c.accessKeyID, c.secretAccessKey)
} else if c.signature.isV4() || c.signature.isChunkedV4() &&
} else if c.signature.isV4() || c.signature.isStreamingV4() &&
method != "PUT" {
// Set sha256 sum for signature calculation only with signature version '4'.
shaHeader := unsignedPayload
Expand All @@ -669,9 +669,9 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R

// Add signature version '4' authorization header.
req = s3signer.SignV4(*req, c.accessKeyID, c.secretAccessKey, location)
} else if c.signature.isChunkedV4() {
req = s3signer.NewStreamingSignV4(req, c.accessKeyID,
c.secretAccessKey, c.region, metadata.contentLength, time.Now().UTC())
} else if c.signature.isStreamingV4() {
req = s3signer.StreamingSignV4(req, c.accessKeyID,
c.secretAccessKey, location, metadata.contentLength, time.Now().UTC())
}

// Return request.
Expand Down
5 changes: 4 additions & 1 deletion api_functional_v4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ func TestPutObjectStreaming(t *testing.T) {
t.Fatal("Error:", err)
}

// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)

// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")

Expand All @@ -393,7 +396,7 @@ func TestPutObjectStreaming(t *testing.T) {
objectName := "test-object"
for i, size := range sizes {
data := bytes.Repeat([]byte("a"), int(size))
n, err := c.PutObjectStreaming(bucketName, objectName, bytes.NewReader(data), size)
n, err := c.PutObjectStreaming(bucketName, objectName, bytes.NewReader(data))
if err != nil {
t.Fatalf("Test %d Error: %v %s %s", i+1, err, bucketName, objectName)
}
Expand Down
14 changes: 3 additions & 11 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,8 @@ if err != nil {
}
```

<a name="PutObject"></a>
### PutObjectStreaming(bucketName, objectName string, reader io.Reader, size int64) (n int, err error)
<a name="PutObjectStreaming"></a>
### PutObjectStreaming(bucketName, objectName string, reader io.Reader) (n int, err error)

Uploads an object as multiple chunks keeping memory consumption constant. It is similar to PutObject in how objects are broken into multiple parts. Each part in turn is transferred as multiple chunks with constant memory usage. However resuming previously failed uploads from where it was left is not supported.

Expand All @@ -484,8 +484,6 @@ __Parameters__
|`bucketName` | _string_ |Name of the bucket |
|`objectName` | _string_ |Name of the object |
|`reader` | _io.Reader_ |Any Go type that implements io.Reader |
|`size` | _int64_ |Size of the object |


__Example__

Expand All @@ -498,13 +496,7 @@ if err != nil {
}
defer file.Close()

st, err := os.Stat()
if err != nil {
fmt.Println(err)
return
}

n, err := minioClient.PutObjectStreaming("mybucket", "myobject", file, st.Size())
n, err := minioClient.PutObjectStreaming("mybucket", "myobject", file)
if err != nil {
fmt.Println(err)
return
Expand Down
2 changes: 1 addition & 1 deletion examples/s3/putobject-streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {
}
defer object.Close()

n, err := s3Client.PutObjectStreaming("my-bucketname", "my-objectname", object, size)
n, err := s3Client.PutObjectStreaming("my-bucketname", "my-objectname", object)
if err != nil {
log.Fatalln(err)
}
Expand Down
13 changes: 3 additions & 10 deletions pkg/s3signer/request-signature-streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,8 @@ func prepareStreamingRequest(req *http.Request, dataLen int64, timestamp time.Ti
req.Header.Set("X-Amz-Date", timestamp.Format(iso8601DateFormat))

// Set content length with streaming signature for each chunk included.
streamContentLen := getStreamLength(dataLen, int64(payloadChunkSize))
req.ContentLength = getStreamLength(dataLen, int64(payloadChunkSize))
req.Header.Set("x-amz-decoded-content-length", strconv.FormatInt(dataLen, 10))
if streamContentLen > 0 {
req.Header.Set("Content-Length", strconv.FormatInt(streamContentLen, 10))
req.ContentLength = streamContentLen
} else {
req.Header.Set("Content-Length", "-1")
req.ContentLength = -1
}
}

// buildChunkHeader - returns the chunk header.
Expand Down Expand Up @@ -200,9 +193,9 @@ func (s *StreamingReader) setStreamingAuthHeader(req *http.Request) {
req.Header.Set("Authorization", auth)
}

// NewStreamingSignV4 - provides chunked upload signatureV4 support by
// StreamingSignV4 - provides chunked upload signatureV4 support by
// implementing io.Reader.
func NewStreamingSignV4(req *http.Request, accessKeyID, secretAccessKey,
func StreamingSignV4(req *http.Request, accessKeyID, secretAccessKey,
region string, dataLen int64, reqTime time.Time) *http.Request {

// Set headers needed for streaming signature.
Expand Down
13 changes: 6 additions & 7 deletions pkg/s3signer/request-signature-streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package s3signer
import (
"bytes"
"io/ioutil"
"strconv"
"testing"
"time"
)
Expand All @@ -40,10 +39,10 @@ func TestGetSeedSignature(t *testing.T) {
t.Fatalf("Failed to parse time - %v", err)
}

req = NewStreamingSignV4(req, accessKeyID, secretAccessKeyID, "us-east-1", int64(dataLen), reqTime)
req = StreamingSignV4(req, accessKeyID, secretAccessKeyID, "us-east-1", int64(dataLen), reqTime)
actualSeedSignature := req.Body.(*StreamingReader).seedSignature

expectedSeedSignature := "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9"
expectedSeedSignature := "007480502de61457e955731b0f5d191f7e6f54a8a0f6cc7974a5ebd887965686"
if actualSeedSignature != expectedSeedSignature {
t.Errorf("Expected %s but received %s", expectedSeedSignature, actualSeedSignature)
}
Expand Down Expand Up @@ -73,9 +72,9 @@ func TestSetStreamingAuthorization(t *testing.T) {

dataLen := int64(65 * 1024)
reqTime, _ := time.Parse(iso8601DateFormat, "20130524T000000Z")
req = NewStreamingSignV4(req, accessKeyID, secretAccessKeyID, location, dataLen, reqTime)
req = StreamingSignV4(req, accessKeyID, secretAccessKeyID, location, dataLen, reqTime)

expectedAuthorization := "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20130524/us-east-1/s3/aws4_request,SignedHeaders=content-encoding;content-length;host;x-amz-content-sha256;x-amz-date;x-amz-decoded-content-length;x-amz-storage-class,Signature=4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9"
expectedAuthorization := "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20130524/us-east-1/s3/aws4_request,SignedHeaders=content-encoding;host;x-amz-content-sha256;x-amz-date;x-amz-decoded-content-length;x-amz-storage-class,Signature=007480502de61457e955731b0f5d191f7e6f54a8a0f6cc7974a5ebd887965686"

actualAuthorization := req.Header.Get("Authorization")
if actualAuthorization != expectedAuthorization {
Expand All @@ -92,12 +91,12 @@ func TestStreamingReader(t *testing.T) {

req := NewRequest("PUT", "/examplebucket/chunkObject.txt", nil)
req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY")
req.Header.Set("Content-Length", strconv.FormatInt(65*1024, 10))
req.ContentLength = 65 * 1024
req.URL.Host = "s3.amazonaws.com"

baseReader := ioutil.NopCloser(bytes.NewReader(bytes.Repeat([]byte("a"), 65*1024)))
req.Body = baseReader
req = NewStreamingSignV4(req, accessKeyID, secretAccessKeyID, location, dataLen, reqTime)
req = StreamingSignV4(req, accessKeyID, secretAccessKeyID, location, dataLen, reqTime)

b, err := ioutil.ReadAll(req.Body)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions signature-type.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
Latest SignatureType = iota
SignatureV4
SignatureV2
ChunkedV4
SignatureV4Streaming
)

// isV2 - is signature SignatureV2?
Expand All @@ -37,6 +37,7 @@ func (s SignatureType) isV4() bool {
return s == SignatureV4 || s == Latest
}

func (s SignatureType) isChunkedV4() bool {
return s == ChunkedV4
// isStreamingV4 - is signature SignatureV4Streaming?
func (s SignatureType) isStreamingV4() bool {
return s == SignatureV4Streaming
}

0 comments on commit 16dda0e

Please sign in to comment.