Update registry cache to (ab)use Data field of Descriptor objects #34
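The change, in short: instead of keeping two parallel maps (digest => mediaType and digest => bytes), the cache stores full ociregistry.Descriptor values and stashes the cached bytes in the descriptor's own Data field. A sketch of the descriptor shape this relies on (field set per the OCI image-spec descriptor, which ociregistry's Descriptor appears to mirror; this block is illustrative, not the library's source):

```go
type Digest = string // stand-in for ociregistry.Digest in this sketch

type Descriptor struct {
	MediaType string `json:"mediaType"`      // e.g. "application/vnd.oci.image.manifest.v1+json"
	Digest    Digest `json:"digest"`         // content address of the blob
	Size      int64  `json:"size"`           // blob size in bytes
	Data      []byte `json:"data,omitempty"` // optional embedded copy of the blob's content
	// (URLs, Annotations, Platform, ArtifactType omitted for brevity)
}
```

Because Data sits right next to MediaType and Size, one map entry can serve both Resolve* calls (metadata only) and Get* calls (whenever Data != nil).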
@@ -21,8 +21,7 @@ func RegistryCache(r ociregistry.Interface) ociregistry.Interface {
 		registry: r, // TODO support "nil" here so this can be a poor-man's ocimem implementation? 👀 see also https://github.com/cue-labs/oci/issues/24
 		has:      map[string]bool{},
 		tags:     map[string]ociregistry.Digest{},
-		types:    map[ociregistry.Digest]string{},
-		data:     map[ociregistry.Digest][]byte{},
+		data:     map[ociregistry.Digest]ociregistry.Descriptor{},
 	}
 }
@@ -32,11 +31,10 @@ type registryCache struct {
 	registry ociregistry.Interface

 	// https://github.com/cue-labs/oci/issues/24
-	mu    sync.Mutex                    // TODO some kind of per-object/name/digest mutex so we don't request the same object from the upstream registry concurrently (on *top* of our maps mutex)?
-	has   map[string]bool               // "repo/name@digest" => true (whether a given repo has the given digest)
-	tags  map[string]ociregistry.Digest // "repo/name:tag" => digest
-	types map[ociregistry.Digest]string // digest => "mediaType" (most recent *storing* / "cache-miss" lookup wins, in the case of upstream/cross-repo ambiguity)
-	data  map[ociregistry.Digest][]byte // digest => data
+	mu   sync.Mutex                                    // TODO some kind of per-object/name/digest mutex so we don't request the same object from the upstream registry concurrently (on *top* of our maps mutex)?
+	has  map[string]bool                               // "repo/name@digest" => true (whether a given repo has the given digest)
+	tags map[string]ociregistry.Digest                 // "repo/name:tag" => digest
+	data map[ociregistry.Digest]ociregistry.Descriptor // digest => mediaType+size(+data) (most recent *storing* / "cache-miss" lookup wins, in the case of upstream/cross-repo ambiguity)
 }

 func cacheKeyDigest(repo string, digest ociregistry.Digest) string {
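The bodies of the key helpers are outside the diff context; given the key formats documented on the struct fields ("repo/name@digest" and "repo/name:tag"), they are presumably along these lines (a hypothetical reconstruction, not this file's actual code):

```go
// Hypothetical sketch based on the struct comments above.
func cacheKeyDigest(repo string, digest ociregistry.Digest) string {
	return repo + "@" + string(digest) // e.g. "library/hello-world@sha256:abc..."
}

func cacheKeyTag(repo string, tag string) string {
	return repo + ":" + tag // e.g. "library/hello-world:latest"
}
```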
@@ -52,41 +50,38 @@ func (rc *registryCache) getBlob(ctx context.Context, repo string, digest ocireg
 	rc.mu.Lock()
 	defer rc.mu.Unlock()

-	if b, ok := rc.data[digest]; ok && rc.has[cacheKeyDigest(repo, digest)] {
-		return ocimem.NewBytesReader(b, ociregistry.Descriptor{
-			MediaType: rc.types[digest],
-			Digest:    digest,
-			Size:      int64(len(b)),
-		}), nil
+	if desc, ok := rc.data[digest]; ok && desc.Data != nil && rc.has[cacheKeyDigest(repo, digest)] {
+		return ocimem.NewBytesReader(desc.Data, desc), nil
 	}

 	r, err := f(ctx, repo, digest)
 	if err != nil {
 		return nil, err
 	}
-	//defer r.Close()
+	// defer r.Close() happens later when we know we aren't making Close the caller's responsibility

 	desc := r.Descriptor()
 	digest = desc.Digest // if this isn't a no-op, we've got a naughty registry

-	rc.has[cacheKeyDigest(repo, desc.Digest)] = true
-	rc.types[desc.Digest] = desc.MediaType
+	rc.has[cacheKeyDigest(repo, digest)] = true

+	if desc.Size > manifestSizeLimit {
+		rc.data[digest] = desc
+		return r, nil
+	}
+	defer r.Close()
+
-	b, err := io.ReadAll(r)
+	desc.Data, err = io.ReadAll(r)
 	if err != nil {
 		r.Close()
 		return nil, err
 	}
 	if err := r.Close(); err != nil {
 		return nil, err
 	}

-	if len(b) <= manifestSizeLimit {
-		rc.data[desc.Digest] = b
-	} else {
-		delete(rc.data, desc.Digest)
-	}
+	rc.data[digest] = desc

-	return ocimem.NewBytesReader(b, desc), nil
+	return ocimem.NewBytesReader(desc.Data, desc), nil
 }

 func (rc *registryCache) GetBlob(ctx context.Context, repo string, digest ociregistry.Digest) (ociregistry.BlobReader, error) {
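For orientation, a hedged usage sketch of the wrapper: RegistryCache (from the first hunk) wraps any ociregistry.Interface. The ociclient constructor, the placeholder digest, and running this inside the same package as RegistryCache are all assumptions, not part of this PR:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"cuelabs.dev/go/oci/ociregistry"
	"cuelabs.dev/go/oci/ociregistry/ociclient"
)

func main() {
	ctx := context.Background()

	// Assumed upstream client; any ociregistry.Interface implementation works.
	upstream, err := ociclient.New("registry-1.docker.io", nil)
	if err != nil {
		log.Fatal(err)
	}
	cached := RegistryCache(upstream)

	dgst := ociregistry.Digest("sha256:...") // placeholder digest

	// First read goes upstream; objects <= manifestSizeLimit are memoized
	// into the cached Descriptor's Data field.
	r1, err := cached.GetBlob(ctx, "library/hello-world", dgst)
	if err != nil {
		log.Fatal(err)
	}
	b, _ := io.ReadAll(r1)
	r1.Close()

	// Second read is served from memory via ocimem.NewBytesReader -- same
	// bytes, no upstream round-trip.
	r2, err := cached.GetBlob(ctx, "library/hello-world", dgst)
	if err != nil {
		log.Fatal(err)
	}
	defer r2.Close()
	fmt.Println(len(b))
}
```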
@@ -104,43 +99,108 @@ func (rc *registryCache) GetTag(ctx context.Context, repo string, tag string) (o
 	tagKey := cacheKeyTag(repo, tag)

 	if digest, ok := rc.tags[tagKey]; ok {
-		if b, ok := rc.data[digest]; ok {
-			return ocimem.NewBytesReader(b, ociregistry.Descriptor{
-				MediaType: rc.types[digest],
-				Digest:    digest,
-				Size:      int64(len(b)),
-			}), nil
+		if desc, ok := rc.data[digest]; ok && desc.Data != nil {
+			return ocimem.NewBytesReader(desc.Data, desc), nil
 		}
 	}

 	r, err := rc.registry.GetTag(ctx, repo, tag)
 	if err != nil {
 		return nil, err
 	}
-	//defer r.Close()
+	// defer r.Close() happens later when we know we aren't making Close the caller's responsibility

 	desc := r.Descriptor()

 	rc.has[cacheKeyDigest(repo, desc.Digest)] = true
 	rc.tags[tagKey] = desc.Digest
-	rc.types[desc.Digest] = desc.MediaType

-	b, err := io.ReadAll(r)
+	if desc.Size > manifestSizeLimit {
+		rc.data[desc.Digest] = desc
+		return r, nil
+	}
+	defer r.Close()
+
+	desc.Data, err = io.ReadAll(r)
 	if err != nil {
 		r.Close()
 		return nil, err
 	}
 	if err := r.Close(); err != nil {
 		return nil, err
 	}

-	if len(b) <= manifestSizeLimit {
-		rc.data[desc.Digest] = b
-	} else {
-		delete(rc.data, desc.Digest)
-	}
+	rc.data[desc.Digest] = desc

-	return ocimem.NewBytesReader(b, desc), nil
+	return ocimem.NewBytesReader(desc.Data, desc), nil
 }

+func (rc *registryCache) resolveBlob(ctx context.Context, repo string, digest ociregistry.Digest, f func(ctx context.Context, repo string, digest ociregistry.Digest) (ociregistry.Descriptor, error)) (ociregistry.Descriptor, error) {
+	rc.mu.Lock()
[inline review thread on the added "rc.mu.Lock()" line]

Reviewer: Why do we have to lock on reads?

Author: Unfortunately, it's not safe for goroutines to read a Go map while any other goroutine might be writing it -- and if this lookup misses the cache and has to query the remote registry, it does write. The "data is cached" path should be really fast, though, so there shouldn't be much lock contention here (the contention is going to come from actual "load-bearing" concurrent requests 😞).

Reviewer: Seeing your answer, I think I have asked that before. Sorry for the noise.
+	defer rc.mu.Unlock()
+
+	if desc, ok := rc.data[digest]; ok && rc.has[cacheKeyDigest(repo, digest)] {
+		return desc, nil
+	}
+
+	desc, err := f(ctx, repo, digest)
+	if err != nil {
+		return desc, err
+	}
+
+	digest = desc.Digest // if this isn't a no-op, we've got a naughty registry
+
+	rc.has[cacheKeyDigest(repo, digest)] = true
+
+	// carefully copy only valid Resolve* fields such that any other existing fields are kept (this matters more if we ever make our mutexes better/less aggressive 👀)
+	if d, ok := rc.data[digest]; ok {
+		d.MediaType = desc.MediaType
+		d.Digest = desc.Digest
+		d.Size = desc.Size
+		desc = d
+	}
+	rc.data[digest] = desc
+
+	return desc, nil
+}
+
+func (rc *registryCache) ResolveManifest(ctx context.Context, repo string, digest ociregistry.Digest) (ociregistry.Descriptor, error) {
+	return rc.resolveBlob(ctx, repo, digest, rc.registry.ResolveManifest)
+}
+
+func (rc *registryCache) ResolveBlob(ctx context.Context, repo string, digest ociregistry.Digest) (ociregistry.Descriptor, error) {
+	return rc.resolveBlob(ctx, repo, digest, rc.registry.ResolveBlob)
+}
+
+func (rc *registryCache) ResolveTag(ctx context.Context, repo string, tag string) (ociregistry.Descriptor, error) {
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
+
+	tagKey := cacheKeyTag(repo, tag)
+
+	if digest, ok := rc.tags[tagKey]; ok {
+		if desc, ok := rc.data[digest]; ok {
+			return desc, nil
+		}
+	}
+
+	desc, err := rc.registry.ResolveTag(ctx, repo, tag)
+	if err != nil {
+		return desc, err
+	}
+
+	rc.has[cacheKeyDigest(repo, desc.Digest)] = true
+	rc.tags[tagKey] = desc.Digest
+
+	// carefully copy only valid Resolve* fields such that any other existing fields are kept (this matters more if we ever make our mutexes better/less aggressive 👀)
+	if d, ok := rc.data[desc.Digest]; ok {
+		d.MediaType = desc.MediaType
+		d.Digest = desc.Digest
+		d.Size = desc.Size
+		desc = d
+	}
+	rc.data[desc.Digest] = desc
+
+	return desc, nil
+}

 // TODO more methods (currently only implements what's actually necessary for SynthesizeIndex)
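The locking question in the thread above is worth a standalone illustration: a cache lookup that can miss and fall through to a fetch-and-store is a potential writer, so even the "read" path has to hold the mutex. A minimal sketch of the general pattern (not this file's code):

```go
package main

import (
	"fmt"
	"sync"
)

type memo struct {
	mu sync.Mutex // a plain Mutex, not RWMutex: "reads" may write on a miss
	m  map[string]string
}

func (c *memo) get(key string, fetch func(string) string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.m[key]; ok {
		return v // cache hit: cheap, but still needs the lock vs concurrent writers
	}
	v := fetch(key) // cache miss: this "read" now becomes a write...
	c.m[key] = v    // ...so holding only a read lock above would race here
	return v
}

func main() {
	c := &memo{m: map[string]string{}}
	fmt.Println(c.get("k", func(k string) string { return "fetched:" + k }))
	fmt.Println(c.get("k", func(k string) string { return "never called" }))
}
```

A sync.RWMutex would let pure hits proceed in parallel, but a miss would have to drop the read lock and re-acquire the write lock (re-checking the map afterwards); given how cheap the hit path is, a single Mutex keeps things simple, which matches the author's point about contention above.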
[review thread on the "desc.Size > manifestSizeLimit" early return]

Reviewer: So if it is too big, we do not cache it?

Author: Correct -- that's true of the previous implementation too, but handling it earlier turns out to be better because we can return the original reader directly instead of buffering it. Also, "too big" here is currently 4MiB, so a pretty generous amount of content still gets cached in memory (especially since we're looking at mostly just indexes, image manifests, and config blobs).
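To pin down the behavior being discussed: the definition of manifestSizeLimit is not in this diff, but per the comment above its value is 4MiB. The relevant early return from getBlob, annotated (the constant below is reconstructed from the discussion, not copied from the source):

```go
const manifestSizeLimit = 4 * 1024 * 1024 // 4MiB, per the discussion (definition not shown in this diff)

if desc.Size > manifestSizeLimit {
	// Too big to buffer: remember the metadata only (desc.Data stays nil,
	// so later Get* calls will not treat this entry as a byte-level hit)
	// and hand the upstream reader straight back to the caller.
	rc.data[digest] = desc
	return r, nil
}
defer r.Close() // from here on, the reader is ours to consume and close
```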