Skip to content

Commit

Permalink
Provide layers referenced by an image stream as layers subresource
Browse files Browse the repository at this point in the history
Adds a new GET endpoint to an image stream as a subresource `layers`
that returns an array of every layer referenced by the image stream and
the tags and images included by the image.

The subresource is fed by a store driven informer that caches and
indexes only the layers. Clients get a 500 retry error if the cache has
not initialized yet (the client will silently retry).

Turns the registry access check for a given layer into an O(1) check
instead of O(N) where N is the number of images in the image stream.
  • Loading branch information
smarterclayton committed Jul 11, 2018
1 parent 81e6e1c commit aac71d5
Show file tree
Hide file tree
Showing 12 changed files with 561 additions and 105 deletions.
2 changes: 2 additions & 0 deletions pkg/cmd/server/origin/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func LegacyStorage(storage map[schema.GroupVersion]map[string]rest.Storage) map[

case *imagestreametcd.REST:
legacyStorage[resource] = &imagestreametcd.LegacyREST{REST: storage}
case *imagestreametcd.LayersREST:
delete(legacyStorage, resource)

case *routeetcd.REST:
store := *storage.Store
Expand Down
16 changes: 15 additions & 1 deletion pkg/image/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ExtraConfig struct {
makeV1Storage sync.Once
v1Storage map[string]rest.Storage
v1StorageErr error
startFns []func(<-chan struct{})
}

type ImageAPIServerConfig struct {
Expand Down Expand Up @@ -109,6 +110,15 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil, err
}

if err := s.GenericAPIServer.AddPostStartHook("image.openshift.io-apiserver-caches", func(context genericapiserver.PostStartHookContext) error {
for _, fn := range c.ExtraConfig.startFns {
go fn(context.StopCh)
}
return nil
}); err != nil {
return nil, err
}

return s, nil
}

Expand Down Expand Up @@ -193,10 +203,13 @@ func (c *completedConfig) newV1RESTStorage() (map[string]rest.Storage, error) {
whitelister = whitelist.WhitelistAllRegistries()
}

imageLayerIndex := imagestreametcd.NewImageLayerIndex(imageV1Client.Image().Images())
c.ExtraConfig.startFns = append(c.ExtraConfig.startFns, imageLayerIndex.Run)

