From d35eb60dd611a04b03afbbbf208f3ca9cee97a62 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 17 Jun 2018 23:57:19 -0400 Subject: [PATCH] Split private mirror functions for reuse These files will be copied for now and then refactored later into reusable packages. --- pkg/oc/cli/cmd/image/mirror/manifest.go | 178 +++++++++++++++++++++++ pkg/oc/cli/cmd/image/mirror/mappings.go | 82 +---------- pkg/oc/cli/cmd/image/mirror/mirror.go | 164 +-------------------- pkg/oc/cli/cmd/image/mirror/workqueue.go | 131 +++++++++++++++++ 4 files changed, 312 insertions(+), 243 deletions(-) create mode 100644 pkg/oc/cli/cmd/image/mirror/manifest.go create mode 100644 pkg/oc/cli/cmd/image/mirror/workqueue.go diff --git a/pkg/oc/cli/cmd/image/mirror/manifest.go b/pkg/oc/cli/cmd/image/mirror/manifest.go new file mode 100644 index 000000000000..ac775c73f583 --- /dev/null +++ b/pkg/oc/cli/cmd/image/mirror/manifest.go @@ -0,0 +1,178 @@ +package mirror + +import ( + "context" + "fmt" + "sync" + + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/manifestlist" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + "github.com/docker/distribution/reference" + "github.com/docker/distribution/registry/api/errcode" + "github.com/docker/distribution/registry/api/v2" + + "github.com/docker/libtrust" + "github.com/golang/glog" + digest "github.com/opencontainers/go-digest" + + imageapi "github.com/openshift/origin/pkg/image/apis/image" +) + +func processManifestList(ctx context.Context, srcDigest digest.Digest, srcManifest distribution.Manifest, manifests distribution.ManifestService, ref imageapi.DockerImageReference, filterFn func(*manifestlist.ManifestDescriptor, bool) bool) ([]distribution.Manifest, distribution.Manifest, digest.Digest, error) { + var srcManifests []distribution.Manifest + switch t := srcManifest.(type) { + case *manifestlist.DeserializedManifestList: + manifestDigest := srcDigest + manifestList := t + + filtered := make([]manifestlist.ManifestDescriptor, 0, len(t.Manifests)) + for _, manifest := range t.Manifests { + if !filterFn(&manifest, len(t.Manifests) > 1) { + glog.V(5).Infof("Skipping image for %#v from %s", manifest.Platform, ref) + continue + } + glog.V(5).Infof("Including image for %#v from %s", manifest.Platform, ref) + filtered = append(filtered, manifest) + } + + if len(filtered) == 0 { + return nil, nil, "", nil + } + + // if we're filtering the manifest list, update the source manifest and digest + if len(filtered) != len(t.Manifests) { + var err error + t, err = manifestlist.FromDescriptors(filtered) + if err != nil { + return nil, nil, "", fmt.Errorf("unable to filter source image %s manifest list: %v", ref, err) + } + _, body, err := t.Payload() + if err != nil { + return nil, nil, "", fmt.Errorf("unable to filter source image %s manifest list (bad payload): %v", ref, err) + } + manifestList = t + manifestDigest = srcDigest.Algorithm().FromBytes(body) + glog.V(5).Infof("Filtered manifest list to new digest %s:\n%s", manifestDigest, body) + } + + for i, manifest := range t.Manifests { + childManifest, err := manifests.Get(ctx, manifest.Digest, distribution.WithManifestMediaTypes([]string{manifestlist.MediaTypeManifestList, schema2.MediaTypeManifest})) + if err != nil { + return nil, nil, "", fmt.Errorf("unable to retrieve source image %s manifest #%d from manifest list: %v", ref, i+1, err) + } + srcManifests = append(srcManifests, childManifest) + } + + switch { + case len(srcManifests) == 1: + _, body, err := srcManifests[0].Payload() + if err != nil { + return nil, nil, "", fmt.Errorf("unable to convert source image %s manifest list to single manifest: %v", ref, err) + } + manifestDigest := srcDigest.Algorithm().FromBytes(body) + glog.V(5).Infof("Used only one manifest from the list %s", manifestDigest) + return srcManifests, srcManifests[0], manifestDigest, nil + default: + return append(srcManifests, manifestList), manifestList, manifestDigest, nil + } + + default: + return []distribution.Manifest{srcManifest}, srcManifest, srcDigest, nil + } +} + +// TDOO: remove when quay.io switches to v2 schema +func putManifestInCompatibleSchema( + ctx context.Context, + srcManifest distribution.Manifest, + tag string, + toManifests distribution.ManifestService, + // supports schema2 -> schema1 downconversion + blobs distribution.BlobService, + ref reference.Named, +) (digest.Digest, error) { + var options []distribution.ManifestServiceOption + if len(tag) > 0 { + glog.V(5).Infof("Put manifest %s:%s", ref, tag) + options = []distribution.ManifestServiceOption{distribution.WithTag(tag)} + } else { + glog.V(5).Infof("Put manifest %s", ref) + } + toDigest, err := toManifests.Put(ctx, srcManifest, options...) + if err == nil { + return toDigest, nil + } + errs, ok := err.(errcode.Errors) + if !ok || len(errs) == 0 { + return toDigest, err + } + errcode, ok := errs[0].(errcode.Error) + if !ok || errcode.ErrorCode() != v2.ErrorCodeManifestInvalid { + return toDigest, err + } + // try downconverting to v2-schema1 + schema2Manifest, ok := srcManifest.(*schema2.DeserializedManifest) + if !ok { + return toDigest, err + } + tagRef, tagErr := reference.WithTag(ref, tag) + if tagErr != nil { + return toDigest, err + } + glog.V(5).Infof("Registry reported invalid manifest error, attempting to convert to v2schema1 as ref %s", tagRef) + schema1Manifest, convertErr := convertToSchema1(ctx, blobs, schema2Manifest, tagRef) + if convertErr != nil { + return toDigest, err + } + if glog.V(6) { + _, data, _ := schema1Manifest.Payload() + glog.Infof("Converted to v2schema1\n%s", string(data)) + } + return toManifests.Put(ctx, schema1Manifest, distribution.WithTag(tag)) +} + +// TDOO: remove when quay.io switches to v2 schema +func convertToSchema1(ctx context.Context, blobs distribution.BlobService, schema2Manifest *schema2.DeserializedManifest, ref reference.Named) (distribution.Manifest, error) { + targetDescriptor := schema2Manifest.Target() + configJSON, err := blobs.Get(ctx, targetDescriptor.Digest) + if err != nil { + return nil, err + } + trustKey, err := loadPrivateKey() + if err != nil { + return nil, err + } + builder := schema1.NewConfigManifestBuilder(blobs, trustKey, ref, configJSON) + for _, d := range schema2Manifest.Layers { + if err := builder.AppendReference(d); err != nil { + return nil, err + } + } + manifest, err := builder.Build(ctx) + if err != nil { + return nil, err + } + return manifest, nil +} + +var ( + privateKeyLock sync.Mutex + privateKey libtrust.PrivateKey +) + +// TDOO: remove when quay.io switches to v2 schema +func loadPrivateKey() (libtrust.PrivateKey, error) { + privateKeyLock.Lock() + defer privateKeyLock.Unlock() + if privateKey != nil { + return privateKey, nil + } + trustKey, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + return nil, err + } + privateKey = trustKey + return privateKey, nil +} diff --git a/pkg/oc/cli/cmd/image/mirror/mappings.go b/pkg/oc/cli/cmd/image/mirror/mappings.go index 558d68bd70d5..026150b70f66 100644 --- a/pkg/oc/cli/cmd/image/mirror/mappings.go +++ b/pkg/oc/cli/cmd/image/mirror/mappings.go @@ -7,11 +7,8 @@ import ( "strings" "sync" - "github.com/golang/glog" - "github.com/docker/distribution/registry/client/auth" - - godigest "github.com/opencontainers/go-digest" + digest "github.com/opencontainers/go-digest" imageapi "github.com/openshift/origin/pkg/image/apis/image" ) @@ -170,7 +167,7 @@ type destinations struct { digests map[string]pushTargets } -func (d *destinations) mergeIntoDigests(srcDigest godigest.Digest, target pushTargets) { +func (d *destinations) mergeIntoDigests(srcDigest digest.Digest, target pushTargets) { d.lock.Lock() defer d.lock.Unlock() srcKey := srcDigest.String() @@ -278,78 +275,3 @@ func calculateDockerRegistryScopes(tree targetTree) map[string][]auth.Scope { } return uniqueScopes } - -type workQueue struct { - ch chan workUnit - wg *sync.WaitGroup -} - -func newWorkQueue(workers int, stopCh <-chan struct{}) *workQueue { - q := &workQueue{ - ch: make(chan workUnit, 100), - wg: &sync.WaitGroup{}, - } - go q.run(workers, stopCh) - return q -} - -func (q *workQueue) run(workers int, stopCh <-chan struct{}) { - for i := 0; i < workers; i++ { - go func(i int) { - defer glog.V(4).Infof("worker %d stopping", i) - for { - select { - case work, ok := <-q.ch: - if !ok { - return - } - work.fn() - work.wg.Done() - case <-stopCh: - return - } - } - }(i) - } - <-stopCh -} - -func (q *workQueue) Batch(fn func(Work)) { - w := &worker{ - wg: &sync.WaitGroup{}, - ch: q.ch, - } - fn(w) - w.wg.Wait() -} - -func (q *workQueue) Queue(fn func(Work)) { - w := &worker{ - wg: q.wg, - ch: q.ch, - } - fn(w) -} - -func (q *workQueue) Done() { - q.wg.Wait() -} - -type workUnit struct { - fn func() - wg *sync.WaitGroup -} - -type Work interface { - Parallel(fn func()) -} - -type worker struct { - wg *sync.WaitGroup - ch chan workUnit -} - -func (w *worker) Parallel(fn func()) { - w.wg.Add(1) - w.ch <- workUnit{wg: w.wg, fn: fn} -} diff --git a/pkg/oc/cli/cmd/image/mirror/mirror.go b/pkg/oc/cli/cmd/image/mirror/mirror.go index dc5c3c3d922f..897dd760651d 100644 --- a/pkg/oc/cli/cmd/image/mirror/mirror.go +++ b/pkg/oc/cli/cmd/image/mirror/mirror.go @@ -4,20 +4,15 @@ import ( "fmt" "io" "regexp" - "sync" "time" "github.com/docker/distribution" "github.com/docker/distribution/manifest/manifestlist" - "github.com/docker/distribution/manifest/schema1" "github.com/docker/distribution/manifest/schema2" "github.com/docker/distribution/reference" - "github.com/docker/distribution/registry/api/errcode" - "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client" units "github.com/docker/go-units" - "github.com/docker/libtrust" "github.com/golang/glog" godigest "github.com/opencontainers/go-digest" "github.com/spf13/cobra" @@ -192,7 +187,7 @@ func (o *pushOptions) Repository(ctx apirequest.Context, context *registryclient } // includeDescriptor returns true if the provided manifest should be included. -func (o *pushOptions) includeDescriptor(d *manifestlist.ManifestDescriptor) bool { +func (o *pushOptions) includeDescriptor(d *manifestlist.ManifestDescriptor, hasMultiple bool) bool { if o.OSFilter == nil { return true } @@ -465,69 +460,6 @@ func (o *pushOptions) plan() (*plan, error) { return plan, nil } -func processManifestList(ctx apirequest.Context, srcDigest godigest.Digest, srcManifest distribution.Manifest, manifests distribution.ManifestService, ref imageapi.DockerImageReference, filterFn func(*manifestlist.ManifestDescriptor) bool) ([]distribution.Manifest, distribution.Manifest, godigest.Digest, error) { - var srcManifests []distribution.Manifest - switch t := srcManifest.(type) { - case *manifestlist.DeserializedManifestList: - manifestDigest := srcDigest - manifestList := t - - filtered := make([]manifestlist.ManifestDescriptor, 0, len(t.Manifests)) - for _, manifest := range t.Manifests { - if !filterFn(&manifest) { - glog.V(5).Infof("Skipping image for %#v from %s", manifest.Platform, ref) - continue - } - glog.V(5).Infof("Including image for %#v from %s", manifest.Platform, ref) - filtered = append(filtered, manifest) - } - - if len(filtered) == 0 { - return nil, nil, "", nil - } - - // if we're filtering the manifest list, update the source manifest and digest - if len(filtered) != len(t.Manifests) { - var err error - t, err = manifestlist.FromDescriptors(filtered) - if err != nil { - return nil, nil, "", fmt.Errorf("unable to filter source image %s manifest list: %v", ref, err) - } - _, body, err := t.Payload() - if err != nil { - return nil, nil, "", fmt.Errorf("unable to filter source image %s manifest list (bad payload): %v", ref, err) - } - manifestList = t - manifestDigest = srcDigest.Algorithm().FromBytes(body) - glog.V(5).Infof("Filtered manifest list to new digest %s:\n%s", manifestDigest, body) - } - - for i, manifest := range t.Manifests { - childManifest, err := manifests.Get(ctx, manifest.Digest, distribution.WithManifestMediaTypes([]string{manifestlist.MediaTypeManifestList, schema2.MediaTypeManifest})) - if err != nil { - return nil, nil, "", fmt.Errorf("unable to retrieve source image %s manifest #%d from manifest list: %v", ref, i+1, err) - } - srcManifests = append(srcManifests, childManifest) - } - - switch { - case len(srcManifests) == 1: - _, body, err := srcManifests[0].Payload() - if err != nil { - return nil, nil, "", fmt.Errorf("unable to convert source image %s manifest list to single manifest: %v", ref, err) - } - manifestDigest := srcDigest.Algorithm().FromBytes(body) - glog.V(5).Infof("Used only one manifest from the list %s", manifestDigest) - return srcManifests, srcManifests[0], manifestDigest, nil - default: - return append(srcManifests, manifestList), manifestList, manifestDigest, nil - } - - default: - return []distribution.Manifest{srcManifest}, srcManifest, srcDigest, nil - } -} - func copyBlob(ctx apirequest.Context, plan *workPlan, c *repositoryBlobCopy, blob distribution.Descriptor, force, skipMount bool, errOut io.Writer) error { // if we aren't forcing upload, check to see if the blob aleady exists if !force { @@ -702,100 +634,6 @@ func copyManifests( return errs } -// TDOO: remove when quay.io switches to v2 schema -func putManifestInCompatibleSchema( - ctx apirequest.Context, - srcManifest distribution.Manifest, - tag string, - toManifests distribution.ManifestService, - // supports schema2 -> schema1 downconversion - blobs distribution.BlobService, - ref reference.Named, -) (godigest.Digest, error) { - var options []distribution.ManifestServiceOption - if len(tag) > 0 { - glog.V(5).Infof("Put manifest %s:%s", ref, tag) - options = []distribution.ManifestServiceOption{distribution.WithTag(tag)} - } else { - glog.V(5).Infof("Put manifest %s", ref) - } - toDigest, err := toManifests.Put(ctx, srcManifest, options...) - if err == nil { - return toDigest, nil - } - errs, ok := err.(errcode.Errors) - if !ok || len(errs) == 0 { - return toDigest, err - } - errcode, ok := errs[0].(errcode.Error) - if !ok || errcode.ErrorCode() != v2.ErrorCodeManifestInvalid { - return toDigest, err - } - // try downconverting to v2-schema1 - schema2Manifest, ok := srcManifest.(*schema2.DeserializedManifest) - if !ok { - return toDigest, err - } - tagRef, tagErr := reference.WithTag(ref, tag) - if tagErr != nil { - return toDigest, err - } - glog.V(5).Infof("Registry reported invalid manifest error, attempting to convert to v2schema1 as ref %s", tagRef) - schema1Manifest, convertErr := convertToSchema1(ctx, blobs, schema2Manifest, tagRef) - if convertErr != nil { - return toDigest, err - } - if glog.V(6) { - _, data, _ := schema1Manifest.Payload() - glog.Infof("Converted to v2schema1\n%s", string(data)) - } - return toManifests.Put(ctx, schema1Manifest, distribution.WithTag(tag)) -} - -// TDOO: remove when quay.io switches to v2 schema -func convertToSchema1(ctx apirequest.Context, blobs distribution.BlobService, schema2Manifest *schema2.DeserializedManifest, ref reference.Named) (distribution.Manifest, error) { - targetDescriptor := schema2Manifest.Target() - configJSON, err := blobs.Get(ctx, targetDescriptor.Digest) - if err != nil { - return nil, err - } - trustKey, err := loadPrivateKey() - if err != nil { - return nil, err - } - builder := schema1.NewConfigManifestBuilder(blobs, trustKey, ref, configJSON) - for _, d := range schema2Manifest.Layers { - if err := builder.AppendReference(d); err != nil { - return nil, err - } - } - manifest, err := builder.Build(ctx) - if err != nil { - return nil, err - } - return manifest, nil -} - -var ( - privateKeyLock sync.Mutex - privateKey libtrust.PrivateKey -) - -// TDOO: remove when quay.io switches to v2 schema -func loadPrivateKey() (libtrust.PrivateKey, error) { - privateKeyLock.Lock() - defer privateKeyLock.Unlock() - if privateKey != nil { - return privateKey, nil - } - trustKey, err := libtrust.GenerateECP256PrivateKey() - if err != nil { - return nil, err - } - privateKey = trustKey - return privateKey, nil -} - type optionFunc func(interface{}) error func (f optionFunc) Apply(v interface{}) error { diff --git a/pkg/oc/cli/cmd/image/mirror/workqueue.go b/pkg/oc/cli/cmd/image/mirror/workqueue.go new file mode 100644 index 000000000000..6587bff17816 --- /dev/null +++ b/pkg/oc/cli/cmd/image/mirror/workqueue.go @@ -0,0 +1,131 @@ +package mirror + +import ( + "sync" + + "github.com/golang/glog" +) + +type workQueue struct { + ch chan workUnit + wg *sync.WaitGroup +} + +func newWorkQueue(workers int, stopCh <-chan struct{}) *workQueue { + q := &workQueue{ + ch: make(chan workUnit, 100), + wg: &sync.WaitGroup{}, + } + go q.run(workers, stopCh) + return q +} + +func (q *workQueue) run(workers int, stopCh <-chan struct{}) { + for i := 0; i < workers; i++ { + go func(i int) { + defer glog.V(4).Infof("worker %d stopping", i) + for { + select { + case work, ok := <-q.ch: + if !ok { + return + } + work.fn() + work.wg.Done() + case <-stopCh: + return + } + } + }(i) + } + <-stopCh +} + +func (q *workQueue) Batch(fn func(Work)) { + w := &worker{ + wg: &sync.WaitGroup{}, + ch: q.ch, + } + fn(w) + w.wg.Wait() +} + +func (q *workQueue) Try(fn func(Try)) error { + w := &worker{ + wg: &sync.WaitGroup{}, + ch: q.ch, + err: make(chan error), + } + fn(w) + return w.FirstError() +} + +func (q *workQueue) Queue(fn func(Work)) { + w := &worker{ + wg: q.wg, + ch: q.ch, + } + fn(w) +} + +func (q *workQueue) Done() { + q.wg.Wait() +} + +type workUnit struct { + fn func() + wg *sync.WaitGroup +} + +type Work interface { + Parallel(fn func()) +} + +type Try interface { + Try(fn func() error) +} + +type worker struct { + wg *sync.WaitGroup + ch chan workUnit + err chan error +} + +func (w *worker) FirstError() error { + done := make(chan struct{}) + go func() { + w.wg.Wait() + close(done) + }() + for { + select { + case err := <-w.err: + if err != nil { + return err + } + case <-done: + return nil + } + } +} + +func (w *worker) Parallel(fn func()) { + w.wg.Add(1) + w.ch <- workUnit{wg: w.wg, fn: fn} +} + +func (w *worker) Try(fn func() error) { + w.wg.Add(1) + w.ch <- workUnit{ + wg: w.wg, + fn: func() { + err := fn() + if w.err == nil { + // TODO: have the work queue accumulate errors and release them with Done() + glog.Errorf("Worker error: %v", err) + return + } + w.err <- err + }, + } +}