Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ORAS v2 for listing repositories from ghcr.io #4947

Merged
merged 11 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions chart/kubeapps/templates/kubeappsapis/rbac_fluxv2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ rules:
- apiGroups: ["apiextensions.k8s.io"]
resources: ["customresourcedefinitions"]
verbs: ["get", "list"]
# Temp hack to avoid
# Failed to read secret for repo due to: rpc error: code = PermissionDenied desc = Forbidden
# to get the secret 'helm-podinfo' due to 'secrets "helm-podinfo" is forbidden:
# User "system:serviceaccount:kubeapps:kubeapps-internal-kubeappsapis" cannot get resource
# "secrets" in API group "" in the namespace "default"'
# see discussion in https://github.com/vmware-tanzu/kubeapps/pull/4932#issuecomment-1161243049
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
---
apiVersion: {{ include "common.capabilities.rbac.apiVersion" . }}
kind: ClusterRoleBinding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"encoding/gob"
"fmt"
"io/ioutil"
"net/url"
"os"
"reflect"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/common"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/pkg/pkgutils"
"github.com/vmware-tanzu/kubeapps/pkg/chart/models"
httpclient "github.com/vmware-tanzu/kubeapps/pkg/http-client"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -73,18 +71,20 @@ type ChartCache struct {
resyncCh chan int
}

type DownloadChartFn func(chartID, chartUrl, chartVersion string) ([]byte, error)

// chartCacheStoreEntry is what we'll be storing in the processing store
// note that url and delete fields are mutually exclusive, you must either:
// - set url to non-empty string or
// - deleted flag to true
// setting both for a given entry does not make sense
type chartCacheStoreEntry struct {
namespace string
id string
version string
url string
clientOptions *common.HttpClientOptions
deleted bool
namespace string
id string
version string
url string
downloadFn DownloadChartFn
deleted bool
}

