Skip to content

Commit

Permalink
Add estargz compression type
Browse files Browse the repository at this point in the history
Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
  • Loading branch information
ktock committed Jul 30, 2021
1 parent a83721a commit 284ef97
Show file tree
Hide file tree
Showing 167 changed files with 4,733 additions and 13,335 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ Keys supported by image output:
* `unpack=true`: unpack image after creation (for use with containerd)
* `dangling-name-prefix=[value]`: name image with `prefix@<digest>` , used for anonymous images
* `name-canonical=true`: add additional canonical name `name@<digest>`
* `compression=[uncompressed,gzip]`: choose compression type for layers newly created and cached, gzip is default value
* `compression=[uncompressed,gzip,estargz]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
* `force-compression=true`: forcefully apply `compression` option to all layers (including already existing layers).

If credentials are required, `buildctl` will attempt to read Docker configuration file `$DOCKER_CONFIG/config.json`.
Expand Down
43 changes: 32 additions & 11 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package cache

import (
"context"
"io"

ctdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
Expand Down Expand Up @@ -53,7 +55,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
})
}
eg.Go(func() error {
dp, err := g.Do(ctx, sr.ID(), func(ctx context.Context) (interface{}, error) {
dp, err := g.Do(ctx, sr.ID(), func(ctx context.Context) (_ interface{}, retErr error) {
refInfo := sr.Info()
if refInfo.Blob != "" {
if forceCompression {
Expand All @@ -70,19 +72,36 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.WithStack(ErrNoBlobs)
}

var descr ocispec.Descriptor
var err error

var mediaType string
var compressor func(dest io.Writer, mediaType string) (io.WriteCloser, error)
switch compressionType {
case compression.Uncompressed:
mediaType = ocispec.MediaTypeImageLayer
case compression.Gzip:
mediaType = ocispec.MediaTypeImageLayerGzip
compressor = func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
if mediaType != requiredMediaType {
return nil, errors.Errorf("unsupported media type for gzip compressor %q", requiredMediaType)
}
return ctdcompression.CompressStream(dest, ctdcompression.Gzip)
}
case compression.EStargz:
w, mt, saveLabels := writeEStargz()
compressor, mediaType = w, mt
defer func() {
if retErr == nil {
if err := saveLabels(ctx, sr.cm.ContentStore, descr.Digest); err != nil {
retErr = err
}
}
}()
default:
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
}

var descr ocispec.Descriptor
var err error

if descr.Digest == "" {
// reference needs to be committed
var lower []mount.Mount
Expand Down Expand Up @@ -112,8 +131,9 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
defer release()
}
descr, err = sr.cm.Differ.Compare(ctx, lower, upper,
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
diff.WithMediaType(mediaType),
diff.WithCompressor(compressor),
)
if err != nil {
return nil, err
Expand All @@ -137,12 +157,6 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.Errorf("unknown layer compression type")
}

if forceCompression {
if err := ensureCompression(ctx, sr, descr, compressionType, s); err != nil {
return nil, err
}
}

return descr, nil

})
Expand All @@ -163,6 +177,13 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if err := sr.setBlob(baseCtx, currentDescr); err != nil {
return err
}
if forceCompression {
// Ensure the layer is compresed as expected. If needed, forcefully create
// the compression variant here.
if err := ensureCompression(ctx, sr, currentDescr, compressionType, s); err != nil {
return err
}
}
}
return nil
}
Expand Down
188 changes: 185 additions & 3 deletions cache/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,62 @@ import (
"context"
"fmt"
"io"
"strings"
"sync"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/images/converter/uncompress"
"github.com/containerd/containerd/labels"
"github.com/containerd/stargz-snapshotter/estargz"
estargzconv "github.com/containerd/stargz-snapshotter/nativeconverter/estargz"
"github.com/moby/buildkit/util/compression"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

type descConvertFunc func(desc ocispec.Descriptor, info content.Info) *ocispec.Descriptor

// getConverters returns converter functions according to the specified compression type.
// If no conversion is needed, this returns nil without error.
func getConverters(desc ocispec.Descriptor, compressionType compression.Type) (converter.ConvertFunc, func(string) string, error) {
func getConverters(desc ocispec.Descriptor, compressionType compression.Type) (converter.ConvertFunc, descConvertFunc, error) {
switch compressionType {
case compression.Uncompressed:
if !images.IsLayerType(desc.MediaType) || uncompress.IsUncompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil, nil
}
return uncompress.LayerConvertFunc, convertMediaTypeToUncompress, nil
return uncompress.LayerConvertFunc, mediatypeConvertFunc(convertMediaTypeToUncompress), nil
case compression.Gzip:
if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil, nil
}
return gzipLayerConvertFunc, convertMediaTypeToGzip, nil
return gzipLayerConvertFunc, mediatypeConvertFunc(convertMediaTypeToGzip), nil
case compression.EStargz:
if !images.IsLayerType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil, nil
}
return eStargzLayerConvertFunc, eStargzDescConvertFunc, nil
default:
return nil, nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
}
}

func mediatypeConvertFunc(f func(string) string) descConvertFunc {
return func(desc ocispec.Descriptor, info content.Info) *ocispec.Descriptor {
newDesc := desc
newDesc.MediaType = f(newDesc.MediaType)
newDesc.Digest = info.Digest
newDesc.Size = info.Size
return &newDesc
}
}

func gzipLayerConvertFunc(ctx context.Context, cs content.Store, desc ocispec.Descriptor) (*ocispec.Descriptor, error) {
if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
Expand Down Expand Up @@ -136,3 +159,162 @@ func convertMediaTypeToGzip(mt string) string {
}
return mt
}

func eStargzLayerConvertFunc(ctx context.Context, cs content.Store, desc ocispec.Descriptor) (*ocispec.Descriptor, error) {
newDesc, err := estargzconv.LayerConvertFunc()(ctx, cs, desc)
if err != nil {
return nil, err
}
return newDesc, saveEStargzAnnotations(ctx, cs, newDesc.Digest, newDesc.Annotations)
}

func eStargzDescConvertFunc(desc ocispec.Descriptor, info content.Info) *ocispec.Descriptor {
newDesc := desc
newDesc.MediaType = convertMediaTypeToGzip(newDesc.MediaType)
newDesc.Digest = info.Digest
newDesc.Size = info.Size
newDesc.Annotations = mergeEStargzAnnotations(eStargzAnnotationsFromLabels(info.Labels), newDesc.Annotations)
return &newDesc
}

// loadEStargzAnnotations loads eStargz annotations from the content store.
func loadEStargzAnnotations(ctx context.Context, cs content.Store, dgst digest.Digest) (map[string]string, error) {
info, err := cs.Info(ctx, dgst)
if err != nil {
return nil, err
}
return eStargzAnnotationsFromLabels(info.Labels), nil
}

// saveEStargzAnnotaitons saves eStargz annotations to the content store
// as labels of the corresponding blob.
func saveEStargzAnnotations(ctx context.Context, cs content.Store, dgst digest.Digest, annotations map[string]string) error {
saveAnnotations := mergeEStargzAnnotations(annotations, nil)
if len(saveAnnotations) == 0 {
return nil
}
info, err := cs.Info(ctx, dgst)
if err != nil {
return err
}
var fields []string
info.Labels, fields = eStargzAnnotationsToLabels(saveAnnotations)
_, err = cs.Update(ctx, info, fields...)
return err
}

// writeEStargz writes the passed blobs stream as an eStargz-compressed blob.
// saveLabels function saves all necessary eStargz annotations to the content store.
func writeEStargz() (convert func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error), mediaType string, saveLabels func(ctx context.Context, cs content.Store, dgst digest.Digest) error) {
mediaType = ocispec.MediaTypeImageLayerGzip
annotations := make(map[string]string)
var mu sync.Mutex
return func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
if mediaType != requiredMediaType {
return nil, fmt.Errorf("unsupported media type for estargz compressor %q", requiredMediaType)
}
done := make(chan struct{})
c := new(counter)
pr, pw := io.Pipe()
go func() {
defer close(done)
defer pr.Close()
w := estargz.NewWriter(dest)
if err := w.AppendTar(io.TeeReader(pr, c)); err != nil {
pr.CloseWithError(err)
return
}
tocDgst, err := w.Close()
if err != nil {
pr.CloseWithError(err)
return
}
mu.Lock()
annotations[estargz.TOCJSONDigestAnnotation] = tocDgst.String()
annotations[estargz.StoreUncompressedSizeAnnotation] = fmt.Sprintf("%d", c.size())
mu.Unlock()
}()
return &writeCloser{pw, func() error {
<-done // wait until the write completes
return nil
}}, nil
}, mediaType, func(ctx context.Context, cs content.Store, dgst digest.Digest) error {
mu.Lock()
defer mu.Unlock()
return saveEStargzAnnotations(ctx, cs, dgst, annotations)
}
}

const eStargzAnnotationsLabelPrefix = "buildkit.io/compression/estargz/annotation."

func eStargzAnnotationsFromLabels(labels map[string]string) (annotations map[string]string) {
for k, v := range labels {
if strings.HasPrefix(k, eStargzAnnotationsLabelPrefix) {
if annotations == nil {
annotations = make(map[string]string)
}
annotations[strings.TrimPrefix(k, eStargzAnnotationsLabelPrefix)] = v
}
}
return annotations
}

func eStargzAnnotationsToLabels(annotations map[string]string) (labels map[string]string, fields []string) {
for k, v := range annotations {
if labels == nil {
labels = make(map[string]string)
}
k2 := eStargzAnnotationsLabelPrefix + k
labels[k2] = v
fields = append(fields, "labels."+k2)
}
return labels, fields
}

func mergeEStargzAnnotations(src, dst map[string]string) map[string]string {
if src == nil {
return dst
}
for _, k := range []string{estargz.TOCJSONDigestAnnotation, estargz.StoreUncompressedSizeAnnotation} {
if v, ok := src[k]; ok {
if dst == nil {
dst = make(map[string]string)
}
dst[k] = v
}
}
return dst
}

type writeCloser struct {
io.WriteCloser
closeFunc func() error
}

func (wc *writeCloser) Close() error {
err1 := wc.WriteCloser.Close()
err2 := wc.closeFunc()
if err1 != nil {
return errors.Wrapf(err1, "failed to close: %v", err2)
}
return err2
}

type counter struct {
n int64
mu sync.Mutex
}

func (c *counter) Write(p []byte) (n int, err error) {
c.mu.Lock()
c.n += int64(len(p))
c.mu.Unlock()
return len(p), nil
}

func (c *counter) size() (n int64) {
c.mu.Lock()
n = c.n
c.mu.Unlock()
return
}
Loading

0 comments on commit 284ef97

Please sign in to comment.