From 425df9e985773cf415dfc834e826d76a85d363ca Mon Sep 17 00:00:00 2001 From: "zhuangbowei.zbw" Date: Fri, 1 Mar 2024 11:56:05 +0800 Subject: [PATCH] convertor: handle recursive index & concurrency limit Signed-off-by: zhuangbowei.zbw --- cmd/convertor/builder/builder.go | 371 ++++++++++++--------- cmd/convertor/builder/builder_engine.go | 12 +- cmd/convertor/builder/overlaybd_builder.go | 2 +- cmd/convertor/builder/turboOCI_builder.go | 4 +- cmd/convertor/main.go | 39 ++- docs/USERSPACE_CONVERTOR.md | 1 + 6 files changed, 243 insertions(+), 186 deletions(-) diff --git a/cmd/convertor/builder/builder.go b/cmd/convertor/builder/builder.go index 24bf52af..f6fc3f03 100644 --- a/cmd/convertor/builder/builder.go +++ b/cmd/convertor/builder/builder.go @@ -30,6 +30,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" "github.com/containerd/accelerated-container-image/cmd/convertor/database" @@ -37,6 +38,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/reference" + "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/opencontainers/go-digest" v1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -60,11 +62,208 @@ type BuilderOptions struct { Reserve bool NoUpload bool DumpManifest bool + + // ConcurrencyLimit limits the number of manifests that can be built at once + // 0 means no limit + ConcurrencyLimit int } -type overlaybdBuilder struct { - layers int - engine builderEngine +type graphBuilder struct { + // required + Resolver remotes.Resolver + + // options + BuilderOptions + + // private + fetcher remotes.Fetcher + pusher remotes.Pusher + tagPusher remotes.Pusher + group *errgroup.Group + sem chan struct{} + id atomic.Int32 +} + +func (b *graphBuilder) Build(ctx context.Context) error { + fetcher, err := b.Resolver.Fetcher(ctx, b.Ref) + if err != nil { + return fmt.Errorf("failed to obtain new fetcher: %w", err) + } + pusher, err := b.Resolver.Pusher(ctx, b.TargetRef+"@") // append '@' to avoid tag + if err != nil { + return fmt.Errorf("failed to obtain new pusher: %w", err) + } + tagPusher, err := b.Resolver.Pusher(ctx, b.TargetRef) // append '@' to avoid tag + if err != nil { + return fmt.Errorf("failed to obtain new tag pusher: %w", err) + } + b.fetcher = fetcher + b.pusher = pusher + b.tagPusher = tagPusher + _, src, err := b.Resolver.Resolve(ctx, b.Ref) + if err != nil { + return fmt.Errorf("failed to resolve: %w", err) + } + + g, gctx := errgroup.WithContext(ctx) + b.group = g + if b.ConcurrencyLimit > 0 { + b.sem = make(chan struct{}, b.ConcurrencyLimit) + } + g.Go(func() error { + target, err := b.process(gctx, src, true) + if err != nil { + return fmt.Errorf("failed to build %q: %w", src.Digest, err) + } + log.G(gctx).Infof("converted to %q, digest: %q", b.TargetRef, target.Digest) + return nil + }) + return g.Wait() +} + +func (b *graphBuilder) process(ctx context.Context, src v1.Descriptor, tag bool) (v1.Descriptor, error) { + switch src.MediaType { + case v1.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest: + return b.buildOne(ctx, src, tag) + case v1.MediaTypeImageIndex, images.MediaTypeDockerSchema2ManifestList: + var index v1.Index + rc, err := b.fetcher.Fetch(ctx, src) + if err != nil { + return v1.Descriptor{}, fmt.Errorf("failed to fetch index: %w", err) + } + defer rc.Close() + indexBytes, err := io.ReadAll(rc) + if err != nil { + return v1.Descriptor{}, fmt.Errorf("failed to read index: %w", err) + } + if err := json.Unmarshal(indexBytes, &index); err != nil { + return v1.Descriptor{}, fmt.Errorf("failed to unmarshal index: %w", err) + } + var wg sync.WaitGroup + for _i, _m := range index.Manifests { + i := _i + m := _m + wg.Add(1) + b.group.Go(func() error { + defer wg.Done() + target, err := b.process(ctx, m, false) + if err != nil { + return fmt.Errorf("failed to build %q: %w", m.Digest, err) + } + index.Manifests[i] = target + return nil + }) + } + wg.Wait() + if ctx.Err() != nil { + return v1.Descriptor{}, ctx.Err() + } + + // upload index + indexBytes, err = json.Marshal(index) + if err != nil { + return v1.Descriptor{}, fmt.Errorf("failed to marshal index: %w", err) + } + expected := src + expected.Digest = digest.FromBytes(indexBytes) + expected.Size = int64(len(indexBytes)) + var pusher remotes.Pusher + if tag { + pusher = b.tagPusher + } else { + pusher = b.pusher + } + if err := uploadBytes(ctx, pusher, expected, indexBytes); err != nil { + return v1.Descriptor{}, fmt.Errorf("failed to upload index: %w", err) + } + return expected, nil + default: + return v1.Descriptor{}, fmt.Errorf("unsupported media type %q", src.MediaType) + } +} + +func (b *graphBuilder) buildOne(ctx context.Context, src v1.Descriptor, tag bool) (v1.Descriptor, error) { + if b.sem != nil { + select { + case <-ctx.Done(): + return v1.Descriptor{}, ctx.Err() + case b.sem <- struct{}{}: + } + } + defer func() { + if b.sem != nil { + select { + case <-ctx.Done(): + case <-b.sem: + } + } + }() + id := b.id.Add(1) + + var platform string + if src.Platform == nil { + platform = "" + } else { + platform = platforms.Format(*src.Platform) + ctx = log.WithLogger(ctx, log.G(ctx).WithField("platform", platform)) + } + workdir := filepath.Join(b.WorkDir, fmt.Sprintf("%d-%s-%s", id, strings.ReplaceAll(platform, "/", "_"), src.Digest.Encoded())) + log.G(ctx).Infof("building %s ...", workdir) + + // init build engine + manifest, config, err := fetchManifestAndConfig(ctx, b.fetcher, src) + if err != nil { + return v1.Descriptor{}, fmt.Errorf("failed to fetch manifest and config: %w", err) + } + var pusher remotes.Pusher + if tag { + pusher = b.tagPusher + } else { + pusher = b.pusher + } + engineBase := &builderEngineBase{ + resolver: b.Resolver, + fetcher: b.fetcher, + pusher: pusher, + manifest: *manifest, + config: *config, + inputDesc: src, + } + engineBase.workDir = workdir + engineBase.oci = b.OCI + engineBase.mkfs = b.Mkfs + engineBase.vsize = b.Vsize + engineBase.db = b.DB + refspec, err := reference.Parse(b.Ref) + if err != nil { + return v1.Descriptor{}, err + } + engineBase.host = refspec.Hostname() + engineBase.repository = strings.TrimPrefix(refspec.Locator, engineBase.host+"/") + engineBase.reserve = b.Reserve + engineBase.noUpload = b.NoUpload + engineBase.dumpManifest = b.DumpManifest + + var engine builderEngine + switch b.Engine { + case Overlaybd: + engine = NewOverlayBDBuilderEngine(engineBase) + case TurboOCI: + engine = NewTurboOCIBuilderEngine(engineBase) + } + + // build + builder := &overlaybdBuilder{ + layers: len(engineBase.manifest.Layers), + engine: engine, + } + desc, err := builder.Build(ctx) + if err != nil { + return v1.Descriptor{}, fmt.Errorf("failed to build %s: %w", workdir, err) + } + src.Digest = desc.Digest + src.Size = desc.Size + return src, nil } func Build(ctx context.Context, opt BuilderOptions) error { @@ -97,162 +296,16 @@ func Build(ctx context.Context, opt BuilderOptions) error { Transport: transport, }, }) - fetcher, err := resolver.Fetcher(ctx, opt.Ref) - if err != nil { - return fmt.Errorf("failed to new fetcher: %w", err) - } - - _, src, err := resolver.Resolve(ctx, opt.Ref) - if err != nil { - return fmt.Errorf("failed to resolve: %w", err) - } - var manifestDescs []v1.Descriptor - switch src.MediaType { - case v1.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest: - manifestDescs = append(manifestDescs, src) - case v1.MediaTypeImageIndex, images.MediaTypeDockerSchema2ManifestList: - rc, err := fetcher.Fetch(ctx, src) - if err != nil { - return fmt.Errorf("failed to fetch index: %w", err) - } - defer rc.Close() - b, err := io.ReadAll(rc) - if err != nil { - return fmt.Errorf("failed to read index: %w", err) - } - var index v1.Index - if err := json.Unmarshal(b, &index); err != nil { - return fmt.Errorf("failed to parse index: %w", err) - } - // (TODO) platform filter - manifestDescs = append(manifestDescs, index.Manifests...) - } - if len(manifestDescs) == 0 { - log.G(ctx).Warn("no matched manifest, do nothing") - return nil - } - - var mu sync.Mutex - var targetDescs []v1.Descriptor - g, gctx := errgroup.WithContext(ctx) - for _, _mDesc := range manifestDescs { - mDesc := _mDesc - g.Go(func() error { - var platform string - var rctx context.Context - if mDesc.Platform == nil { - platform = "" - rctx = gctx - } else { - platform = platforms.Format(*mDesc.Platform) - rctx = log.WithLogger(gctx, log.G(ctx).WithField("platform", platform)) - } - workdir := filepath.Join(opt.WorkDir, strings.ReplaceAll(platform, "/", "-")) - - log.G(rctx).WithFields(log.Fields{ - "manifest digest": mDesc.Digest, - "work dir": workdir, - }).Info("building ...") - - // init build engine - fetcher, err := resolver.Fetcher(rctx, opt.Ref) - if err != nil { - return fmt.Errorf("failed to new fetcher: %w", err) - } - targetRef := opt.TargetRef - if len(manifestDescs) > 1 { - targetRef += "@" // append '@' to avoid tag - } - pusher, err := resolver.Pusher(rctx, targetRef) - if err != nil { - return fmt.Errorf("failed to new pusher: %w", err) - } - manifest, config, err := fetchManifestAndConfig(ctx, fetcher, mDesc) - if err != nil { - return fmt.Errorf("failed to fetch manifest and config: %w", err) - } - engineBase := &builderEngineBase{ - resolver: resolver, - fetcher: fetcher, - pusher: pusher, - manifest: *manifest, - config: *config, - inputDesc: mDesc, - } - engineBase.workDir = workdir - engineBase.oci = opt.OCI - engineBase.mkfs = opt.Mkfs - engineBase.vsize = opt.Vsize - engineBase.db = opt.DB - refspec, err := reference.Parse(opt.Ref) - if err != nil { - return err - } - engineBase.host = refspec.Hostname() - engineBase.repository = strings.TrimPrefix(refspec.Locator, engineBase.host+"/") - engineBase.reserve = opt.Reserve - engineBase.noUpload = opt.NoUpload - engineBase.dumpManifest = opt.DumpManifest - - var engine builderEngine - switch opt.Engine { - case Overlaybd: - engine = NewOverlayBDBuilderEngine(engineBase) - case TurboOCI: - engine = NewTurboOCIBuilderEngine(engineBase) - } - - // build - builder := &overlaybdBuilder{ - layers: len(engineBase.manifest.Layers), - engine: engine, - } - desc, err := builder.Build(rctx) - if err != nil { - return fmt.Errorf("(platform %s) failed to build: %w", platform, err) - } - desc.Platform = mDesc.Platform - - mu.Lock() - defer mu.Unlock() - targetDescs = append(targetDescs, desc) - return nil - }) - } - if err := g.Wait(); err != nil { - return err - } - - // tag index - if len(targetDescs) > 1 { - index := v1.Index{} - index.SchemaVersion = 2 - if targetDescs[0].MediaType == v1.MediaTypeImageManifest { - index.MediaType = v1.MediaTypeImageIndex - } else { - index.MediaType = images.MediaTypeDockerSchema2ManifestList - } - index.Manifests = targetDescs - b, err := json.Marshal(index) - if err != nil { - return fmt.Errorf("failed to marshal index: %w", err) - } - expected := v1.Descriptor{ - Digest: digest.FromBytes(b), - Size: int64(len(b)), - MediaType: index.MediaType, - } - pusher, err := resolver.Pusher(ctx, opt.TargetRef) - if err != nil { - return fmt.Errorf("failed to new pusher: %w", err) - } - if err := uploadBytes(ctx, pusher, expected, b); err != nil { - return fmt.Errorf("failed to upload index: %w", err) - } - } + return (&graphBuilder{ + Resolver: resolver, + BuilderOptions: opt, + }).Build(ctx) +} - return nil +type overlaybdBuilder struct { + layers int + engine builderEngine } // Build return a descriptor of the converted target, as the caller may need it @@ -361,12 +414,12 @@ func (b *overlaybdBuilder) Build(ctx context.Context) (v1.Descriptor, error) { }) } if err := g.Wait(); err != nil { - return v1.DescriptorEmptyJSON, err + return v1.Descriptor{}, err } targetDesc, err := b.engine.UploadImage(ctx) if err != nil { - return v1.DescriptorEmptyJSON, errors.Wrap(err, "failed to upload manifest or config") + return v1.Descriptor{}, errors.Wrap(err, "failed to upload manifest or config") } b.engine.StoreConvertedManifestDetails(ctx) logrus.Info("convert finished") diff --git a/cmd/convertor/builder/builder_engine.go b/cmd/convertor/builder/builder_engine.go index 205586ca..b664a3f3 100644 --- a/cmd/convertor/builder/builder_engine.go +++ b/cmd/convertor/builder/builder_engine.go @@ -152,7 +152,7 @@ func (e *builderEngineBase) mediaTypeImageLayer() string { func (e *builderEngineBase) uploadManifestAndConfig(ctx context.Context) (specs.Descriptor, error) { cbuf, err := json.Marshal(e.config) if err != nil { - return specs.DescriptorEmptyJSON, err + return specs.Descriptor{}, err } e.manifest.Config = specs.Descriptor{ MediaType: e.mediaTypeConfig(), @@ -161,14 +161,14 @@ func (e *builderEngineBase) uploadManifestAndConfig(ctx context.Context) (specs. } if !e.noUpload { if err = uploadBytes(ctx, e.pusher, e.manifest.Config, cbuf); err != nil { - return specs.DescriptorEmptyJSON, errors.Wrapf(err, "failed to upload config") + return specs.Descriptor{}, errors.Wrapf(err, "failed to upload config") } logrus.Infof("config uploaded") } if e.dumpManifest { confPath := path.Join(e.workDir, "config.json") if err := continuity.AtomicWriteFile(confPath, cbuf, 0644); err != nil { - return specs.DescriptorEmptyJSON, err + return specs.Descriptor{}, err } logrus.Infof("config dumped") } @@ -176,7 +176,7 @@ func (e *builderEngineBase) uploadManifestAndConfig(ctx context.Context) (specs. e.manifest.MediaType = e.mediaTypeManifest() cbuf, err = json.Marshal(e.manifest) if err != nil { - return specs.DescriptorEmptyJSON, err + return specs.Descriptor{}, err } manifestDesc := specs.Descriptor{ MediaType: e.mediaTypeManifest(), @@ -185,7 +185,7 @@ func (e *builderEngineBase) uploadManifestAndConfig(ctx context.Context) (specs. } if !e.noUpload { if err = uploadBytes(ctx, e.pusher, manifestDesc, cbuf); err != nil { - return specs.DescriptorEmptyJSON, errors.Wrapf(err, "failed to upload manifest") + return specs.Descriptor{}, errors.Wrapf(err, "failed to upload manifest") } e.outputDesc = manifestDesc logrus.Infof("manifest uploaded") @@ -193,7 +193,7 @@ func (e *builderEngineBase) uploadManifestAndConfig(ctx context.Context) (specs. if e.dumpManifest { descPath := path.Join(e.workDir, "manifest.json") if err := continuity.AtomicWriteFile(descPath, cbuf, 0644); err != nil { - return specs.DescriptorEmptyJSON, err + return specs.Descriptor{}, err } logrus.Infof("manifest dumped") } diff --git a/cmd/convertor/builder/overlaybd_builder.go b/cmd/convertor/builder/overlaybd_builder.go index 811cc35c..09a8c4f1 100644 --- a/cmd/convertor/builder/overlaybd_builder.go +++ b/cmd/convertor/builder/overlaybd_builder.go @@ -166,7 +166,7 @@ func (e *overlaybdBuilderEngine) UploadImage(ctx context.Context) (specs.Descrip if !e.mkfs { baseDesc, err := e.uploadBaseLayer(ctx) if err != nil { - return specs.DescriptorEmptyJSON, err + return specs.Descriptor{}, err } e.manifest.Layers = append([]specs.Descriptor{baseDesc}, e.manifest.Layers...) e.config.RootFS.DiffIDs = append([]digest.Digest{baseDesc.Digest}, e.config.RootFS.DiffIDs...) diff --git a/cmd/convertor/builder/turboOCI_builder.go b/cmd/convertor/builder/turboOCI_builder.go index aae60330..83ffeeb9 100644 --- a/cmd/convertor/builder/turboOCI_builder.go +++ b/cmd/convertor/builder/turboOCI_builder.go @@ -172,7 +172,7 @@ func (e *turboOCIBuilderEngine) UploadImage(ctx context.Context) (specs.Descript layerDir := e.getLayerDir(idx) uncompress, err := getFileDesc(path.Join(layerDir, tociLayerTar), true) if err != nil { - return specs.DescriptorEmptyJSON, errors.Wrapf(err, "failed to get uncompressed descriptor for layer %d", idx) + return specs.Descriptor{}, errors.Wrapf(err, "failed to get uncompressed descriptor for layer %d", idx) } e.manifest.Layers[idx] = e.tociLayers[idx] e.config.RootFS.DiffIDs[idx] = uncompress.Digest @@ -189,7 +189,7 @@ func (e *turboOCIBuilderEngine) UploadImage(ctx context.Context) (specs.Descript } if !e.mkfs { if err := uploadBlob(ctx, e.pusher, overlaybdBaseLayer, baseDesc); err != nil { - return specs.DescriptorEmptyJSON, errors.Wrapf(err, "failed to upload baselayer %q", overlaybdBaseLayer) + return specs.Descriptor{}, errors.Wrapf(err, "failed to upload baselayer %q", overlaybdBaseLayer) } e.manifest.Layers = append([]specs.Descriptor{baseDesc}, e.manifest.Layers...) e.config.RootFS.DiffIDs = append([]digest.Digest{baseDesc.Digest}, e.config.RootFS.DiffIDs...) diff --git a/cmd/convertor/main.go b/cmd/convertor/main.go index 39de6be3..374412a6 100644 --- a/cmd/convertor/main.go +++ b/cmd/convertor/main.go @@ -31,21 +31,22 @@ import ( ) var ( - repo string - user string - plain bool - tagInput string - tagOutput string - dir string - oci bool - mkfs bool - verbose bool - vsize int - fastoci string - turboOCI string - overlaybd string - dbstr string - dbType string + repo string + user string + plain bool + tagInput string + tagOutput string + dir string + oci bool + mkfs bool + verbose bool + vsize int + fastoci string + turboOCI string + overlaybd string + dbstr string + dbType string + concurrencyLimit int // certification certDirs []string @@ -95,9 +96,10 @@ var ( ClientCerts: clientCerts, Insecure: insecure, }, - Reserve: reserve, - NoUpload: noUpload, - DumpManifest: dumpManifest, + Reserve: reserve, + NoUpload: noUpload, + DumpManifest: dumpManifest, + ConcurrencyLimit: concurrencyLimit, } if overlaybd != "" { logrus.Info("building [Overlaybd - Native] image...") @@ -158,6 +160,7 @@ func init() { rootCmd.Flags().StringVar(&overlaybd, "overlaybd", "", "build overlaybd format") rootCmd.Flags().StringVar(&dbstr, "db-str", "", "db str for overlaybd conversion") rootCmd.Flags().StringVar(&dbType, "db-type", "", "type of db to use for conversion deduplication. Available: mysql. Default none") + rootCmd.Flags().IntVar(&concurrencyLimit, "concurrency-limit", 4, "the number of manifests that can be built at the same time, used for multi-arch images, 0 means no limit") // certification rootCmd.Flags().StringArrayVar(&certDirs, "cert-dir", nil, "In these directories, root CA should be named as *.crt and client cert should be named as *.cert, *.key") diff --git a/docs/USERSPACE_CONVERTOR.md b/docs/USERSPACE_CONVERTOR.md index afdec708..d0b35dd7 100644 --- a/docs/USERSPACE_CONVERTOR.md +++ b/docs/USERSPACE_CONVERTOR.md @@ -49,6 +49,7 @@ Flags: --overlaybd string build overlaybd format --db-str string db str for overlaybd conversion --db-type string type of db to use for conversion deduplication. Available: mysql. Default none + --concurrency-limit int the number of manifests that can be built at the same time, used for multi-arch images, 0 means no limit (default 4) --cert-dir stringArray In these directories, root CA should be named as *.crt and client cert should be named as *.cert, *.key --root-ca stringArray root CA certificates --client-cert stringArray client cert certificates, should form in ${cert-file}:${key-file}