Skip to content

Commit

Permalink
Add estargz compression type
Browse files Browse the repository at this point in the history
Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
  • Loading branch information
ktock committed Aug 24, 2021
1 parent f314c4b commit f8d30d5
Show file tree
Hide file tree
Showing 100 changed files with 2,029 additions and 1,344 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ Keys supported by image output:
* `unpack=true`: unpack image after creation (for use with containerd)
* `dangling-name-prefix=[value]`: name image with `prefix@<digest>` , used for anonymous images
* `name-canonical=true`: add additional canonical name `name@<digest>`
* `compression=[uncompressed,gzip]`: choose compression type for layers newly created and cached, gzip is default value
* `compression=[uncompressed,gzip,estargz]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
* `force-compression=true`: forcefully apply `compression` option to all layers (including already existing layers).

If credentials are required, `buildctl` will attempt to read Docker configuration file `$DOCKER_CONFIG/config.json`.
Expand Down
62 changes: 43 additions & 19 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package cache
import (
"context"
"fmt"
"io"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
Expand Down Expand Up @@ -44,6 +46,8 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo
return computeBlobChain(ctx, sr, createIfNeeded, compressionType, forceCompression, s)
}

type compressor func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error)

func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) error {
eg, ctx := errgroup.WithContext(ctx)
if sr.parent != nil {
Expand All @@ -53,20 +57,25 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
}

eg.Go(func() error {
v, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) {
if getBlob(sr.md) != "" {
return sr.ociDesc()
return nil, nil
}
if !createIfNeeded {
return nil, errors.WithStack(ErrNoBlobs)
}

var mediaType string
var compressorFunc compressor
var finalize func(context.Context, content.Store) (map[string]string, error)
switch compressionType {
case compression.Uncompressed:
mediaType = ocispecs.MediaTypeImageLayer
case compression.Gzip:
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.EStargz:
compressorFunc, finalize = writeEStargz()
mediaType = ocispecs.MediaTypeImageLayerGzip
default:
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
}
Expand Down Expand Up @@ -100,6 +109,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
desc, err := sr.cm.Differ.Compare(ctx, lower, upper,
diff.WithMediaType(mediaType),
diff.WithReference(sr.ID()),
diff.WithCompressor(compressorFunc),
)
if err != nil {
return nil, err
Expand All @@ -108,6 +118,15 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
if desc.Annotations == nil {
desc.Annotations = map[string]string{}
}
if finalize != nil {
a, err := finalize(ctx, sr.cm.ContentStore)
if err != nil {
return nil, errors.Wrapf(err, "failed to finalize compression")
}
for k, v := range a {
desc.Annotations[k] = v
}
}

info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
if err != nil {
Expand All @@ -122,23 +141,18 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.Errorf("unknown layer compression type")
}

if err := sr.setBlob(ctx, desc); err != nil {
if err := sr.setBlob(ctx, compressionType, desc); err != nil {
return nil, err
}

return desc, nil
return nil, nil
})
if err != nil {
return err
}
descr, ok := v.(ocispecs.Descriptor)
if !ok {
return fmt.Errorf("invalid descriptor returned by differ while computing blob for %s", sr.ID())
}

if forceCompression {
if err := ensureCompression(ctx, sr, descr, compressionType, s); err != nil {
return err
if err := ensureCompression(ctx, sr, compressionType, s); err != nil {
return errors.Wrapf(err, "failed to ensure compression type of %q", compressionType)
}
}
return nil
Expand All @@ -153,7 +167,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
// setBlob associates a blob with the cache record.
// A lease must be held for the blob when calling this function
// Caller should call Info() for knowing what current values are actually set
func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) error {
func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression.Type, desc ocispecs.Descriptor) error {
if _, ok := leases.FromContext(ctx); !ok {
return errors.Errorf("missing lease requirement for setBlob")
}
Expand All @@ -166,7 +180,6 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) e
return err
}

compressionType := compression.FromMediaType(desc.MediaType)
if compressionType == compression.UnknownCompression {
return errors.Errorf("unhandled layer media type: %q", desc.MediaType)
}
Expand Down Expand Up @@ -197,7 +210,7 @@ func (sr *immutableRef) setBlob(ctx context.Context, desc ocispecs.Descriptor) e
return err
}

if err := sr.addCompressionBlob(ctx, desc.Digest, compressionType); err != nil {
if err := sr.addCompressionBlob(ctx, desc, compressionType); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -247,14 +260,25 @@ func isTypeWindows(sr *immutableRef) bool {
}

// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, desc ocispecs.Descriptor, compressionType compression.Type, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", desc.Digest, compressionType), func(ctx context.Context) (interface{}, error) {
func ensureCompression(ctx context.Context, ref *immutableRef, compressionType compression.Type, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", ref.ID(), compressionType), func(ctx context.Context) (interface{}, error) {
desc, err := ref.ociDesc(ctx, ref.descHandlers)
if err != nil {
return nil, err
}

// Resolve converters
layerConvertFunc, _, err := getConverters(desc, compressionType)
layerConvertFunc, err := getConverter(desc, compressionType)
if err != nil {
return nil, err
} else if layerConvertFunc == nil {
return nil, nil // no need to convert
if isLazy, err := ref.isLazy(ctx); err != nil {
return nil, err
} else if isLazy {
// This ref can be used as the specified compressionType. Keep it lazy.
return nil, nil
}
return nil, ref.addCompressionBlob(ctx, desc, compressionType)
}

// First, lookup local content store
Expand All @@ -277,7 +301,7 @@ func ensureCompression(ctx context.Context, ref *immutableRef, desc ocispecs.Des
}

// Start to track converted layer
if err := ref.addCompressionBlob(ctx, newDesc.Digest, compressionType); err != nil {
if err := ref.addCompressionBlob(ctx, *newDesc, compressionType); err != nil {
return nil, err
}
return nil, nil
Expand Down
179 changes: 101 additions & 78 deletions cache/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,86 +15,124 @@ import (
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

// getConverters returns converter functions according to the specified compression type.
// If no conversion is needed, this returns nil without error.
func getConverters(desc ocispecs.Descriptor, compressionType compression.Type) (converter.ConvertFunc, func(string) string, error) {
// needsConversion indicates whether a conversion is needed for the specified mediatype to
// be the compressionType.
func needsConversion(mediaType string, compressionType compression.Type) (bool, error) {
switch compressionType {
case compression.Uncompressed:
if !images.IsLayerType(desc.MediaType) || uncompress.IsUncompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil, nil
if !images.IsLayerType(mediaType) || uncompress.IsUncompressedType(mediaType) {
return false, nil
}
return uncompress.LayerConvertFunc, convertMediaTypeToUncompress, nil
case compression.Gzip:
if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
// No conversion. No need to return an error here.
return nil, nil, nil
if !images.IsLayerType(mediaType) || isGzipCompressedType(mediaType) {
return false, nil
}
case compression.EStargz:
if !images.IsLayerType(mediaType) {
return false, nil
}
return gzipLayerConvertFunc, convertMediaTypeToGzip, nil
default:
return nil, nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
return false, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
}
return true, nil
}

func gzipLayerConvertFunc(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
if !images.IsLayerType(desc.MediaType) || isGzipCompressedType(desc.MediaType) {
// getConverter returns converter function according to the specified compression type.
// If no conversion is needed, this returns nil without error.
func getConverter(desc ocispecs.Descriptor, compressionType compression.Type) (converter.ConvertFunc, error) {
if needs, err := needsConversion(desc.MediaType, compressionType); err != nil {
return nil, err
} else if !needs {
// No conversion. No need to return an error here.
return nil, nil
}

// prepare the source and destination
info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
labelz := info.Labels
if labelz == nil {
labelz = make(map[string]string)
}
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
ref := fmt.Sprintf("convert-gzip-from-%s", desc.Digest)
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
defer w.Close()
if err := w.Truncate(0); err != nil { // Old written data possibly remains
return nil, err
switch compressionType {
case compression.Uncompressed:
return uncompress.LayerConvertFunc, nil
case compression.Gzip:
convertFunc := func(w io.Writer) (io.WriteCloser, error) { return gzip.NewWriter(w), nil }
return gzipLayerConvertFunc(compressionType, convertFunc, nil), nil
case compression.EStargz:
compressorFunc, finalize := writeEStargz()
convertFunc := func(w io.Writer) (io.WriteCloser, error) { return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip) }
return gzipLayerConvertFunc(compressionType, convertFunc, finalize), nil
default:
return nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
}
zw := gzip.NewWriter(w)
defer zw.Close()
}

// convert this layer
diffID := digest.Canonical.Digester()
if _, err := io.Copy(zw, io.TeeReader(io.NewSectionReader(ra, 0, ra.Size()), diffID.Hash())); err != nil {
return nil, err
}
if err := zw.Close(); err != nil { // Flush the writer
return nil, err
}
labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
info, err = cs.Info(ctx, w.Digest())
if err != nil {
return nil, err
}
func gzipLayerConvertFunc(compressionType compression.Type, convertFunc func(w io.Writer) (io.WriteCloser, error), finalize func(context.Context, content.Store) (map[string]string, error)) converter.ConvertFunc {
return func(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
// prepare the source and destination
info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
labelz := info.Labels
if labelz == nil {
labelz = make(map[string]string)
}
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
ref := fmt.Sprintf("convert-from-%s-to-%s", desc.Digest, compressionType.String())
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
defer w.Close()
if err := w.Truncate(0); err != nil { // Old written data possibly remains
return nil, err
}
zw, err := convertFunc(w)
if err != nil {
return nil, err
}
defer zw.Close()

// convert this layer
diffID := digest.Canonical.Digester()
if _, err := io.Copy(zw, io.TeeReader(io.NewSectionReader(ra, 0, ra.Size()), diffID.Hash())); err != nil {
return nil, err
}
if err := zw.Close(); err != nil { // Flush the writer
return nil, err
}
labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
info, err = cs.Info(ctx, w.Digest())
if err != nil {
return nil, err
}

newDesc := desc
newDesc.MediaType = convertMediaTypeToGzip(newDesc.MediaType)
newDesc.Digest = info.Digest
newDesc.Size = info.Size
return &newDesc, nil
newDesc := desc
newDesc.MediaType = convertMediaTypeToGzip(desc.MediaType)
newDesc.Digest = info.Digest
newDesc.Size = info.Size
if finalize != nil {
a, err := finalize(ctx, cs)
if err != nil {
return nil, errors.Wrapf(err, "failed finalize compression")
}
for k, v := range a {
if newDesc.Annotations == nil {
newDesc.Annotations = make(map[string]string)
}
newDesc.Annotations[k] = v
}
}
return &newDesc, nil
}
}

func isGzipCompressedType(mt string) bool {
Expand All @@ -110,21 +148,6 @@ func isGzipCompressedType(mt string) bool {
}
}

func convertMediaTypeToUncompress(mt string) string {
switch mt {
case images.MediaTypeDockerSchema2LayerGzip:
return images.MediaTypeDockerSchema2Layer
case images.MediaTypeDockerSchema2LayerForeignGzip:
return images.MediaTypeDockerSchema2LayerForeign
case ocispecs.MediaTypeImageLayerGzip:
return ocispecs.MediaTypeImageLayer
case ocispecs.MediaTypeImageLayerNonDistributableGzip:
return ocispecs.MediaTypeImageLayerNonDistributable
default:
return mt
}
}

func convertMediaTypeToGzip(mt string) string {
if uncompress.IsUncompressedType(mt) {
if images.IsDockerType(mt) {
Expand Down
Loading

0 comments on commit f8d30d5

Please sign in to comment.