imageRegistry := image.NewRegistry(imageStorage)
imageSignatureStorage := imagesignature.NewREST(imageClient.Image())
imageStreamSecretsStorage := imagesecret.NewREST(coreClient)
imageStreamStorage, imageStreamStatusStorage, internalImageStreamStorage, err := imagestreametcd.NewREST(c.GenericConfig.RESTOptionsGetter, c.ExtraConfig.RegistryHostnameRetriever, authorizationClient.SubjectAccessReviews(), c.ExtraConfig.LimitVerifier, whitelister)
imageStreamStorage, imageStreamLayersStorage, imageStreamStatusStorage, internalImageStreamStorage, err := imagestreametcd.NewREST(c.GenericConfig.RESTOptionsGetter, c.ExtraConfig.RegistryHostnameRetriever, authorizationClient.SubjectAccessReviews(), c.ExtraConfig.LimitVerifier, whitelister, imageLayerIndex)
if err != nil {
return nil, fmt.Errorf("error building REST storage: %v", err)
}
Expand Down Expand Up @@ -231,6 +244,7 @@ func (c *completedConfig) newV1RESTStorage() (map[string]rest.Storage, error) {
v1Storage["imagesignatures"] = imageSignatureStorage
v1Storage["imageStreams/secrets"] = imageStreamSecretsStorage
v1Storage["imageStreams"] = imageStreamStorage
v1Storage["imageStreams/layers"] = imageStreamLayersStorage
v1Storage["imageStreams/status"] = imageStreamStatusStorage
v1Storage["imageStreamImports"] = imageStreamImportStorage
v1Storage["imageStreamImages"] = imageStreamImageStorage
Expand Down
76 changes: 73 additions & 3 deletions pkg/image/registry/imagestream/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package etcd
import (
"context"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
Expand Down Expand Up @@ -47,7 +48,8 @@ func NewREST(
subjectAccessReviewRegistry authorizationclient.SubjectAccessReviewInterface,
limitVerifier imageadmission.LimitVerifier,
registryWhitelister whitelist.RegistryWhitelister,
) (*REST, *StatusREST, *InternalREST, error) {
imageLayerIndex ImageLayerIndex,
) (*REST, *LayersREST, *StatusREST, *InternalREST, error) {
store := registry.Store{
NewFunc: func() runtime.Object { return &imageapi.ImageStream{} },
NewListFunc: func() runtime.Object { return &imageapi.ImageStreamList{} },
Expand All @@ -72,9 +74,11 @@ func NewREST(
AttrFunc: storage.AttrFunc(storage.DefaultNamespaceScopedAttr).WithFieldMutation(imageapi.ImageStreamSelector),
}
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

layersREST := &LayersREST{index: imageLayerIndex, store: &store}

statusStrategy := imagestream.NewStatusStrategy(strategy)
statusStore := store
statusStore.Decorator = nil
Expand All @@ -89,7 +93,7 @@ func NewREST(
internalStore.UpdateStrategy = internalStrategy

internalREST := &InternalREST{store: &internalStore}
return rest, statusREST, internalREST, nil
return rest, layersREST, statusREST, internalREST, nil
}

// StatusREST implements the REST endpoint for changing the status of an image stream.
Expand Down Expand Up @@ -139,6 +143,72 @@ func (r *InternalREST) Update(ctx context.Context, name string, objInfo rest.Upd
return r.store.Update(ctx, name, objInfo, createValidation, updateValidation)
}

// LayersREST implements the REST endpoint for changing both the spec and status of an image stream.
type LayersREST struct {
store *registry.Store
index ImageLayerIndex
}

var _ rest.Getter = &LayersREST{}

func (r *LayersREST) New() runtime.Object {
return &imageapi.ImageStreamLayers{}
}

// Get returns the layers for an image stream.
func (r *LayersREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
if !r.index.HasSynced() {
return nil, errors.NewServerTimeout(r.store.DefaultQualifiedResource, "get", 2)
}
obj, err := r.store.Get(ctx, name, options)
if err != nil {
return nil, err
}
is := obj.(*imageapi.ImageStream)
isl := &imageapi.ImageStreamLayers{
ObjectMeta: is.ObjectMeta,
Blobs: make(map[string]imageapi.ImageLayerData),
Images: make(map[string]imageapi.ImageBlobReferences),
}

for _, status := range is.Status.Tags {
for _, item := range status.Items {
if len(item.Image) == 0 {
continue
}

obj, _, _ := r.index.GetByKey(item.Image)
entry, ok := obj.(*ImageLayers)
if !ok {
continue
}

if _, ok := isl.Images[item.Image]; !ok {
var reference imageapi.ImageBlobReferences
for _, layer := range entry.Layers {
reference.Layers = append(reference.Layers, layer.Name)
if _, ok := isl.Blobs[layer.Name]; !ok {
isl.Blobs[layer.Name] = imageapi.ImageLayerData{LayerSize: &layer.LayerSize, MediaType: layer.MediaType}
}
}
if blob := entry.Manifest; blob != nil {
reference.Manifest = &blob.Name
if _, ok := isl.Blobs[blob.Name]; !ok {
if blob.LayerSize == 0 {
// only send media type since we don't the size of the manifest
isl.Blobs[blob.Name] = imageapi.ImageLayerData{MediaType: blob.MediaType}
} else {
isl.Blobs[blob.Name] = imageapi.ImageLayerData{LayerSize: &blob.LayerSize, MediaType: blob.MediaType}
}
}
}
isl.Images[item.Image] = reference
}
}
}
return isl, nil
}

// LegacyREST allows us to wrap and alter some behavior
type LegacyREST struct {
*REST
Expand Down
6 changes: 4 additions & 2 deletions pkg/image/registry/imagestream/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ func newStorage(t *testing.T) (*REST, *StatusREST, *InternalREST, *etcdtesting.E
server, etcdStorage := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
etcdStorage.Codec = legacyscheme.Codecs.LegacyCodec(schema.GroupVersion{Group: "image.openshift.io", Version: "v1"})
registry := imageapi.DefaultRegistryHostnameRetriever(noDefaultRegistry, "", "")
imageStorage, statusStorage, internalStorage, err := NewREST(
imageStorage, _, statusStorage, internalStorage, err := NewREST(
restoptions.NewSimpleGetter(etcdStorage),
registry,
&fakeSubjectAccessReviewRegistry{},
&admfake.ImageStreamLimitVerifier{},
&fake.RegistryWhitelister{})
&fake.RegistryWhitelister{},
NewEmptyLayerIndex(),
)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit aac71d5

Please sign in to comment.