Skip to content

Commit

Permalink
Merge pull request #8 from resin-os/stream-pull
Browse files Browse the repository at this point in the history
Streaming pull
  • Loading branch information
petrosagg authored Jul 28, 2017
2 parents 8c041f2 + a86e2c8 commit 9d1e6e4
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 143 deletions.
186 changes: 43 additions & 143 deletions distribution/pull_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"runtime"
Expand All @@ -18,7 +17,6 @@ import (
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
Expand All @@ -39,6 +37,8 @@ var (
errRootFSInvalid = errors.New("invalid rootfs in image configuration")
)

const maxDownloadAttempts = 5

// ImageConfigPullError is an error pulling the image config blob
// (only applies to schema2).
type ImageConfigPullError struct {
Expand Down Expand Up @@ -139,6 +139,10 @@ type v2LayerDescriptor struct {
tmpFile *os.File
verifier digest.Verifier
src distribution.Descriptor
ctx context.Context
layerDownload io.ReadCloser
downloadAttempts uint8
downloadOffset int64
}

func (ld *v2LayerDescriptor) Key() string {
Expand All @@ -156,168 +160,68 @@ func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) {
return ld.V2MetadataService.GetDiffID(ld.digest)
}

func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
logrus.Debugf("pulling blob %q", ld.digest)

var (
err error
offset int64
)

if ld.tmpFile == nil {
ld.tmpFile, err = createDownloadFile()
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
} else {
offset, err = ld.tmpFile.Seek(0, os.SEEK_END)
if err != nil {
logrus.Debugf("error seeking to end of download file: %v", err)
offset = 0

ld.tmpFile.Close()
if err := os.Remove(ld.tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
}
ld.tmpFile, err = createDownloadFile()
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
} else if offset != 0 {
logrus.Debugf("attempting to resume download of %q from %d bytes", ld.digest, offset)
}
func (ld *v2LayerDescriptor) reset() error {
if ld.layerDownload != nil {
ld.layerDownload.Close()
ld.layerDownload = nil
}

tmpFile := ld.tmpFile

layerDownload, err := ld.open(ctx)
layer, err := ld.open(ld.ctx)
if err != nil {
logrus.Errorf("Error initiating layer download: %v", err)
return nil, 0, retryOnError(err)
return err
}

if offset != 0 {
_, err := layerDownload.Seek(offset, os.SEEK_SET)
if err != nil {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, err
}
}
size, err := layerDownload.Seek(0, os.SEEK_END)
if err != nil {
// Seek failed, perhaps because there was no Content-Length
// header. This shouldn't fail the download, because we can
// still continue without a progress bar.
size = 0
} else {
if size != 0 && offset > size {
logrus.Debug("Partial download is larger than full blob. Starting over")
offset = 0
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
}

// Restore the seek offset either at the beginning of the
// stream, or just after the last byte we have from previous
// attempts.
_, err = layerDownload.Seek(offset, os.SEEK_SET)
if err != nil {
return nil, 0, err
}
if _, err := layer.Seek(ld.downloadOffset, os.SEEK_SET); err != nil {
return err
}

reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerDownload), progressOutput, size-offset, ld.ID(), "Downloading")
defer reader.Close()
ld.layerDownload = ioutils.TeeReadCloser(ioutils.NewCancelReadCloser(ld.ctx, layer), ld.verifier)

if ld.verifier == nil {
ld.verifier = ld.digest.Verifier()
}
return nil
}

_, err = io.Copy(tmpFile, io.TeeReader(reader, ld.verifier))
if err != nil {
if err == transport.ErrWrongCodeForByteRange {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, err
}
return nil, 0, retryOnError(err)
func (ld *v2LayerDescriptor) Read(p []byte) (int, error) {
if ld.downloadAttempts <= 0 {
return 0, fmt.Errorf("no request retries left")
}

progress.Update(progressOutput, ld.ID(), "Verifying Checksum")

if !ld.verifier.Verified() {
err = fmt.Errorf("filesystem layer verification failed for digest %s", ld.digest)
logrus.Error(err)

// Allow a retry if this digest verification error happened
// after a resumed download.
if offset != 0 {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}

return nil, 0, err
if ld.layerDownload == nil {
if err := ld.reset(); err != nil {
ld.downloadAttempts -= 1
return 0, err
}
return nil, 0, xfer.DoNotRetry{Err: err}
}

progress.Update(progressOutput, ld.ID(), "Download complete")

logrus.Debugf("Downloaded %s to tempfile %s", ld.ID(), tmpFile.Name())

_, err = tmpFile.Seek(0, os.SEEK_SET)
if err != nil {
tmpFile.Close()
if err := os.Remove(tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
n, err := ld.layerDownload.Read(p)
ld.downloadOffset += int64(n)
if err == io.EOF {
if !ld.verifier.Verified() {
return n, fmt.Errorf("filesystem layer verification failed for digest %s", ld.digest)
}
ld.tmpFile = nil
ld.verifier = nil
return nil, 0, xfer.DoNotRetry{Err: err}
} else if err != nil {
ld.downloadAttempts -= 1
ld.layerDownload = nil
err = nil
}

// hand off the temporary file to the download manager, so it will only
// be closed once
ld.tmpFile = nil

return ioutils.NewReadCloserWrapper(tmpFile, func() error {
tmpFile.Close()
err := os.RemoveAll(tmpFile.Name())
if err != nil {
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
}
return err
}), size, nil
return n, err
}

func (ld *v2LayerDescriptor) Close() {
if ld.tmpFile != nil {
ld.tmpFile.Close()
if err := os.RemoveAll(ld.tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
}
}
ld.layerDownload.Close()
}

func (ld *v2LayerDescriptor) truncateDownloadFile() error {
// Need a new hash context since we will be redoing the download
ld.verifier = nil
func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
logrus.Debugf("pulling blob %q", ld.digest)

if _, err := ld.tmpFile.Seek(0, os.SEEK_SET); err != nil {
logrus.Errorf("error seeking to beginning of download file: %v", err)
return err
}
ld.ctx = ctx
ld.layerDownload = nil
ld.downloadAttempts = maxDownloadAttempts
ld.verifier = ld.digest.Verifier()

if err := ld.tmpFile.Truncate(0); err != nil {
logrus.Errorf("error truncating download file: %v", err)
return err
}
progress.Update(progressOutput, ld.ID(), "Ready to download")

return nil
return ioutils.NewReadCloserWrapper(ld, func() error { return nil }), ld.src.Size, nil
}

func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
Expand Down Expand Up @@ -914,7 +818,3 @@ func fixManifestLayers(m *schema1.Manifest) error {

return nil
}

func createDownloadFile() (*os.File, error) {
return ioutil.TempFile("", "GetImageBlob")
}
20 changes: 20 additions & 0 deletions pkg/ioutils/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,23 @@ func (p *cancelReadCloser) Close() error {
p.closeWithError(io.EOF)
return nil
}

type teeReadCloser struct {
rc io.ReadCloser
r io.Reader
}

// TeeReadCloser returns a ReadCloser that writes to w what it reads from rc.
// It utilizes io.TeeReader to copy the data read and has the same behavior when reading.
// Further, when it is closed, it ensures that rc is closed as well.
func TeeReadCloser(rc io.ReadCloser, w io.Writer) io.ReadCloser {
return &teeReadCloser{rc, io.TeeReader(rc, w)}
}

func (t *teeReadCloser) Read(p []byte) (int, error) {
return t.r.Read(p)
}

func (t *teeReadCloser) Close() error {
return t.rc.Close()
}

0 comments on commit 9d1e6e4

Please sign in to comment.