Skip to content

Commit

Permalink
Fixes for Optimize OCI repository lister lookups in flux plugin #5523
Browse files Browse the repository at this point in the history
…and Unable to delete Flux package in "pending" state #5538  (#5549)

performance optimizations in flux plugin for OCI HelmRepositories and a
fix to mark the release failed instead of pending
  • Loading branch information
gfichtenholt authored Oct 24, 2022
1 parent fe00fbb commit c4466ce
Show file tree
Hide file tree
Showing 16 changed files with 227 additions and 71 deletions.
2 changes: 1 addition & 1 deletion cmd/kubeapps-apis/core/packages/v1alpha1/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (s packagesServer) GetInstalledPackageDetail(ctx context.Context, request *

// GetAvailablePackageVersions returns the package versions based on the request.
func (s packagesServer) GetAvailablePackageVersions(ctx context.Context, request *packages.GetAvailablePackageVersionsRequest) (*packages.GetAvailablePackageVersionsResponse, error) {
log.InfoS("+core GetAvailablePackageVersions %s", "cluster", request.GetAvailablePackageRef().GetContext().GetCluster(), "namespace", request.GetAvailablePackageRef().GetContext().GetNamespace())
log.InfoS("+core GetAvailablePackageVersions", "cluster", request.GetAvailablePackageRef().GetContext().GetCluster(), "namespace", request.GetAvailablePackageRef().GetContext().GetNamespace())

if request.GetAvailablePackageRef().GetPlugin() == nil {
return nil, status.Errorf(codes.InvalidArgument, "Unable to retrieve the plugin (missing AvailablePackageRef.Plugin)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type RateLimitingInterface interface {
Name() string
ExpectAdd(item string)
IsProcessing(item string) bool
AddIfNotProcessing(item string)
WaitUntilForgotten(item string)
Reset()
}
Expand Down Expand Up @@ -79,6 +80,10 @@ func (q *rateLimitingType) IsProcessing(item string) bool {
return q.queue.isProcessing(item)
}

// AddIfNotProcessing forwards the item to the underlying queue's
// addIfNotProcessing.
func (q *rateLimitingType) AddIfNotProcessing(item string) {
q.queue.addIfNotProcessing(item)
}

func (q *rateLimitingType) Reset() {
log.Infof("+Reset(), [%s], delayingInterface queue size: [%d]",
q.Name(), q.DelayingInterface.Len())
Expand Down Expand Up @@ -138,6 +143,9 @@ type Type struct {
queue []string

// expected defines all of the items that are expected to be processed.
// the whole reason behind having it in the queue is to avoid a race condition
// in unit tests, where an item is added to the queue and then the test code
// needs to wait until it's been processed before taking further action
// Used in unit tests only
expected sets.String

Expand Down Expand Up @@ -317,6 +325,30 @@ func (q *Type) isProcessing(item string) bool {
return q.processing.Has(item)
}

// addIfNotProcessing enqueues itemstr unless the queue is shutting down or
// the item is currently being processed. The processing check and the
// enqueue both happen while holding q.cond.L, so the two steps are atomic
// with respect to other operations that take the same lock.
//
// When the item is not being processed it is always removed from the
// expected set; it is then appended to the queue (and waiters are woken up)
// only if it is not already marked dirty.
func (q *Type) addIfNotProcessing(itemstr string) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	// nothing to do: either no new work is accepted or the item is in flight
	if q.shuttingDown || q.processing.Has(itemstr) {
		return
	}

	q.expected.Delete(itemstr)

	// already queued and waiting for a worker, do not queue it twice
	if q.dirty.Has(itemstr) {
		return
	}

	q.dirty.Insert(itemstr)
	q.queue = append(q.queue, itemstr)
	if q.verbose {
		log.Infof("[%s]: addIfNotProcessing(%s)%s", q.name, itemstr, q.prettyPrintAll())
	}
	q.cond.Broadcast()
}

// this func is the added feature that was missing in k8s workqueue
func (q *Type) waitUntilDone(item string) {
if q.verbose {
Expand Down Expand Up @@ -367,11 +399,11 @@ func printOneItemPerLine(strs []string) string {
return "[]"
} else {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("[%d] [\n", len(strs)))
sb.WriteString(fmt.Sprintf("[%d] {\n", len(strs)))
for _, s := range strs {
sb.WriteString("\t\t" + s + "\n")
}
sb.WriteString("\t]")
sb.WriteString("\t}")
return sb.String()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ func (c *NamespacedResourceWatcherCache) computeValuesForKeys(keys sets.String)
// closed (and there are no more items)
for key := range requestChan {
// see Get() for explanation of what is happening below
c.forceKey(key)
c.forceKey(key, false)
}
wg.Done()
}()
Expand Down Expand Up @@ -888,7 +888,7 @@ func (c *NamespacedResourceWatcherCache) computeAndFetchValuesForKeys(keys sets.
// closed (and there are no more items)
for job := range requestChan {
// see Get() for explanation of what is happening below
value, err := c.ForceAndFetch(job.key)
value, err := c.ForceAndFetch(job.key, false)
responseChan <- computeValueJobResult{job, value, err}
}
wg.Done()
Expand Down Expand Up @@ -970,13 +970,29 @@ func (c *NamespacedResourceWatcherCache) Get(key string) (interface{}, error) {
return nil, err
} else if value == nil {
// cache miss
return c.ForceAndFetch(key)
return c.ForceAndFetch(key, false)
}
return value, nil
}

func (c *NamespacedResourceWatcherCache) forceKey(key string) {
c.queue.Add(key)
// force a particular key to be processed
func (c *NamespacedResourceWatcherCache) forceKey(key string, skipIfProcessing bool) {
if skipIfProcessing {
// There is one use case when the client needs to be able to do an Add(), regardless of whether
// the item is being processed, e.g. when the corresponding value goes through several quick changes.
// Then there is a separate use case when the client doesn't want to do an Add
// if the item is currently being processed, such as a .Get() operation that leads to a lengthy
// .Add() concurrently with another .Get() immediately. Executing two .Add() operations does
// not solve any problems, it just slows the whole thing down.
// This is what the UX is currently doing when you select an OCI package to deploy:
// both GetAvailablePackageVersions() and GetAvailablePackageDetail()
// are called concurrently. This is really a performance optimization to make sure that .Add() is only
// executed once
c.queue.AddIfNotProcessing(key)
} else {
c.queue.Add(key)
}

// now need to wait until this item has been processed by runWorker().
// a little bit inefficient: syncHandler() will eventually call config.onAdd()
// which encode the data as []byte before storing it in the cache. That part is fine.
Expand All @@ -986,8 +1002,8 @@ func (c *NamespacedResourceWatcherCache) forceKey(key string) {
c.queue.WaitUntilForgotten(key)
}

func (c *NamespacedResourceWatcherCache) ForceAndFetch(key string) (interface{}, error) {
c.forceKey(key)
func (c *NamespacedResourceWatcherCache) ForceAndFetch(key string, skipIfProcessing bool) (interface{}, error) {
c.forceKey(key, skipIfProcessing)
// TODO (gfichtenholt): if there was an error while processing the cache entry, such as
// E0903 09:07:17.660753 1 watcher_cache.go:595] Invocation of [onAdd] for object {
// ...
Expand All @@ -1004,7 +1020,13 @@ func (c *NamespacedResourceWatcherCache) ForceAndFetch(key string) (interface{},
return c.fetch(key)
}

// this func is used by unit tests only
// this func is used by unit tests only to make sure entries were processed by
// cache as expected and without race conditions. The general pattern for this is
// in the unit test code we do
// - cache.ExpectAdd(key)
// - perform some k8s flux HelmRepository CRD operation...
// - cache.WaitUntilForgotten(key)
// - at this point we can guarantee the cache entry has been (asynchronously) processed...
func (c *NamespacedResourceWatcherCache) ExpectAdd(key string) {
c.queue.ExpectAdd(key)
}
Expand Down
4 changes: 1 addition & 3 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *Server) availableChartDetail(ctx context.Context, packageRef *corev1.Av

var fn cache.DownloadChartFn
if chartModel.Repo.Type == "oci" {
if ociRepo, err := s.newOCIChartRepositoryAndLogin(ctx, repoName); err != nil {
if ociRepo, err := s.newOCIChartRepositoryAndLogin(ctx, *repo); err != nil {
return nil, err
} else {
fn = downloadOCIChartFn(ociRepo)
Expand All @@ -116,13 +116,11 @@ func (s *Server) availableChartDetail(ctx context.Context, packageRef *corev1.Av
if err != nil {
return nil, err
}
log.Infof("checkpoint 5")

pkgDetail, err := availablePackageDetailFromChartDetail(chartID, chartDetail)
if err != nil {
return nil, err
}
log.Infof("checkpoint 6")

// fix up a couple of fields that don't come from the chart tarball
repoUrl := repo.Spec.URL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,10 +1115,7 @@ func TestGetOciAvailablePackageDetail(t *testing.T) {
t.Fatalf("%+v", err)
}

namespacedRepoName := types.NamespacedName{
Name: repoName,
Namespace: repoNamespace}
ociChartRepo, err := s.newOCIChartRepositoryAndLogin(context.Background(), namespacedRepoName)
ociChartRepo, err := s.newOCIChartRepositoryAndLogin(context.Background(), *repo)
if err != nil {
t.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"strings"
"time"

log "k8s.io/klog/v2"

Expand Down Expand Up @@ -36,6 +37,10 @@ func NewDockerRegistryApiV2RepositoryLister() OCIChartRepositoryLister {
type dockerRegistryApiV2RepositoryLister struct {
}

// Name returns the fixed identifier of this lister: "docker".
func (l *dockerRegistryApiV2RepositoryLister) Name() string {
return "docker"
}

// ref https://github.com/distribution/distribution/blob/main/docs/spec/api.md#listing-repositories
// also https://github.com/oras-project/oras-go/blob/14422086e418/registry/remote/registry.go
func (l *dockerRegistryApiV2RepositoryLister) IsApplicableFor(ociRepo *OCIChartRepository) (bool, error) {
Expand All @@ -46,12 +51,14 @@ func (l *dockerRegistryApiV2RepositoryLister) IsApplicableFor(ociRepo *OCIChartR
return false, err
} else {
ping := "OK"
startTime := time.Now()
err = orasRegistry.Ping(context.Background())
if err != nil {
ping = fmt.Sprintf("%v", err)
}
log.Infof("ORAS v2 Registry [%s PlainHTTP=%t] PING: %s",
ociRepo.url.String(), orasRegistry.PlainHTTP, ping)
duration := time.Since(startTime)
log.Infof("ORAS v2 Registry [%s PlainHTTP=%t] PING: %s, elapsed time: [%d] ms",
ociRepo.url.String(), orasRegistry.PlainHTTP, ping, duration.Milliseconds())
if err != nil {
return false, err
}
Expand All @@ -62,7 +69,11 @@ func (l *dockerRegistryApiV2RepositoryLister) IsApplicableFor(ociRepo *OCIChartR
// GET "https://demo.goharbor.io/v2/_catalog?last=stefanprodan-podinfo-clone":
// unexpected status code 401: unauthorized: unauthorized to list catalog:
// unauthorized to list catalog
startTime = time.Now()
err = orasRegistry.Repositories(context.Background(), "", fn)
duration = time.Since(startTime)
log.Infof("ORAS v2 Registry [%s .Repositories()] elapsed time: [%d] ms",
ociRepo.url.String(), duration.Milliseconds())
if err == done {
// everything looks kosher
return true, nil
Expand All @@ -75,7 +86,9 @@ func (l *dockerRegistryApiV2RepositoryLister) IsApplicableFor(ociRepo *OCIChartR

// given an OCIChartRepository instance, returns a list of repository names, e.g.
// given an OCIChartRepository instance with url "oci://ghcr.io/stefanprodan/charts"
// may return ["stefanprodan/charts/podinfo", "stefanprodan/charts/podinfo-2"]
//
// may return ["stefanprodan/charts/podinfo", "stefanprodan/charts/podinfo-2"]
//
// ref: https://github.com/distribution/distribution/blob/main/docs/spec/api.md#listing-repositories
func (l *dockerRegistryApiV2RepositoryLister) ListRepositoryNames(ociRepo *OCIChartRepository) ([]string, error) {
log.Infof("+ListRepositoryNames(%s)", ociRepo.url.String())
Expand All @@ -95,19 +108,22 @@ func (l *dockerRegistryApiV2RepositoryLister) ListRepositoryNames(ociRepo *OCICh

fn := func(repos []string) error {
log.Infof("orasRegistry.Repositories fn: %s", repos)
lastRepoMatch := false
curRepoMatch := false
for _, r := range repos {
// Examples:
// GitHub and Harbor: stefanprodan-podinfo-clone/podinfo
// GCP Artifact Repository: vmware-kubeapps-ci/stefanprodan-podinfo-clone/podinfo
lastRepoMatch =
prevRepoMatch := curRepoMatch
if curRepoMatch =
strings.HasPrefix(r, startAt+"/") ||
strings.Contains(r, "/"+startAt+"/")
if lastRepoMatch {
strings.Contains(r, "/"+startAt+"/"); curRepoMatch {
repositoryList = append(repositoryList, r)
} else if prevRepoMatch {
// this means there was a match and now there isn't. early exit
break
}
}
if !lastRepoMatch {
if !curRepoMatch {
return done
} else {
return nil
Expand Down Expand Up @@ -138,12 +154,8 @@ func newRemoteOrasRegistry(ociRepo *OCIChartRepository) (*orasregistryremotev2.R
return nil, err
}
orasRegistry.Client = &orasregistryauthv2.Client{
Header: orasregistryauthv2.DefaultClient.Header.Clone(),
// not using the cache for now to avoid things like
// https://github.com/vmware-tanzu/kubeapps/issues/5219#issuecomment-1233738309
// also orasRegistry today is a short lived object, so caching tokens is a waste
// per ORAS code: If nil, no cache is used
Cache: nil,
Header: orasregistryauthv2.DefaultClient.Header.Clone(),
Cache: ociRepo.orasCache,
Credential: ociRepo.registryCredentialFn,
}
return orasRegistry, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,30 @@ var (
},
}

// redis_summary_failed_2 is the installed package summary for the "my-redis"
// release at version 14.4.0 whose status is not ready, with reason
// STATUS_REASON_FAILED and a user reason that reports GetLastReleaseFailed.
redis_summary_failed_2 = &corev1.InstalledPackageSummary{
InstalledPackageRef: my_redis_ref,
Name: "my-redis",
IconUrl: "https://bitnami.com/assets/stacks/redis/img/redis-stack-220x234.png",
PkgVersionReference: &corev1.VersionReference{
Version: "14.4.0",
},
CurrentVersion: &corev1.PackageAppVersion{
PkgVersion: "14.4.0",
AppVersion: "6.2.4",
},
PkgDisplayName: "redis",
ShortDescription: "Open source, advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets.",
Status: &corev1.InstalledPackageStatus{
Ready: false,
// failed rather than pending, even though the release never got installed
Reason: corev1.InstalledPackageStatus_STATUS_REASON_FAILED,
UserReason: "GetLastReleaseFailed: failed to get last release revision",
},
LatestVersion: &corev1.PackageAppVersion{
PkgVersion: "14.4.0",
AppVersion: "6.2.4",
},
}

redis_summary_pending = &corev1.InstalledPackageSummary{
InstalledPackageRef: my_redis_ref,
Name: "my-redis",
Expand Down Expand Up @@ -1662,6 +1686,33 @@ var (
},
}

// redis_existing_spec_failed_2 describes the "my-redis" release in namespace
// "test" (redis chart 14.4.0 from repo "default/bitnami-1") whose
// HelmRelease status carries a single Ready condition that is False with
// reason GetLastReleaseFailedReason.
redis_existing_spec_failed_2 = testSpecGetInstalledPackages{
repoName: "bitnami-1",
repoNamespace: "default",
repoIndex: testYaml("redis-many-versions.yaml"),
chartName: "redis",
chartTarGz: testTgz("redis-14.4.0.tgz"),
chartSpecVersion: "14.4.0",
chartArtifactVersion: "14.4.0",
releaseName: "my-redis",
releaseNamespace: "test",
releaseStatus: helmv2.HelmReleaseStatus{
Conditions: []metav1.Condition{
{
LastTransitionTime: metav1.Time{Time: lastTransitionTime},
Type: fluxmeta.ReadyCondition,
Status: metav1.ConditionFalse,
Reason: helmv2.GetLastReleaseFailedReason,
Message: "failed to get last release revision",
},
},
HelmChart: "default/redis",
Failures: 14,
InstallFailures: 1,
LastAttemptedRevision: "14.4.0",
},
}

redis_existing_stub_failed = helmReleaseStub{
name: "my-redis",
namespace: "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func NewHarborRegistryApiV2RepositoryLister() OCIChartRepositoryLister {
type harborRegistryApiV2RepositoryLister struct {
}

// Name returns the fixed identifier of this lister: "harbor".
func (l *harborRegistryApiV2RepositoryLister) Name() string {
return "harbor"
}

func (l *harborRegistryApiV2RepositoryLister) IsApplicableFor(ociRepo *OCIChartRepository) (bool, error) {
log.Infof("+IsApplicableFor(%s)", ociRepo.url.String())
ref := strings.TrimPrefix(ociRepo.url.String(), fmt.Sprintf("%s://", registry.OCIScheme))
Expand Down
Loading

0 comments on commit c4466ce

Please sign in to comment.