Skip to content

Commit

Permalink
Merge pull request #68 from nokia/multi-ns
Browse files Browse the repository at this point in the history
Fix caching in case multiple Repository resources point to the same git repo
  • Loading branch information
nephio-prow[bot] authored Jun 13, 2024
2 parents 9fd76cc + ebbc669 commit adfa8d0
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 114 deletions.
107 changes: 57 additions & 50 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,46 @@ func NewCache(cacheDir string, repoSyncFrequency time.Duration, useGitCaBundle b
}
}

func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (*cachedRepository, error) {
ctx, span := tracer.Start(ctx, "Cache::OpenRepository", trace.WithAttributes())
defer span.End()

func getCacheKey(repositorySpec *configapi.Repository) (string, error) {
switch repositoryType := repositorySpec.Spec.Type; repositoryType {
case configapi.RepositoryTypeOCI:
ociSpec := repositorySpec.Spec.Oci
if ociSpec == nil {
return nil, fmt.Errorf("oci not configured")
return "", fmt.Errorf("oci not configured")
}
key := "oci://" + ociSpec.Registry
c.mutex.Lock()
defer c.mutex.Unlock()
return "oci://" + ociSpec.Registry, nil

case configapi.RepositoryTypeGit:
gitSpec := repositorySpec.Spec.Git
if gitSpec == nil {
return "", errors.New("git property is required")
}
if gitSpec.Repo == "" {
return "", errors.New("git.repo property is required")
}
return fmt.Sprintf("git://%s/%s@%s/%s", gitSpec.Repo, gitSpec.Directory, repositorySpec.Namespace, repositorySpec.Name), nil

default:
return "", fmt.Errorf("repository type %q not supported", repositoryType)
}
}

cr := c.repositories[key]
func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (*cachedRepository, error) {
ctx, span := tracer.Start(ctx, "Cache::OpenRepository", trace.WithAttributes())
defer span.End()

if cr == nil {
key, err := getCacheKey(repositorySpec)
if err != nil {
return nil, err
}
c.mutex.Lock()
defer c.mutex.Unlock()
cachedRepo := c.repositories[key]

switch repositoryType := repositorySpec.Spec.Type; repositoryType {
case configapi.RepositoryTypeOCI:
ociSpec := repositorySpec.Spec.Oci
if cachedRepo == nil {
cacheDir := filepath.Join(c.cacheDir, "oci")
storage, err := kptoci.NewStorage(cacheDir)
if err != nil {
Expand All @@ -105,29 +128,17 @@ func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Re
if err != nil {
return nil, err
}
cr = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore, c.repoSyncFrequency)
c.repositories[key] = cr
cachedRepo = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore, c.repoSyncFrequency)
c.repositories[key] = cachedRepo
}
return cr, nil
return cachedRepo, nil

case configapi.RepositoryTypeGit:
gitSpec := repositorySpec.Spec.Git
if gitSpec == nil {
return nil, errors.New("git property is required")
}
if gitSpec.Repo == "" {
return nil, errors.New("git.repo property is required")
}
if !isPackageContent(repositorySpec.Spec.Content) {
return nil, fmt.Errorf("git repository supports Package content only; got %q", string(repositorySpec.Spec.Content))
}
key := "git://" + gitSpec.Repo + gitSpec.Directory

c.mutex.Lock()
defer c.mutex.Unlock()

cr := c.repositories[key]
if cr == nil {
if cachedRepo == nil {
var mbs git.MainBranchStrategy
if gitSpec.CreateBranch {
mbs = git.CreateIfMissing
Expand All @@ -142,16 +153,16 @@ func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Re
}); err != nil {
return nil, err
} else {
cr = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore, c.repoSyncFrequency)
c.repositories[key] = cr
cachedRepo = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore, c.repoSyncFrequency)
c.repositories[key] = cachedRepo
}
} else {
// If there is an error from the background refresh goroutine, return it.
if err := cr.getRefreshError(); err != nil {
if err := cachedRepo.getRefreshError(); err != nil {
return nil, err
}
}
return cr, nil
return cachedRepo, nil

default:
return nil, fmt.Errorf("type %q not supported", repositoryType)
Expand All @@ -162,31 +173,27 @@ func isPackageContent(content configapi.RepositoryContent) bool {
return content == configapi.RepositoryContentPackage
}

func (c *Cache) CloseRepository(repositorySpec *configapi.Repository) error {
var key string
func (c *Cache) CloseRepository(repositorySpec *configapi.Repository, allRepos []configapi.Repository) error {
key, err := getCacheKey(repositorySpec)
if err != nil {
return err
}

switch repositorySpec.Spec.Type {
case configapi.RepositoryTypeOCI:
oci := repositorySpec.Spec.Oci
if oci == nil {
return fmt.Errorf("oci not configured for %s:%s", repositorySpec.ObjectMeta.Namespace, repositorySpec.ObjectMeta.Name)
// check if repositorySpec shares the underlying cached repo with another repository
for _, r := range allRepos {
if r.Name == repositorySpec.Name && r.Namespace == repositorySpec.Namespace {
continue
}
key = "oci://" + oci.Registry

case configapi.RepositoryTypeGit:
git := repositorySpec.Spec.Git
if git == nil {
return fmt.Errorf("git not configured for %s:%s", repositorySpec.ObjectMeta.Namespace, repositorySpec.ObjectMeta.Name)
otherKey, err := getCacheKey(&r)
if err != nil {
return err
}
if otherKey == key {
// do not close cached repo if it is shared
return nil
}
key = "git://" + git.Repo + git.Directory

default:
return fmt.Errorf("unknown repository type: %q", repositorySpec.Spec.Type)
}

// TODO: Multiple Repository resources can point to the same underlying repository
// and therefore the same cache. Implement reference counting

var repository *cachedRepository
{
c.mutex.Lock()
Expand Down
39 changes: 24 additions & 15 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"testing"
"time"

gogit "github.com/go-git/go-git/v5"
"github.com/google/go-cmp/cmp"
api "github.com/nephio-project/porch/api/porch/v1alpha1"
"github.com/nephio-project/porch/api/porchconfig/v1alpha1"
Expand All @@ -39,7 +38,7 @@ func TestLatestPackages(t *testing.T) {
ctx := context.Background()
testPath := filepath.Join("..", "git", "testdata")

_, cachedGit := openRepositoryFromArchive(t, ctx, testPath, "nested")
cachedRepo := openRepositoryFromArchive(t, ctx, testPath, "nested")

wantLatest := map[string]string{
"sample": "v2",
Expand All @@ -48,7 +47,7 @@ func TestLatestPackages(t *testing.T) {
"catalog/namespace/basens": "v3",
"catalog/namespace/istions": "v3",
}
revisions, err := cachedGit.ListPackageRevisions(ctx, repository.ListPackageRevisionFilter{})
revisions, err := cachedRepo.ListPackageRevisions(ctx, repository.ListPackageRevisionFilter{})
if err != nil {
t.Fatalf("ListPackageRevisions failed: %v", err)
}
Expand Down Expand Up @@ -84,9 +83,9 @@ func TestLatestPackages(t *testing.T) {
func TestPublishedLatest(t *testing.T) {
ctx := context.Background()
testPath := filepath.Join("..", "git", "testdata")
_, cached := openRepositoryFromArchive(t, ctx, testPath, "nested")
cachedRepo := openRepositoryFromArchive(t, ctx, testPath, "nested")

revisions, err := cached.ListPackageRevisions(ctx, repository.ListPackageRevisionFilter{
revisions, err := cachedRepo.ListPackageRevisions(ctx, repository.ListPackageRevisionFilter{
Package: "catalog/gcp/bucket",
WorkspaceName: "v2",
})
Expand All @@ -105,7 +104,7 @@ func TestPublishedLatest(t *testing.T) {
t.Fatalf("Bucket package lifecycle: got %s, want %s", got, want)
}

update, err := cached.UpdatePackageRevision(ctx, bucket)
update, err := cachedRepo.UpdatePackageRevision(ctx, bucket)
if err != nil {
t.Fatalf("UpdatePackaeg(%s) failed: %v", bucket.Key(), err)
}
Expand All @@ -127,20 +126,20 @@ func TestPublishedLatest(t *testing.T) {
}
}

func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name string) (*gogit.Repository, *cachedRepository) {
func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name string) *cachedRepository {
t.Helper()

tempdir := t.TempDir()
tarfile := filepath.Join(testPath, fmt.Sprintf("%s-repository.tar", name))
repo, address := git.ServeGitRepository(t, tarfile, tempdir)
metadataStore := createMetadataStoreFromArchive(t, "", "")
_, address := git.ServeGitRepository(t, tarfile, tempdir)
metadataStore := createMetadataStoreFromArchive(t, fmt.Sprintf("%s-metadata.yaml", name), name)

cache := NewCache(t.TempDir(), 60*time.Second, true, CacheOptions{
MetadataStore: metadataStore,
ObjectNotifier: &fakecache.ObjectNotifier{},
MetadataStore: metadataStore,
ObjectNotifier: &fakecache.ObjectNotifier{},
CredentialResolver: &fakecache.CredentialResolver{},
})
cachedGit, err := cache.OpenRepository(ctx, &v1alpha1.Repository{
apiRepo := &v1alpha1.Repository{
TypeMeta: metav1.TypeMeta{
Kind: v1alpha1.TypeRepository.Kind,
APIVersion: v1alpha1.TypeRepository.APIVersion(),
Expand All @@ -157,18 +156,28 @@ func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name
Repo: address,
},
},
})
}
cachedRepo, err := cache.OpenRepository(ctx, apiRepo)
if err != nil {
t.Fatalf("OpenRepository(%q) of %q failed; %v", address, tarfile, err)
}

return repo, cachedGit
t.Cleanup(func() {
err := cache.CloseRepository(apiRepo, []v1alpha1.Repository{*apiRepo})
if err != nil {
t.Errorf("CloseRepository(%q) failed: %v", address, err)
}
if len(cache.repositories) != 0 {
t.Errorf("CloseRepository hasn't deleted repository from cache")
}
})
return cachedRepo
}

func createMetadataStoreFromArchive(t *testing.T, testPath, name string) meta.MetadataStore {
t.Helper()

f := filepath.Join("..", "git", "testdata", "nested-metadata.yaml")
f := filepath.Join("..", "git", "testdata", testPath)
c, err := os.ReadFile(f)
if err != nil && !os.IsNotExist(err) {
t.Fatalf("Error reading metadata file found for repository %s", name)
Expand Down
41 changes: 3 additions & 38 deletions pkg/registry/porch/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,52 +138,17 @@ func (b *background) updateCache(ctx context.Context, event watch.EventType, rep
// TODO: implement
case watch.Deleted:
klog.Infof("Repository deleted: %s:%s", repository.ObjectMeta.Namespace, repository.ObjectMeta.Name)
shared, err := b.isSharedRepository(ctx, repository)
if err != nil {
var repoList configapi.RepositoryList
if err := b.coreClient.List(ctx, &repoList); err != nil {
return err
}
// Only close the repository if no other k8s repository resources references
// the same underlying git/oci repo.
if !shared {
return b.cache.CloseRepository(repository)
}
return nil
return b.cache.CloseRepository(repository, repoList.Items)
default:
klog.Warningf("Unhandled watch event type: %s", event)
}
return nil
}

// isSharedRepository checks if the underlying git/oci repo of the provided
// k8s repository is also used by another repository.
func (b *background) isSharedRepository(ctx context.Context, repo *configapi.Repository) (bool, error) {
var obj configapi.RepositoryList
if err := b.coreClient.List(ctx, &obj); err != nil {
return false, err
}
for _, r := range obj.Items {
if r.Name == repo.Name && r.Namespace == repo.Namespace {
continue
}
if r.Spec.Type != repo.Spec.Type {
continue
}
switch r.Spec.Type {
case configapi.RepositoryTypeOCI:
if r.Spec.Oci.Registry == repo.Spec.Oci.Registry {
return true, nil
}
case configapi.RepositoryTypeGit:
if r.Spec.Git.Repo == repo.Spec.Git.Repo && r.Spec.Git.Directory == repo.Spec.Git.Directory {
return true, nil
}
default:
return false, fmt.Errorf("type %q not supported", r.Spec.Type)
}
}
return false, nil
}

func (b *background) runOnce(ctx context.Context) error {
klog.Infof("background-refreshing repositories")
var repositories configapi.RepositoryList
Expand Down
15 changes: 8 additions & 7 deletions pkg/registry/porch/packagecommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type packageCommon struct {
createStrategy SimpleRESTCreateStrategy
}

func (r *packageCommon) listPackageRevisions(ctx context.Context, filter packageRevisionFilter,
func (r *packageCommon) listPackageRevisions(ctx context.Context, filter packageRevisionFilter,
selector labels.Selector, callback func(p *engine.PackageRevision) error) error {
var opts []client.ListOption
if ns, namespaced := genericapirequest.NamespaceFrom(ctx); namespaced {
ns, namespaced := genericapirequest.NamespaceFrom(ctx)
if namespaced && ns != "" {
opts = append(opts, client.InNamespace(ns))

if filter.Namespace != "" && ns != filter.Namespace {
Expand Down Expand Up @@ -200,7 +201,7 @@ func (r *packageCommon) getPackage(ctx context.Context, name string) (*engine.Pa
}

// Common implementation of PackageRevision update logic.
func (r *packageCommon) updatePackageRevision(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo,
func (r *packageCommon) updatePackageRevision(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo,
createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool) (runtime.Object, bool, error) {
// TODO: Is this all boilerplate??

Expand All @@ -222,8 +223,8 @@ func (r *packageCommon) updatePackageRevision(ctx context.Context, name string,
}
}

// We have to be runtime.Object (and not *api.PackageRevision) or else nil-checks fail (because a nil object is not a nil interface)
var oldApiPkgRev runtime.Object
// We have to be runtime.Object (and not *api.PackageRevision) or else nil-checks fail (because a nil object is not a nil interface)
var oldApiPkgRev runtime.Object
if !isCreate {
oldApiPkgRev, err = oldRepoPkgRev.GetPackageRevision(ctx)
if err != nil {
Expand Down Expand Up @@ -315,7 +316,7 @@ func (r *packageCommon) updatePackageRevision(ctx context.Context, name string,
}

// Common implementation of Package update logic.
func (r *packageCommon) updatePackage(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo,
func (r *packageCommon) updatePackage(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo,
createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool) (runtime.Object, bool, error) {
// TODO: Is this all boilerplate??

Expand All @@ -338,7 +339,7 @@ func (r *packageCommon) updatePackage(ctx context.Context, name string, objInfo
}

// We have to be runtime.Object (and not *api.PackageRevision) or else nil-checks fail (because a nil object is not a nil interface)
var oldRuntimeObj runtime.Object
var oldRuntimeObj runtime.Object
if !isCreate {
oldRuntimeObj = oldPackage.GetPackage()
}
Expand Down
8 changes: 4 additions & 4 deletions scripts/create-deployment-blueprint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,16 @@ function main() {
customize-container-env

customize-image \
"docker.io/porch-function-runner:latest" \
"docker.io/nephio/porch-function-runner:latest" \
"${FUNCTION_IMAGE}"
customize-image \
"docker.io/porch-server:latest" \
"docker.io/nephio/porch-server:latest" \
"${SERVER_IMAGE}"
customize-image \
"docker.io/porch-controllers:latest" \
"docker.io/nephio/porch-controllers:latest" \
"${CONTROLLERS_IMAGE}"
customize-image-in-env \
"docker.io/porch-wrapper-server:latest" \
"docker.io/nephio/porch-wrapper-server:latest" \
"${WRAPPER_SERVER_IMAGE}"
}

Expand Down
Loading

0 comments on commit adfa8d0

Please sign in to comment.