Skip to content

Commit

Permalink
service/s3/s3manager: Fix Downloader ignoring Range get parameter
Browse files Browse the repository at this point in the history
Fixes the S3 Download Manager ignoring the GetObjectInput's Range
parameter. If this parameter is provided it will force the downloader to
fallback to a single GetObject request disabling concurrency and
automatic part size gets.

This can be improved, if there is demand, for the feature to parse the
byte range value provided and bulding range gets with it.

Fix aws#1296
  • Loading branch information
jasdel committed May 31, 2017
1 parent a202fff commit f4c2197
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 4 deletions.
62 changes: 58 additions & 4 deletions service/s3/s3manager/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ type Downloader struct {
// The buffer size (in bytes) to use when buffering data into chunks and
// sending them as parts to S3. The minimum allowed part size is 5MB, and
// if this value is set to zero, the DefaultDownloadPartSize value will be used.
//
// PartSize is ignored if the Range input parameter is provided.
PartSize int64

// The number of goroutines to spin up in parallel when sending parts.
// If this is set to zero, the DefaultDownloadConcurrency value will be used.
//
// Concurrency is ignored if the Range input parameter is provided.
Concurrency int

// An S3 client to use when performing downloads.
Expand Down Expand Up @@ -130,6 +134,10 @@ type maxRetrier interface {
//
// The w io.WriterAt can be satisfied by an os.File to do multipart concurrent
// downloads, or in memory []byte wrapper using aws.WriteAtBuffer.
//
// If the GetObjectInput's Range value is provided that will cause the downloader
// to perform a single GetObjectInput request for that object's range. This will
// caused the part size, and concurrency configurations to be ignored.
func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) {
return d.DownloadWithContext(aws.BackgroundContext(), w, input, options...)
}
Expand All @@ -153,6 +161,10 @@ func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ..
// downloads, or in memory []byte wrapper using aws.WriteAtBuffer.
//
// It is safe to call this method concurrently across goroutines.
//
// If the GetObjectInput's Range value is provided that will cause the downloader
// to perform a single GetObjectInput request for that object's range. This will
// caused the part size, and concurrency configurations to be ignored.
func (d Downloader) DownloadWithContext(ctx aws.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) {
impl := downloader{w: w, in: input, cfg: d, ctx: ctx}

Expand Down Expand Up @@ -199,6 +211,14 @@ type downloader struct {
// download performs the implementation of the object download across ranged
// GETs.
func (d *downloader) download() (n int64, err error) {
// If range is specified fall back to single download of that range
// this enables the functionality of ranged gets with the downloader but
// at the cost of no multipart downloads.
if rng := aws.StringValue(d.in.Range); len(rng) > 0 {
d.downloadRange(rng)
return d.written, d.err
}

// Spin off first worker to check additional header information
d.getChunk()

Expand Down Expand Up @@ -285,14 +305,32 @@ func (d *downloader) getChunk() {
}
}

// downloadChunk downloads the chunk froom s3
// downloadRange downloads an Object given the passed in Byte-Range value.
// The chunk used down download the range will be configured for that range.
func (d *downloader) downloadRange(rng string) {
if d.getErr() != nil {
return
}

chunk := dlchunk{w: d.w, start: d.pos}
// Ranges specified will short circuit the multipart download
chunk.withRange = rng

if err := d.downloadChunk(chunk); err != nil {
d.setErr(err)
}

// Update the position based on the amount of data received.
d.pos = d.written
}

// downloadChunk downloads the chunk from s3
func (d *downloader) downloadChunk(chunk dlchunk) error {
in := &s3.GetObjectInput{}
awsutil.Copy(in, d.in)

// Get the next byte range of data
rng := fmt.Sprintf("bytes=%d-%d", chunk.start, chunk.start+chunk.size-1)
in.Range = &rng
in.Range = aws.String(chunk.ByteRange())

var n int64
var err error
Expand Down Expand Up @@ -417,12 +455,18 @@ type dlchunk struct {
start int64
size int64
cur int64

// specifies the byte range the chunk should be downloaded with.
withRange string
}

// Write wraps io.WriterAt for the dlchunk, writing from the dlchunk's start
// position to its end (or EOF).
//
// If a range is specified on the dlchunk the size will be ignored when writing.
// as the total size may not of bee known ahead of time.
func (c *dlchunk) Write(p []byte) (n int, err error) {
if c.cur >= c.size {
if c.cur >= c.size && len(c.withRange) == 0 {
return 0, io.EOF
}

Expand All @@ -431,3 +475,13 @@ func (c *dlchunk) Write(p []byte) (n int, err error) {

return
}

// ByteRange returns a HTTP Byte-Range header value that should be used by the
// client to request the chunk's range.
func (c *dlchunk) ByteRange() string {
if len(c.withRange) != 0 {
return c.withRange
}

return fmt.Sprintf("bytes=%d-%d", c.start, c.start+c.size-1)
}
22 changes: 22 additions & 0 deletions service/s3/s3manager/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,28 @@ func TestDownloadWithContextCanceled(t *testing.T) {
}
}

func TestDownload_WithRange(t *testing.T) {
s, names, ranges := dlLoggingSvc([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})

d := s3manager.NewDownloaderWithClient(s, func(d *s3manager.Downloader) {
d.Concurrency = 10 // should be ignored
d.PartSize = 1 // should be ignored
})

w := &aws.WriteAtBuffer{}
n, err := d.Download(w, &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("key"),
Range: aws.String("bytes=2-6"),
})

assert.Nil(t, err)
assert.Equal(t, int64(5), n)
assert.Equal(t, []string{"GetObject"}, *names)
assert.Equal(t, []string{"bytes=2-6"}, *ranges)
assert.Equal(t, []byte{2, 3, 4, 5, 6}, w.Bytes())
}

func TestDownload_WithFailure(t *testing.T) {
svc := s3.New(unit.Session)
svc.Handlers.Send.Clear()
Expand Down

0 comments on commit f4c2197

Please sign in to comment.