diff --git a/service/s3/s3manager/download.go b/service/s3/s3manager/download.go
index 002df1c30eb..99b4bf0a30a 100644
--- a/service/s3/s3manager/download.go
+++ b/service/s3/s3manager/download.go
@@ -32,10 +32,14 @@ type Downloader struct {
 	// The buffer size (in bytes) to use when buffering data into chunks and
 	// sending them as parts to S3. The minimum allowed part size is 5MB, and
 	// if this value is set to zero, the DefaultDownloadPartSize value will be used.
+	//
+	// PartSize is ignored if the Range input parameter is provided.
 	PartSize int64
 
 	// The number of goroutines to spin up in parallel when sending parts.
 	// If this is set to zero, the DefaultDownloadConcurrency value will be used.
+	//
+	// Concurrency is ignored if the Range input parameter is provided.
 	Concurrency int
 
 	// An S3 client to use when performing downloads.
@@ -130,6 +134,10 @@ type maxRetrier interface {
 //
 // The w io.WriterAt can be satisfied by an os.File to do multipart concurrent
 // downloads, or in memory []byte wrapper using aws.WriteAtBuffer.
+//
+// If the GetObjectInput's Range value is provided, the downloader will
+// perform a single GetObject request for that object's range. This will
+// cause the part size and concurrency configurations to be ignored.
 func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) {
 	return d.DownloadWithContext(aws.BackgroundContext(), w, input, options...)
 }
@@ -153,6 +161,10 @@ func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ..
 // downloads, or in memory []byte wrapper using aws.WriteAtBuffer.
 //
 // It is safe to call this method concurrently across goroutines.
+//
+// If the GetObjectInput's Range value is provided, the downloader will
+// perform a single GetObject request for that object's range. This will
+// cause the part size and concurrency configurations to be ignored.
 func (d Downloader) DownloadWithContext(ctx aws.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) {
 	impl := downloader{w: w, in: input, cfg: d, ctx: ctx}
 
@@ -199,6 +211,14 @@ type downloader struct {
 // download performs the implementation of the object download across ranged
 // GETs.
 func (d *downloader) download() (n int64, err error) {
+	// If a range is specified, fall back to a single download of that range.
+	// This enables the functionality of ranged gets with the downloader, but
+	// at the cost of no multipart downloads.
+	if rng := aws.StringValue(d.in.Range); len(rng) > 0 {
+		d.downloadRange(rng)
+		return d.written, d.err
+	}
+
 	// Spin off first worker to check additional header information
 	d.getChunk()
 
@@ -285,14 +305,32 @@ func (d *downloader) getChunk() {
 	}
 }
 
-// downloadChunk downloads the chunk froom s3
+// downloadRange downloads an Object given the passed-in Byte-Range value.
+// The chunk used to download the range will be configured for that range.
+func (d *downloader) downloadRange(rng string) {
+	if d.getErr() != nil {
+		return
+	}
+
+	chunk := dlchunk{w: d.w, start: d.pos}
+	// Ranges specified will short-circuit the multipart download
+	chunk.withRange = rng
+
+	if err := d.downloadChunk(chunk); err != nil {
+		d.setErr(err)
+	}
+
+	// Update the position based on the amount of data received.
+	d.pos = d.written
+}
+
+// downloadChunk downloads the chunk from s3
 func (d *downloader) downloadChunk(chunk dlchunk) error {
 	in := &s3.GetObjectInput{}
 	awsutil.Copy(in, d.in)
 
 	// Get the next byte range of data
-	rng := fmt.Sprintf("bytes=%d-%d", chunk.start, chunk.start+chunk.size-1)
-	in.Range = &rng
+	in.Range = aws.String(chunk.ByteRange())
 
 	var n int64
 	var err error
@@ -417,12 +455,18 @@ type dlchunk struct {
 	start int64
 	size  int64
 	cur   int64
+
+	// specifies the byte range the chunk should be downloaded with.
+	withRange string
 }
 
 // Write wraps io.WriterAt for the dlchunk, writing from the dlchunk's start
 // position to its end (or EOF).
+//
+// If a range is specified on the dlchunk, the size will be ignored when
+// writing, as the total size may not have been known ahead of time.
 func (c *dlchunk) Write(p []byte) (n int, err error) {
-	if c.cur >= c.size {
+	if c.cur >= c.size && len(c.withRange) == 0 {
 		return 0, io.EOF
 	}
 
@@ -431,3 +475,13 @@ func (c *dlchunk) Write(p []byte) (n int, err error) {
 
 	return
 }
+
+// ByteRange returns an HTTP Byte-Range header value that should be used by the
+// client to request the chunk's range.
+func (c *dlchunk) ByteRange() string {
+	if len(c.withRange) != 0 {
+		return c.withRange
+	}
+
+	return fmt.Sprintf("bytes=%d-%d", c.start, c.start+c.size-1)
+}
diff --git a/service/s3/s3manager/download_test.go b/service/s3/s3manager/download_test.go
index f272c492096..473b5d4c8db 100644
--- a/service/s3/s3manager/download_test.go
+++ b/service/s3/s3manager/download_test.go
@@ -435,6 +435,28 @@ func TestDownloadWithContextCanceled(t *testing.T) {
 	}
 }
 
+func TestDownload_WithRange(t *testing.T) {
+	s, names, ranges := dlLoggingSvc([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+
+	d := s3manager.NewDownloaderWithClient(s, func(d *s3manager.Downloader) {
+		d.Concurrency = 10 // should be ignored
+		d.PartSize = 1     // should be ignored
+	})
+
+	w := &aws.WriteAtBuffer{}
+	n, err := d.Download(w, &s3.GetObjectInput{
+		Bucket: aws.String("bucket"),
+		Key:    aws.String("key"),
+		Range:  aws.String("bytes=2-6"),
+	})
+
+	assert.Nil(t, err)
+	assert.Equal(t, int64(5), n)
+	assert.Equal(t, []string{"GetObject"}, *names)
+	assert.Equal(t, []string{"bytes=2-6"}, *ranges)
+	assert.Equal(t, []byte{2, 3, 4, 5, 6}, w.Bytes())
+}
+
 func TestDownload_WithFailure(t *testing.T) {
 	svc := s3.New(unit.Session)
 	svc.Handlers.Send.Clear()
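
For context, a minimal usage sketch (not part of the diff) showing the behavior this change adds: providing Range on the GetObjectInput makes the Downloader issue a single GetObject request, ignoring the PartSize and Concurrency settings. The bucket and key names are hypothetical placeholders.

	package main

	import (
		"fmt"
		"log"

		"github.com/aws/aws-sdk-go/aws"
		"github.com/aws/aws-sdk-go/aws/session"
		"github.com/aws/aws-sdk-go/service/s3"
		"github.com/aws/aws-sdk-go/service/s3/s3manager"
	)

	func main() {
		sess := session.Must(session.NewSession())
		downloader := s3manager.NewDownloader(sess)

		// Providing Range makes the downloader fall back to a single
		// GetObject request for that range, so the PartSize and
		// Concurrency configurations have no effect here.
		buf := &aws.WriteAtBuffer{}
		n, err := downloader.Download(buf, &s3.GetObjectInput{
			Bucket: aws.String("mybucket"), // hypothetical bucket name
			Key:    aws.String("mykey"),    // hypothetical object key
			Range:  aws.String("bytes=2-6"),
		})
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("downloaded %d bytes: %v\n", n, buf.Bytes())
	}

This mirrors the TestDownload_WithRange test above: the returned n is the number of bytes in the requested range, and only one GetObject call is made regardless of the configured concurrency.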