Skip to content

Commit

Permalink
feat(kuma-cp): multitenancy adjustments
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz committed May 8, 2023
1 parent f2b74f4 commit 091f04f
Show file tree
Hide file tree
Showing 28 changed files with 150 additions and 141 deletions.
2 changes: 2 additions & 0 deletions docs/generated/kuma-cp.md
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ runtime:
defaults:
# If true, it skips creating the default Mesh
skipMeshCreation: false # ENV: KUMA_DEFAULTS_SKIP_MESH_CREATION
# If true, it skips creating the default tenant resources
skipTenantResources: false # ENV: KUMA_DEFAULTS_SKIP_TENANT_RESOURCES

# Metrics configuration
metrics:
Expand Down
2 changes: 2 additions & 0 deletions pkg/clusterid/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/core/user"
"github.com/kumahq/kuma/pkg/multitenant"
)

type clusterIDCreator struct {
Expand All @@ -27,6 +28,7 @@ func (c *clusterIDCreator) NeedLeaderElection() bool {

func (c *clusterIDCreator) create() error {
ctx := user.Ctx(context.Background(), user.ControlPlane)
ctx = multitenant.WithTenant(ctx, multitenant.GlobalTenantID)
resource := config_model.NewConfigResource()
err := c.configManager.Get(ctx, resource, store.GetByKey(config_manager.ClusterIdConfigKey, core_model.NoMesh))
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/clusterid/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/store"
core_runtime "github.com/kumahq/kuma/pkg/core/runtime"
"github.com/kumahq/kuma/pkg/core/user"
"github.com/kumahq/kuma/pkg/multitenant"
)

var log = core.Log.WithName("clusterID")
Expand All @@ -25,6 +26,7 @@ type clusterIDReader struct {

func (c *clusterIDReader) Start(stop <-chan struct{}) error {
ctx := user.Ctx(context.Background(), user.ControlPlane)
ctx = multitenant.WithTenant(ctx, multitenant.GlobalTenantID)
ticker := time.NewTicker(1 * time.Second)
for {
select {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var _ config.Config = &Defaults{}
type Defaults struct {
// If true, it skips creating the default Mesh
SkipMeshCreation bool `json:"skipMeshCreation" envconfig:"kuma_defaults_skip_mesh_creation"`
// If true, it skips creating the default tenant resources
SkipTenantResources bool `json:"skipTenantResources" envconfig:"kuma_defaults_skip_tenant_resources"`
}

func (d *Defaults) Sanitize() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ runtime:
defaults:
# If true, it skips creating the default Mesh
skipMeshCreation: false # ENV: KUMA_DEFAULTS_SKIP_MESH_CREATION
# If true, it skips creating the default tenant resources
skipTenantResources: false # ENV: KUMA_DEFAULTS_SKIP_TENANT_RESOURCES

# Metrics configuration
metrics:
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Multizone.Zone.KDS.NackBackoff.Duration).To(Equal(21 * time.Second))

Expect(cfg.Defaults.SkipMeshCreation).To(BeTrue())
Expect(cfg.Defaults.SkipTenantResources).To(BeTrue())

Expect(cfg.Diagnostics.ServerPort).To(Equal(uint32(5003)))
Expect(cfg.Diagnostics.DebugEndpoints).To(BeTrue())
Expand Down Expand Up @@ -538,6 +539,7 @@ dnsServer:
serviceVipPort: 9090
defaults:
skipMeshCreation: true
skipTenantResources: true
diagnostics:
serverPort: 5003
debugEndpoints: true
Expand Down Expand Up @@ -796,6 +798,7 @@ proxy:
"KUMA_EXPERIMENTAL_KDS_DELTA_ENABLED": "true",
"KUMA_MULTIZONE_GLOBAL_KDS_ZONE_INSIGHT_FLUSH_INTERVAL": "5s",
"KUMA_DEFAULTS_SKIP_MESH_CREATION": "true",
"KUMA_DEFAULTS_SKIP_TENANT_RESOURCES": "true",
"KUMA_DIAGNOSTICS_SERVER_PORT": "5003",
"KUMA_DIAGNOSTICS_DEBUG_ENDPOINTS": "true",
"KUMA_DIAGNOSTICS_TLS_ENABLED": "true",
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func buildRuntime(appCtx context.Context, cfg kuma_cp.Config) (core_runtime.Runt
if err != nil {
return nil, err
}
builder.WithMultitenancy(multitenant.SingleTenant, multitenant.NoopHashingFn)
builder.WithMultitenancy(multitenant.SingleTenant)
builder.WithPgxConfigCustomizationFn(config.NoopPgxConfigCustomizationFn)
if err := initializeMetrics(builder); err != nil {
return nil, err
Expand Down Expand Up @@ -458,7 +458,7 @@ func initializeResourceManager(cfg kuma_cp.Config, builder *core_runtime.Builder
customizableManager,
builder.Config().Store.Cache.ExpirationTime.Duration,
builder.Metrics(),
builder.HashingFn().ResourceHashKey,
builder.Tenants(),
)
if err != nil {
return err
Expand Down
37 changes: 18 additions & 19 deletions pkg/core/resources/manager/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/metrics"
"github.com/kumahq/kuma/pkg/multitenant"
)

// Cached version of the ReadOnlyResourceManager designed to be used only for use cases of eventual consistency.
Expand All @@ -24,14 +25,14 @@ type cachedManager struct {
cache *cache.Cache
metrics *prometheus.CounterVec

mutexes map[string]*sync.Mutex
mapMutex sync.Mutex // guards "mutexes" field
hashKeyFunc func(ctx context.Context) string
mutexes map[string]*sync.Mutex
mapMutex sync.Mutex // guards "mutexes" field
tenants multitenant.Tenants
}

var _ ReadOnlyResourceManager = &cachedManager{}

func NewCachedManager(delegate ReadOnlyResourceManager, expirationTime time.Duration, metrics metrics.Metrics, hashKeyFunc func(ctx context.Context) string) (ReadOnlyResourceManager, error) {
func NewCachedManager(delegate ReadOnlyResourceManager, expirationTime time.Duration, metrics metrics.Metrics, tenants multitenant.Tenants) (ReadOnlyResourceManager, error) {
metric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "store_cache",
Help: "Summary of Store Cache",
Expand All @@ -40,22 +41,21 @@ func NewCachedManager(delegate ReadOnlyResourceManager, expirationTime time.Dura
return nil, err
}
return &cachedManager{
delegate: delegate,
cache: cache.New(expirationTime, time.Duration(int64(float64(expirationTime)*0.9))),
metrics: metric,
mutexes: map[string]*sync.Mutex{},
hashKeyFunc: hashKeyFunc,
delegate: delegate,
cache: cache.New(expirationTime, time.Duration(int64(float64(expirationTime)*0.9))),
metrics: metric,
mutexes: map[string]*sync.Mutex{},
tenants: tenants,
}, nil
}

func (c *cachedManager) Get(ctx context.Context, res model.Resource, fs ...store.GetOptionsFunc) error {
if c.hashKeyFunc != nil {
fs = append(fs, func(options *store.GetOptions) {
options.KeyFromContext = c.hashKeyFunc
})
tenantID, err := c.tenants.GetID(ctx)
if err != nil {
return err
}
opts := store.NewGetOptions(fs...)
cacheKey := fmt.Sprintf("GET:%s:%s", res.Descriptor().Name, opts.HashCode(ctx))
cacheKey := fmt.Sprintf("GET:%s:%s:%s", res.Descriptor().Name, opts.HashCode(), tenantID)
obj, found := c.cache.Get(cacheKey)
if !found {
// There might be a situation when cache just expired and there are many concurrent goroutines here.
Expand Down Expand Up @@ -91,16 +91,15 @@ func (c *cachedManager) Get(ctx context.Context, res model.Resource, fs ...store
}

func (c *cachedManager) List(ctx context.Context, list model.ResourceList, fs ...store.ListOptionsFunc) error {
if c.hashKeyFunc != nil {
fs = append(fs, func(options *store.ListOptions) {
options.KeyFromContext = c.hashKeyFunc
})
tenantID, err := c.tenants.GetID(ctx)
if err != nil {
return err
}
opts := store.NewListOptions(fs...)
if !opts.IsCacheable() {
return fmt.Errorf("filter functions are not allowed for cached store")
}
cacheKey := fmt.Sprintf("LIST:%s:%s", list.GetItemType(), opts.HashCode(ctx))
cacheKey := fmt.Sprintf("LIST:%s:%s:%s", list.GetItemType(), opts.HashCode(), tenantID)
obj, found := c.cache.Get(cacheKey)
if !found {
// There might be a situation when cache just expired and there are many concurrent goroutines here.
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/resources/manager/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var _ = Describe("Cached Resource Manager", func() {
m, err := core_metrics.NewMetrics("Standalone")
metrics = m
Expect(err).ToNot(HaveOccurred())
cachedManager, err = core_manager.NewCachedManager(countingManager, expiration, metrics, multitenant.NoopHashingFn.ResourceHashKey)
cachedManager, err = core_manager.NewCachedManager(countingManager, expiration, metrics, multitenant.SingleTenant)
Expect(err).ToNot(HaveOccurred())

// and created resources
Expand Down
30 changes: 13 additions & 17 deletions pkg/core/resources/store/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package store

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -115,10 +114,9 @@ func NewDeleteAllOptions(fs ...DeleteAllOptionsFunc) *DeleteAllOptions {
}

type GetOptions struct {
Name string
Mesh string
Version string
KeyFromContext HashKeyFunc
Name string
Mesh string
Version string
}

type GetOptionsFunc func(*GetOptions)
Expand Down Expand Up @@ -148,23 +146,21 @@ func GetByVersion(version string) GetOptionsFunc {
}
}

func (g *GetOptions) HashCode(ctx context.Context) string {
return fmt.Sprintf("%s:%s:%s", g.Name, g.Mesh, g.KeyFromContext(ctx))
func (g *GetOptions) HashCode() string {
return fmt.Sprintf("%s:%s", g.Name, g.Mesh)
}

type (
ListFilterFunc func(rs core_model.Resource) bool
HashKeyFunc func(context.Context) string
)

type ListOptions struct {
Mesh string
PageSize int
PageOffset string
FilterFunc ListFilterFunc
NameContains string
Ordered bool
KeyFromContext HashKeyFunc
Mesh string
PageSize int
PageOffset string
FilterFunc ListFilterFunc
NameContains string
Ordered bool
}

type ListOptionsFunc func(*ListOptions)
Expand Down Expand Up @@ -221,6 +217,6 @@ func (l *ListOptions) IsCacheable() bool {
return l.FilterFunc == nil
}

func (l *ListOptions) HashCode(ctx context.Context) string {
return fmt.Sprintf("%s:%t:%s:%d:%s:%s", l.Mesh, l.Ordered, l.NameContains, l.PageSize, l.PageOffset, l.KeyFromContext(ctx))
func (l *ListOptions) HashCode() string {
return fmt.Sprintf("%s:%t:%s:%d:%s", l.Mesh, l.Ordered, l.NameContains, l.PageSize, l.PageOffset)
}
11 changes: 11 additions & 0 deletions pkg/core/rest/errors/error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kumahq/kuma/pkg/core/rest/errors/types"
"github.com/kumahq/kuma/pkg/core/tokens"
"github.com/kumahq/kuma/pkg/core/validators"
"github.com/kumahq/kuma/pkg/multitenant"
)

func HandleError(response *restful.Response, err error, title string) {
Expand Down Expand Up @@ -51,6 +52,8 @@ func HandleError(response *restful.Response, err error, title string) {
handleUnauthenticated(unauthenticated, title, response)
case err == tokens.IssuerDisabled:
handleIssuerDisabled(err, title, response)
case err == multitenant.TenantMissingErr:
handleTenantMissing(err, title, response)
default:
handleUnknownError(err, title, response)
}
Expand Down Expand Up @@ -183,6 +186,14 @@ func handleUnauthenticated(err *Unauthenticated, title string, response *restful
WriteError(response, 401, kumaErr)
}

func handleTenantMissing(err error, title string, response *restful.Response) {
kumaErr := types.Error{
Title: title,
Details: err.Error(),
}
WriteError(response, 400, kumaErr)
}

func WriteError(response *restful.Response, httpStatus int, kumaErr types.Error) {
if err := response.WriteHeaderAndJson(httpStatus, kumaErr, "application/json"); err != nil {
core.Log.Error(err, "Could not write the error response")
Expand Down
29 changes: 9 additions & 20 deletions pkg/core/runtime/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ type BuilderContext interface {
TokenIssuers() builtin.TokenIssuers
MeshCache() *mesh.Cache
InterCPClientPool() *client.Pool
HashingFn() multitenant.Hashing
PgxConfigCustomizationFn() config.PgxConfigCustomization
TenantFn() multitenant.Tenant
Tenants() multitenant.Tenants
}

var _ BuilderContext = &Builder{}
Expand Down Expand Up @@ -98,8 +97,7 @@ type Builder struct {
interCpPool *client.Pool
*runtimeInfo
pgxConfigCustomizationFn config.PgxConfigCustomization
hashingFn multitenant.Hashing
tenantFn multitenant.Tenant
tenants multitenant.Tenants
}

func BuilderFor(appCtx context.Context, cfg kuma_cp.Config) (*Builder, error) {
Expand Down Expand Up @@ -265,9 +263,8 @@ func (b *Builder) WithInterCPClientPool(interCpPool *client.Pool) *Builder {
return b
}

func (b *Builder) WithMultitenancy(tenantFn multitenant.Tenant, hashingFn multitenant.Hashing) *Builder {
b.tenantFn = tenantFn
b.hashingFn = hashingFn
func (b *Builder) WithMultitenancy(tenants multitenant.Tenants) *Builder {
b.tenants = tenants
return b
}

Expand Down Expand Up @@ -343,14 +340,11 @@ func (b *Builder) Build() (Runtime, error) {
if b.interCpPool == nil {
return nil, errors.Errorf("InterCP client pool has not been configured")
}
if b.hashingFn == nil {
return nil, errors.Errorf("HashingFn has not been configured")
}
if b.pgxConfigCustomizationFn == nil {
return nil, errors.Errorf("PgxConfigCustomizationFn has not been configured")
}
if b.tenantFn == nil {
return nil, errors.Errorf("TenantFn has not been configured")
if b.tenants == nil {
return nil, errors.Errorf("Tenants has not been configured")
}
return &runtime{
RuntimeInfo: b.runtimeInfo,
Expand Down Expand Up @@ -383,8 +377,7 @@ func (b *Builder) Build() (Runtime, error) {
meshCache: b.meshCache,
interCpPool: b.interCpPool,
pgxConfigCustomizationFn: b.pgxConfigCustomizationFn,
hashingFn: b.hashingFn,
tenantFn: b.tenantFn,
tenants: b.tenants,
},
Manager: b.cm,
}, nil
Expand Down Expand Up @@ -506,14 +499,10 @@ func (b *Builder) XDS() xds_runtime.XDSRuntimeContext {
return b.xds
}

func (b *Builder) HashingFn() multitenant.Hashing {
return b.hashingFn
}

func (b *Builder) PgxConfigCustomizationFn() config.PgxConfigCustomization {
return b.pgxConfigCustomizationFn
}

func (b *Builder) TenantFn() multitenant.Tenant {
return b.tenantFn
func (b *Builder) Tenants() multitenant.Tenants {
return b.tenants
}
Loading

0 comments on commit 091f04f

Please sign in to comment.