Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Image delta client support with progress reporting #35

Merged
merged 5 commits into from
Oct 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions api/server/router/delta/backend.go

This file was deleted.

30 changes: 0 additions & 30 deletions api/server/router/delta/delta.go

This file was deleted.

27 changes: 0 additions & 27 deletions api/server/router/delta/delta_routes.go

This file was deleted.

1 change: 1 addition & 0 deletions api/server/router/image/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type containerBackend interface {
}

type imageBackend interface {
DeltaCreate(deltaSrc, deltaDest string, outStream io.Writer) error
ImageDelete(imageRef string, force, prune bool) ([]types.ImageDeleteResponseItem, error)
ImageHistory(imageName string) ([]*image.HistoryResponseItem, error)
Images(imageFilters filters.Args, all bool, withExtraAttrs bool) ([]*types.ImageSummary, error)
Expand Down
1 change: 1 addition & 0 deletions api/server/router/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (r *imageRouter) initRoutes() {
router.NewPostRoute("/commit", r.postCommit),
router.NewPostRoute("/images/load", r.postImagesLoad),
router.NewPostRoute("/images/create", r.postImagesCreate, router.WithCancel),
router.NewPostRoute("/images/delta", r.postImagesDelta),
router.NewPostRoute("/images/{name:.*}/push", r.postImagesPush, router.WithCancel),
router.NewPostRoute("/images/{name:.*}/tag", r.postImagesTag),
router.NewPostRoute("/images/prune", r.postImagesPrune, router.WithCancel),
Expand Down
22 changes: 22 additions & 0 deletions api/server/router/image/image_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,28 @@ func (s *imageRouter) postImagesCreate(ctx context.Context, w http.ResponseWrite
return nil
}

func (d *imageRouter) postImagesDelta(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := httputils.ParseForm(r); err != nil {
return err
}

deltaSrc := r.Form.Get("src")
deltaDest := r.Form.Get("dest")

output := ioutils.NewWriteFlusher(w)
defer output.Close()

w.Header().Set("Content-Type", "application/json")

if err := d.backend.DeltaCreate(deltaSrc, deltaDest, output); err != nil {
if !output.Flushed() {
return err
}
output.Write(streamformatter.FormatError(err))
}
return nil
}

func (s *imageRouter) postImagesPush(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
metaHeaders := map[string][]string{}
for k, v := range r.Header {
Expand Down
2 changes: 1 addition & 1 deletion api/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5238,7 +5238,7 @@ paths:
schema:
$ref: "#/definitions/ErrorResponse"
tags: ["Image"]
/deltas/create:
/image/delta:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be /images/delta? Either that or client/image_delta.go:17 is correct, can't be both :P

post:
summary: "Create a delta"
description: "Create a binary delta between two images."
Expand Down
22 changes: 22 additions & 0 deletions client/image_delta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package client

import (
"io"
"net/url"

"golang.org/x/net/context"
)

// ImageImport creates a new image based in the source options.
// It returns the JSON content in the response body.
func (cli *Client) ImageDelta(ctx context.Context, src, dest string) (io.ReadCloser, error) {
query := url.Values{}
query.Set("src", src)
query.Set("dest", dest)

resp, err := cli.postRaw(ctx, "/images/delta", query, nil, nil)
if err != nil {
return nil, err
}
return resp.body, nil
}
1 change: 1 addition & 0 deletions client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type ImageAPIClient interface {
ImageBuild(ctx context.Context, context io.Reader, options types.ImageBuildOptions) (types.ImageBuildResponse, error)
BuildCachePrune(ctx context.Context) (*types.BuildCachePruneReport, error)
ImageCreate(ctx context.Context, parentReference string, options types.ImageCreateOptions) (io.ReadCloser, error)
ImageDelta(ctx context.Context, src, dest string) (io.ReadCloser, error)
ImageHistory(ctx context.Context, image string) ([]image.HistoryResponseItem, error)
ImageImport(ctx context.Context, source types.ImageImportSource, ref string, options types.ImageImportOptions) (io.ReadCloser, error)
ImageInspectWithRaw(ctx context.Context, image string) (types.ImageInspect, []byte, error)
Expand Down
2 changes: 0 additions & 2 deletions cmd/dockerd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/docker/docker/api/server/router/container"
distributionrouter "github.com/docker/docker/api/server/router/distribution"
"github.com/docker/docker/api/server/router/image"
deltarouter "github.com/docker/docker/api/server/router/delta"
"github.com/docker/docker/api/server/router/network"
sessionrouter "github.com/docker/docker/api/server/router/session"
systemrouter "github.com/docker/docker/api/server/router/system"
Expand Down Expand Up @@ -469,7 +468,6 @@ func initRouter(opts routerOptions) {
container.NewRouter(opts.daemon, decoder),
image.NewRouter(opts.daemon, decoder),
systemrouter.NewRouter(opts.daemon, opts.buildCache),
deltarouter.NewRouter(opts.daemon),
volume.NewRouter(opts.daemon),
build.NewRouter(opts.buildBackend, opts.daemon),
sessionrouter.NewRouter(opts.sessionManager),
Expand Down
91 changes: 67 additions & 24 deletions daemon/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/pkg/errors"

"github.com/Sirupsen/logrus"
"github.com/docker/distribution/reference"
apierrors "github.com/docker/docker/api/errors"
"github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container"
Expand All @@ -25,10 +24,13 @@ import (
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/runconfig"
"github.com/opencontainers/go-digest"
units "github.com/docker/go-units"
"github.com/opencontainers/selinux/go-selinux/label"
"github.com/resin-os/librsync-go"
)
Expand Down Expand Up @@ -341,42 +343,63 @@ func (daemon *Daemon) verifyNetworkingConfig(nwConfig *networktypes.NetworkingCo

// DeltaCreate creates a delta of the specified src and dest images
// This is called directly from the Engine API
func (daemon *Daemon) DeltaCreate(deltaSrc, deltaDest string) (string, error) {
func (daemon *Daemon) DeltaCreate(deltaSrc, deltaDest string, outStream io.Writer) error {
progressOutput := streamformatter.NewJSONProgressOutput(outStream, false)

srcImg, err := daemon.GetImage(deltaSrc)
if err != nil {
return "", errors.Wrapf(err, "no such image: %s", deltaSrc)
return errors.Wrapf(err, "no such image: %s", deltaSrc)
}

dstImg, err := daemon.GetImage(deltaDest)
if err != nil {
return "", errors.Wrapf(err, "no such image: %s", deltaDest)
return errors.Wrapf(err, "no such image: %s", deltaDest)
}

is := daemon.stores[dstImg.Platform()].imageStore
ls := daemon.stores[dstImg.Platform()].layerStore

srcData, err := is.GetTarSeekStream(srcImg.ID())
if err != nil {
return "", err
return err
}
defer srcData.Close()

srcSig, err := librsync.Signature(bufio.NewReaderSize(srcData, 65536), ioutil.Discard, 512, 32, librsync.BLAKE2_SIG_MAGIC)
srcDataLen, err := ioutils.SeekerSize(srcData)
if err != nil {
return err
}

progressReader := progress.NewProgressReader(srcData, progressOutput, srcDataLen, deltaSrc, "Fingerprinting")
defer progressReader.Close()

srcSig, err := librsync.Signature(bufio.NewReaderSize(progressReader, 65536), ioutil.Discard, 512, 32, librsync.BLAKE2_SIG_MAGIC)
if err != nil {
return "", err
return err
}

progress.Update(progressOutput, deltaSrc, "Fingerprint complete")

deltaRootFS := image.NewRootFS()

for _, diffID := range dstImg.RootFS.DiffIDs {
progress.Update(progressOutput, stringid.TruncateID(diffID.String()), "Waiting")
}

statTotalSize := int64(0)
statDetlaSize := int64(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: detla


for i, diffID := range dstImg.RootFS.DiffIDs {
var (
layerData io.Reader
platform layer.Platform
)
commonLayer := false

// We're only interested in layers that are different. Push empty
// layers for common layers
if i < len(srcImg.RootFS.DiffIDs) && srcImg.RootFS.DiffIDs[i] == diffID {
commonLayer = true
layerData, _ = layer.EmptyLayer.TarStream()
platform = layer.EmptyLayer.Platform()
} else {
Expand All @@ -385,31 +408,41 @@ func (daemon *Daemon) DeltaCreate(deltaSrc, deltaDest string) (string, error) {

l, err := ls.Get(dstRootFS.ChainID())
if err != nil {
return "", err
return err
}
defer layer.ReleaseAndLog(ls, l)

platform = l.Platform()

input, err := l.TarStream()
if err != nil {
return "", err
return err
}
defer input.Close()

inputSize, err := l.DiffSize()
if err != nil {
return err
}

statTotalSize += inputSize

progressReader := progress.NewProgressReader(input, progressOutput, inputSize, stringid.TruncateID(diffID.String()), "Computing delta")
defer progressReader.Close()

pR, pW := io.Pipe()

layerData = pR

tmpDelta, err := ioutil.TempFile("", "docker-delta-")
if err != nil {
return "", err
return err
}
defer os.Remove(tmpDelta.Name())

go func() {
w := bufio.NewWriter(tmpDelta)
err := librsync.Delta(srcSig, bufio.NewReader(input), w)
err := librsync.Delta(srcSig, bufio.NewReader(progressReader), w)
if err != nil {
pW.CloseWithError(err)
return
Expand Down Expand Up @@ -456,10 +489,21 @@ func (daemon *Daemon) DeltaCreate(deltaSrc, deltaDest string) (string, error) {

newLayer, err := ls.Register(layerData, deltaRootFS.ChainID(), platform)
if err != nil {
return "", err
return err
}
defer layer.ReleaseAndLog(ls, newLayer)

if commonLayer {
progress.Update(progressOutput, stringid.TruncateID(diffID.String()), "Skipping common layer")
} else {
deltaSize, err := newLayer.DiffSize()
if err != nil {
return err
}
statDetlaSize += deltaSize
progress.Update(progressOutput, stringid.TruncateID(diffID.String()), "Delta complete")
}

deltaRootFS.Append(newLayer.DiffID())
}

Expand All @@ -478,23 +522,22 @@ func (daemon *Daemon) DeltaCreate(deltaSrc, deltaDest string) (string, error) {

rawConfig, err := json.Marshal(config)
if err != nil {
return "", err
return err
}

id, err := is.Create(rawConfig)
if err != nil {
return "", err
return err
}

ref, _ := reference.WithName("delta")

deltaTag := "delta-" + digest.FromString(srcImg.ID().String() + "-" + dstImg.ImageID()).Hex()[:8]

ref2, _ := reference.WithTag(ref, deltaTag)

if err := daemon.TagImageWithReference(id, "linux", ref2); err != nil {
return "", err
humanTotal := units.HumanSize(float64(statTotalSize))
humanDelta := units.HumanSize(float64(statDetlaSize))
deltaRatio := float64(statTotalSize) / float64(statDetlaSize)
if statTotalSize == 0 {
deltaRatio = 1
}

return id.String(), nil
outStream.Write(streamformatter.FormatStatus("", "Normal size: %s, Delta size: %s, %.2fx improvement", humanTotal, humanDelta, deltaRatio))
outStream.Write(streamformatter.FormatStatus("", "Created delta: %s", id.String()))
return nil
}
6 changes: 3 additions & 3 deletions pkg/ioutils/concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func max(x, y int) int {
return x
}

func seekerSize(s io.Seeker) (int64, error) {
func SeekerSize(s io.Seeker) (int64, error) {
cur, err := s.Seek(0, io.SeekCurrent)
if err != nil {
return 0, err
Expand Down Expand Up @@ -109,12 +109,12 @@ func (self *concatReadSeekCloser) Close() error {
}

func ConcatReadSeekClosers(a, b ReadSeekCloser) (ReadSeekCloser, error) {
aSize, err := seekerSize(a)
aSize, err := SeekerSize(a)
if err != nil {
return nil, err
}

bSize, err := seekerSize(b)
bSize, err := SeekerSize(b)
if err != nil {
return nil, err
}
Expand Down