func NewChartCache(name string, redisCli *redis.Client, stopCh <-chan struct{}) (*ChartCache, error) {
Expand Down Expand Up @@ -118,7 +118,7 @@ func NewChartCache(name string, redisCli *redis.Client, stopCh <-chan struct{})

// this func will enqueue work items into chart work queue and return.
// the charts will be synced worker threads running in the background
func (c *ChartCache) SyncCharts(charts []models.Chart, clientOptions *common.HttpClientOptions) error {
func (c *ChartCache) SyncCharts(charts []models.Chart, downloadFn DownloadChartFn) error {
log.Infof("+SyncCharts()")
totalToSync := 0
defer func() {
Expand Down Expand Up @@ -163,12 +163,12 @@ func (c *ChartCache) SyncCharts(charts []models.Chart, clientOptions *common.Htt
}

entry := chartCacheStoreEntry{
namespace: chart.Repo.Namespace,
id: chart.ID,
version: chart.ChartVersions[0].Version,
url: u.String(),
clientOptions: clientOptions,
deleted: false,
namespace: chart.Repo.Namespace,
id: chart.ID,
version: chart.ChartVersions[0].Version,
url: u.String(),
downloadFn: downloadFn,
deleted: false,
}
if key, err := chartCacheKeyFunc(entry); err != nil {
log.Errorf("Failed to get key for chart due to %+v", err)
Expand Down Expand Up @@ -393,7 +393,7 @@ func (c *ChartCache) syncHandler(workerName, key string) error {
return nil
}
}
byteArray, err := ChartCacheComputeValue(chart.id, chart.url, chart.version, chart.clientOptions)
byteArray, err := ChartCacheComputeValue(chart.id, chart.url, chart.version, chart.downloadFn)
if err != nil {
return err
}
Expand Down Expand Up @@ -454,7 +454,7 @@ func (c *ChartCache) FetchForOne(key string) ([]byte, error) {
• otherwise return the bytes stored in the
chart cache for the given entry
*/
func (c *ChartCache) GetForOne(key string, chart *models.Chart, clientOptions *common.HttpClientOptions) ([]byte, error) {
func (c *ChartCache) GetForOne(key string, chart *models.Chart, downloadFn DownloadChartFn) ([]byte, error) {
// TODO (gfichtenholt) it'd be nice to get rid of all arguments except for the key, similar to that of
// NamespacedResourceWatcherCache.GetForOne()
log.Infof("+GetForOne(%s)", key)
Expand All @@ -478,11 +478,11 @@ func (c *ChartCache) GetForOne(key string, chart *models.Chart, clientOptions *c
log.Warningf("chart: [%s], version: [%s] has no URLs", chart.ID, v.Version)
} else {
entry = &chartCacheStoreEntry{
namespace: namespace,
id: chartID,
version: v.Version,
url: v.URLs[0],
clientOptions: clientOptions,
namespace: namespace,
id: chartID,
version: v.Version,
url: v.URLs[0],
downloadFn: downloadFn,
}
}
break
Expand Down Expand Up @@ -599,21 +599,8 @@ func chartCacheKeyFor(namespace, chartID, chartVersion string) (string, error) {
}

// FYI: The work queue is able to retry transient HTTP errors
func ChartCacheComputeValue(chartID, chartUrl, chartVersion string, clientOptions *common.HttpClientOptions) ([]byte, error) {
client, headers, err := common.NewHttpClientAndHeaders(clientOptions)
if err != nil {
return nil, err
}

reader, _, err := httpclient.GetStream(chartUrl, client, headers)
if reader != nil {
defer reader.Close()
}
if err != nil {
return nil, err
}

chartTgz, err := ioutil.ReadAll(reader)
func ChartCacheComputeValue(chartID, chartUrl, chartVersion string, downloadFn DownloadChartFn) ([]byte, error) {
chartTgz, err := downloadFn(chartID, chartUrl, chartVersion)
if err != nil {
return nil, err
}
Expand Down
57 changes: 51 additions & 6 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"reflect"
"strings"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
corev1 "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/packages/v1alpha1"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/cache"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/common"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/pkg/pkgutils"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/pkg/statuserror"
"github.com/vmware-tanzu/kubeapps/pkg/chart/models"
httpclient "github.com/vmware-tanzu/kubeapps/pkg/http-client"
"github.com/vmware-tanzu/kubeapps/pkg/tarutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -37,6 +40,7 @@ func (s *Server) getChartInCluster(ctx context.Context, key types.NamespacedName
return &chartObj, nil
}

// TODO (gfichtenholt) this func is too long. Break it up
func (s *Server) availableChartDetail(ctx context.Context, packageRef *corev1.AvailablePackageReference, chartVersion string) (*corev1.AvailablePackageDetail, error) {
log.Infof("+availableChartDetail(%s, %s)", packageRef, chartVersion)

Expand Down Expand Up @@ -81,13 +85,30 @@ func (s *Server) availableChartDetail(ctx context.Context, packageRef *corev1.Av
chartVersion = chartModel.ChartVersions[0].Version
}

if key, err := s.chartCache.KeyFor(repoName.Namespace, chartID, chartVersion); err != nil {
return nil, err
} else if opts, err := s.clientOptionsForRepo(ctx, repoName); err != nil {
var key string
if key, err = s.chartCache.KeyFor(repoName.Namespace, chartID, chartVersion); err != nil {
return nil, err
} else if byteArray, err = s.chartCache.GetForOne(key, chartModel, opts); err != nil {
}

var fn cache.DownloadChartFn
if chartModel.Repo.Type == "oci" {
if ociRegistry, err := s.newOCIRegistryAndLoginWithRepo(ctx, repoName); err != nil {
return nil, err
} else {
fn = downloadOCIChartFn(ociRegistry)
}
} else {
if opts, err := s.httpClientOptionsForRepo(ctx, repoName); err != nil {
return nil, err
} else {
fn = downloadChartViaHttpFn(opts)
}
}
if byteArray, err = s.chartCache.GetForOne(key, chartModel, fn); err != nil {
return nil, err
} else if byteArray == nil {
}

if byteArray == nil {
return nil, status.Errorf(codes.Internal, "failed to load details for chart [%s]", chartModel.ID)
}
}
Expand Down Expand Up @@ -238,7 +259,7 @@ func availablePackageDetailFromChartDetail(chartID string, chartDetail map[strin
// TODO (gfichtenholt): if there is no chart yaml (is that even possible?),
// fall back to chart info from repo index.yaml
if !ok || chartYaml == "" {
return nil, status.Errorf(codes.Internal, "No chart manifest found for chart [%s]", chartID)
return nil, status.Errorf(codes.Internal, "No chart manifest found for chart: [%s]", chartID)
}
var chartMetadata chart.Metadata
err := yaml.Unmarshal([]byte(chartYaml), &chartMetadata)
Expand Down Expand Up @@ -286,3 +307,27 @@ func availablePackageDetailFromChartDetail(chartID string, chartDetail map[strin
// is not included in the tarball
return pkg, nil
}

func downloadChartViaHttpFn(options *common.HttpClientOptions) func(chartID, chartUrl, chartVersion string) ([]byte, error) {
return func(chartID, chartUrl, chartVersion string) ([]byte, error) {
client, headers, err := common.NewHttpClientAndHeaders(options)
if err != nil {
return nil, err
}

reader, _, err := httpclient.GetStream(chartUrl, client, headers)
if reader != nil {
defer reader.Close()
}
if err != nil {
return nil, err
}

chartTgz, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}

return chartTgz, nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"time"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/packages/v1alpha1"
plugins "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/plugins/v1alpha1"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/common"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -514,7 +517,7 @@ func TestKindClusterRepoAndChartRBAC(t *testing.T) {
}
}

func TestKindClusterGetAvailablePackageSummariesForOCI(t *testing.T) {
func TestKindClusterAvailablePackageEndpointsForOCI(t *testing.T) {
fluxPluginClient, fluxPluginReposClient, err := checkEnv(t)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -570,7 +573,7 @@ func TestKindClusterGetAvailablePackageSummariesForOCI(t *testing.T) {
t.Fatal(err)
}

grpcContext, cancel = context.WithTimeout(grpcContext, 90*time.Second)
grpcContext, cancel = context.WithTimeout(grpcContext, defaultContextTimeout)
defer cancel()
resp, err := fluxPluginClient.GetAvailablePackageSummaries(
grpcContext,
Expand All @@ -579,5 +582,80 @@ func TestKindClusterGetAvailablePackageSummariesForOCI(t *testing.T) {
t.Fatalf("%v", err)
}

t.Logf("=======> %s", common.PrettyPrint(resp))
opt1 := cmpopts.IgnoreUnexported(
corev1.GetAvailablePackageSummariesResponse{},
corev1.AvailablePackageSummary{},
corev1.AvailablePackageReference{},
corev1.Context{},
plugins.Plugin{},
corev1.PackageAppVersion{})
opt2 := cmpopts.SortSlices(lessAvailablePackageFunc)
if got, want := resp, expected_oci_stefanprodan_podinfo_available_summaries(repoName.Name); !cmp.Equal(got, want, opt1, opt2) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opt1, opt2))
}

grpcContext, cancel = context.WithTimeout(grpcContext, defaultContextTimeout)
defer cancel()
resp2, err := fluxPluginClient.GetAvailablePackageVersions(
grpcContext, &corev1.GetAvailablePackageVersionsRequest{
AvailablePackageRef: &corev1.AvailablePackageReference{
Context: &corev1.Context{
Namespace: "default",
},
Identifier: repoName.Name + "/podinfo",
},
})
if err != nil {
t.Fatal(err)
}
opts := cmpopts.IgnoreUnexported(
corev1.GetAvailablePackageVersionsResponse{},
corev1.PackageAppVersion{})
if got, want := resp2, expected_versions_stefanprodan_podinfo; !cmp.Equal(want, got, opts) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opts))
}

grpcContext, cancel = context.WithTimeout(grpcContext, defaultContextTimeout)
defer cancel()
resp3, err := fluxPluginClient.GetAvailablePackageDetail(
grpcContext,
&corev1.GetAvailablePackageDetailRequest{
AvailablePackageRef: &corev1.AvailablePackageReference{
Context: &corev1.Context{
Namespace: "default",
},
Identifier: repoName.Name + "/podinfo",
},
})
if err != nil {
t.Fatal(err)
}

compareActualVsExpectedAvailablePackageDetail(
t,
resp3.AvailablePackageDetail,
expected_detail_oci_stefanprodan_podinfo(repoName.Name).AvailablePackageDetail)

// try a few older versions
grpcContext, cancel = context.WithTimeout(grpcContext, defaultContextTimeout)
defer cancel()
resp4, err := fluxPluginClient.GetAvailablePackageDetail(
grpcContext,
&corev1.GetAvailablePackageDetailRequest{
AvailablePackageRef: &corev1.AvailablePackageReference{
Context: &corev1.Context{
Namespace: "default",
},
Identifier: repoName.Name + "/podinfo",
},
PkgVersion: "6.1.5",
})
if err != nil {
t.Fatal(err)
}

compareActualVsExpectedAvailablePackageDetail(
t,
resp4.AvailablePackageDetail,
expected_detail_oci_stefanprodan_podinfo_2(repoName.Name).AvailablePackageDetail)
}
Loading