Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming pull #8

Merged
merged 3 commits into from
Jul 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 43 additions & 143 deletions distribution/pull_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"runtime"
Expand All @@ -18,7 +17,6 @@ import (
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/client/auth"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
Expand All @@ -39,6 +37,8 @@ var (
errRootFSInvalid = errors.New("invalid rootfs in image configuration")
)

const maxDownloadAttempts = 5

// ImageConfigPullError is an error pulling the image config blob
// (only applies to schema2).
type ImageConfigPullError struct {
Expand Down Expand Up @@ -139,6 +139,10 @@ type v2LayerDescriptor struct {
tmpFile *os.File
verifier digest.Verifier
src distribution.Descriptor
ctx context.Context
layerDownload io.ReadCloser
downloadAttempts uint8
downloadOffset int64
}

func (ld *v2LayerDescriptor) Key() string {
Expand All @@ -156,168 +160,68 @@ func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) {
return ld.V2MetadataService.GetDiffID(ld.digest)
}

func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
logrus.Debugf("pulling blob %q", ld.digest)

var (
err error
offset int64
)

if ld.tmpFile == nil {
ld.tmpFile, err = createDownloadFile()
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
} else {
offset, err = ld.tmpFile.Seek(0, os.SEEK_END)
if err != nil {
logrus.Debugf("error seeking to end of download file: %v", err)
offset = 0

ld.tmpFile.Close()
if err := os.Remove(ld.tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
}
ld.tmpFile, err = createDownloadFile()
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
} else if offset != 0 {
logrus.Debugf("attempting to resume download of %q from %d bytes", ld.digest, offset)
}
func (ld *v2LayerDescriptor) reset() error {
if ld.layerDownload != nil {
ld.layerDownload.Close()
ld.layerDownload = nil
}

tmpFile := ld.tmpFile

layerDownload, err := ld.open(ctx)
layer, err := ld.open(ld.ctx)
if err != nil {
logrus.Errorf("Error initiating layer download: %v", err)
return nil, 0, retryOnError(err)
return err
}

if offset != 0 {
_, err := layerDownload.Seek(offset, os.SEEK_SET)
if err != nil {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, err
}
}
size, err := layerDownload.Seek(0, os.SEEK_END)
if err != nil {
// Seek failed, perhaps because there was no Content-Length
// header. This shouldn't fail the download, because we can
// still continue without a progress bar.
size = 0
} else {
if size != 0 && offset > size {
logrus.Debug("Partial download is larger than full blob. Starting over")
offset = 0
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
}

// Restore the seek offset either at the beginning of the
// stream, or just after the last byte we have from previous
// attempts.
_, err = layerDownload.Seek(offset, os.SEEK_SET)
if err != nil {
return nil, 0, err
}
if _, err := layer.Seek(ld.downloadOffset, os.SEEK_SET); err != nil {
return err
}

reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerDownload), progressOutput, size-offset, ld.ID(), "Downloading")
defer reader.Close()
ld.layerDownload = ioutils.TeeReadCloser(ioutils.NewCancelReadCloser(ld.ctx, layer), ld.verifier)

if ld.verifier == nil {
ld.verifier = ld.digest.Verifier()
}
return nil
}

_, err = io.Copy(tmpFile, io.TeeReader(reader, ld.verifier))
if err != nil {
if err == transport.ErrWrongCodeForByteRange {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, err
}
return nil, 0, retryOnError(err)
func (ld *v2LayerDescriptor) Read(p []byte) (int, error) {
if ld.downloadAttempts <= 0 {
return 0, fmt.Errorf("no request retries left")
}

progress.Update(progressOutput, ld.ID(), "Verifying Checksum")

if !ld.verifier.Verified() {
err = fmt.Errorf("filesystem layer verification failed for digest %s", ld.digest)
logrus.Error(err)

// Allow a retry if this digest verification error happened
// after a resumed download.
if offset != 0 {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}

return nil, 0, err
if ld.layerDownload == nil {
if err := ld.reset(); err != nil {
ld.downloadAttempts -= 1
return 0, err
}
return nil, 0, xfer.DoNotRetry{Err: err}
}

progress.Update(progressOutput, ld.ID(), "Download complete")

logrus.Debugf("Downloaded %s to tempfile %s", ld.ID(), tmpFile.Name())

_, err = tmpFile.Seek(0, os.SEEK_SET)
if err != nil {
tmpFile.Close()
if err := os.Remove(tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
n, err := ld.layerDownload.Read(p)
ld.downloadOffset += int64(n)
if err == io.EOF {
if !ld.verifier.Verified() {
return n, fmt.Errorf("filesystem layer verification failed for digest %s", ld.digest)
}
ld.tmpFile = nil
ld.verifier = nil
return nil, 0, xfer.DoNotRetry{Err: err}
} else if err != nil {
ld.downloadAttempts -= 1
ld.layerDownload = nil
err = nil
}

// hand off the temporary file to the download manager, so it will only
// be closed once
ld.tmpFile = nil

return ioutils.NewReadCloserWrapper(tmpFile, func() error {
tmpFile.Close()
err := os.RemoveAll(tmpFile.Name())
if err != nil {
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
}
return err
}), size, nil
return n, err
}

func (ld *v2LayerDescriptor) Close() {
if ld.tmpFile != nil {
ld.tmpFile.Close()
if err := os.RemoveAll(ld.tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
}
}
ld.layerDownload.Close()
}

func (ld *v2LayerDescriptor) truncateDownloadFile() error {
// Need a new hash context since we will be redoing the download
ld.verifier = nil
func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
logrus.Debugf("pulling blob %q", ld.digest)

if _, err := ld.tmpFile.Seek(0, os.SEEK_SET); err != nil {
logrus.Errorf("error seeking to beginning of download file: %v", err)
return err
}
ld.ctx = ctx
ld.layerDownload = nil
ld.downloadAttempts = maxDownloadAttempts
ld.verifier = ld.digest.Verifier()

if err := ld.tmpFile.Truncate(0); err != nil {
logrus.Errorf("error truncating download file: %v", err)
return err
}
progress.Update(progressOutput, ld.ID(), "Ready to download")

return nil
return ioutils.NewReadCloserWrapper(ld, func() error { return nil }), ld.src.Size, nil
}

func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
Expand Down Expand Up @@ -914,7 +818,3 @@ func fixManifestLayers(m *schema1.Manifest) error {

return nil
}

func createDownloadFile() (*os.File, error) {
return ioutil.TempFile("", "GetImageBlob")
}
20 changes: 20 additions & 0 deletions pkg/ioutils/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,23 @@ func (p *cancelReadCloser) Close() error {
p.closeWithError(io.EOF)
return nil
}

type teeReadCloser struct {
rc io.ReadCloser
r io.Reader
}

// TeeReadCloser returns a ReadCloser that writes to w what it reads from rc.
// It utilizes io.TeeReader to copy the data read and has the same behavior when reading.
// Further, when it is closed, it ensures that rc is closed as well.
func TeeReadCloser(rc io.ReadCloser, w io.Writer) io.ReadCloser {
return &teeReadCloser{rc, io.TeeReader(rc, w)}
}

func (t *teeReadCloser) Read(p []byte) (int, error) {
return t.r.Read(p)
}

func (t *teeReadCloser) Close() error {
return t.rc.Close()
}