Skip to content

Commit

Permalink
feat(kds): multitenancy (kumahq#6723)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored and bartsmykla committed May 11, 2023
1 parent 3acb124 commit c636334
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 19 deletions.
3 changes: 2 additions & 1 deletion pkg/kds/v2/reconcile/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,6 @@ func (r *reconciler) hashId(ctx context.Context, node *envoy_core.Node) (string,
if err != nil {
return "", err
}
return r.hasher.ID(node) + tenantID, nil
util_kds_v2.FillTenantMetadata(tenantID, node)
return r.hasher.ID(node), nil
}
11 changes: 7 additions & 4 deletions pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kumahq/kuma/pkg/kds/reconcile"
kds_server "github.com/kumahq/kuma/pkg/kds/server"
reconcile_v2 "github.com/kumahq/kuma/pkg/kds/v2/reconcile"
"github.com/kumahq/kuma/pkg/kds/v2/util"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
util_watchdog "github.com/kumahq/kuma/pkg/util/watchdog"
util_xds "github.com/kumahq/kuma/pkg/util/xds"
Expand Down Expand Up @@ -45,6 +46,7 @@ func New(
return nil, err
}
callbacks := util_xds_v3.CallbacksChain{
NewTenancyCallbacks(rt.Tenants()),
&typeAdjustCallbacks{},
util_xds_v3.NewControlPlaneIdCallbacks(serverID),
util_xds_v3.AdaptDeltaCallbacks(util_xds.LoggingCallbacks{Log: log}),
Expand Down Expand Up @@ -121,8 +123,9 @@ func newKDSContext(log logr.Logger) (envoy_cache.NodeHash, envoy_cache.SnapshotC
type hasher struct{}

func (_ hasher) ID(node *envoy_core.Node) string {
// TODO: https://github.com/kumahq/kuma/issues/6632 check if it needs to be the same hasher as passed to reconcile
// if it's not pkg/kds/global/components_test.go:271 test fails
// not sure if it's a problem with the test or the implementation
return node.Id
tenantID, found := util.TenantFromMetadata(node)
if !found {
return node.Id
}
return node.Id + ":" + tenantID
}
11 changes: 11 additions & 0 deletions pkg/kds/v2/server/server_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package server_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

func TestServer(t *testing.T) {
test.RunSpecs(t, "Server Suite")
}
52 changes: 52 additions & 0 deletions pkg/kds/v2/server/tenant_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package server

import (
"context"
"errors"
"sync"

envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v3"

"github.com/kumahq/kuma/pkg/kds/v2/util"
"github.com/kumahq/kuma/pkg/multitenant"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
)

type tenancyCallbacks struct {
tenants multitenant.Tenants

sync.RWMutex
streamToCtx map[int64]context.Context
util_xds_v3.NoopCallbacks
}

func NewTenancyCallbacks(tenants multitenant.Tenants) envoy_xds.Callbacks {
return &tenancyCallbacks{
tenants: tenants,
streamToCtx: map[int64]context.Context{},
}
}

func (c *tenancyCallbacks) OnDeltaStreamOpen(ctx context.Context, streamID int64, _ string) error {
c.Lock()
c.streamToCtx[streamID] = ctx
c.Unlock()
return nil
}

func (c *tenancyCallbacks) OnStreamDeltaRequest(streamID int64, request *envoy_sd.DeltaDiscoveryRequest) error {
c.RLock()
defer c.RUnlock()
ctx, ok := c.streamToCtx[streamID]
if !ok {
// it should not happen, but just in case it's better to fail
return errors.New("context is missing")
}
tenantID, err := c.tenants.GetID(ctx)
if err != nil {
return err
}
util.FillTenantMetadata(tenantID, request.Node)
return nil
}
49 changes: 49 additions & 0 deletions pkg/kds/v2/server/tenant_callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package server_test

import (
"context"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/kumahq/kuma/pkg/kds/v2/server"
"github.com/kumahq/kuma/pkg/kds/v2/util"
"github.com/kumahq/kuma/pkg/multitenant"
)

var _ = Describe("Tenant callbacks", func() {
It("should enrich metadata with tenant info", func() {
// given
streamID := int64(1)
callbacks := server.NewTenancyCallbacks(&sampleTenants{})

// when
err := callbacks.OnDeltaStreamOpen(multitenant.WithTenant(context.Background(), "sample"), streamID, "")
Expect(err).ToNot(HaveOccurred())

req := &envoy_sd.DeltaDiscoveryRequest{
Node: &envoy_core.Node{},
}
err = callbacks.OnStreamDeltaRequest(streamID, req)

// then
Expect(err).ToNot(HaveOccurred())

tenant, ok := util.TenantFromMetadata(req.GetNode())
Expect(ok).To(BeTrue())
Expect(tenant).To(Equal("sample"))
})
})

type sampleTenants struct{}

func (s sampleTenants) GetID(ctx context.Context) (string, error) {
tenant, _ := multitenant.TenantFromCtx(ctx)
return tenant, nil
}

func (s sampleTenants) GetIDs(ctx context.Context) ([]string, error) {
return nil, nil
}
23 changes: 23 additions & 0 deletions pkg/kds/v2/util/multitenancy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package util

import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"google.golang.org/protobuf/types/known/structpb"
)

const tenantMetadataKey = "tenant"

func FillTenantMetadata(tenantID string, node *envoy_core.Node) {
if node.Metadata == nil {
node.Metadata = &structpb.Struct{}
}
if node.Metadata.Fields == nil {
node.Metadata.Fields = map[string]*structpb.Value{}
}
node.Metadata.Fields[tenantMetadataKey] = structpb.NewStringValue(tenantID)
}

func TenantFromMetadata(node *envoy_core.Node) (string, bool) {
val, ok := node.GetMetadata().GetFields()[tenantMetadataKey]
return val.GetStringValue(), ok
}
7 changes: 7 additions & 0 deletions test/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type kumaDeploymentOptions struct {
yamlConfig string
transparentProxyV1 bool
apiHeaders []string
zoneName string

// Functions to apply to each mesh after the control plane
// is provisioned.
Expand Down Expand Up @@ -360,6 +361,12 @@ func WithApiHeaders(headers ...string) KumaDeploymentOption {
})
}

func WithZoneName(zoneName string) KumaDeploymentOption {
return KumaOptionFunc(func(o *kumaDeploymentOptions) {
o.zoneName = zoneName
})
}

// WithoutDataplane suppresses the automatic configuration of kuma-dp
// in the application container. This is useful when the test requires a
// container that is not bound to the mesh.
Expand Down
15 changes: 14 additions & 1 deletion test/framework/k8s_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func (c *K8sCluster) deployKumaViaKubectl(mode string) error {

func (c *K8sCluster) yamlForKumaViaKubectl(mode string) (string, error) {
argsMap := map[string]string{
"--mode": mode,
"--namespace": Config.KumaNamespace,
"--control-plane-repository": Config.KumaCPImageRepo,
"--dataplane-repository": Config.KumaDPImageRepo,
Expand All @@ -332,8 +333,16 @@ func (c *K8sCluster) yamlForKumaViaKubectl(mode string) (string, error) {

switch mode {
case core.Zone:
zoneName := c.opts.zoneName
if zoneName == "" {
zoneName = c.GetKumactlOptions().CPName
}
argsMap["--zone"] = zoneName
argsMap["--kds-global-address"] = c.opts.globalAddress
}
if !Config.UseLoadBalancer {
argsMap["--use-node-port"] = ""
}

if c.opts.zoneIngress {
argsMap["--ingress-enabled"] = ""
Expand Down Expand Up @@ -445,7 +454,11 @@ func (c *K8sCluster) genValues(mode string) map[string]string {
values["controlPlane.globalZoneSyncService.type"] = "NodePort"
}
case core.Zone:
values["controlPlane.zone"] = c.GetKumactlOptions().CPName
zoneName := c.opts.zoneName
if zoneName == "" {
zoneName = c.GetKumactlOptions().CPName
}
values["controlPlane.zone"] = zoneName
values["controlPlane.kdsGlobalAddress"] = c.opts.globalAddress
}

Expand Down
12 changes: 0 additions & 12 deletions test/framework/kumactl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/gruntwork-io/terratest/modules/testing"
"github.com/pkg/errors"

"github.com/kumahq/kuma/pkg/config/core"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/model/rest"
)
Expand Down Expand Up @@ -134,17 +133,6 @@ func (k *KumactlOptions) KumactlInstallCP(mode string, args ...string) (string,
"install", "control-plane",
}

cmd = append(cmd, "--mode", mode)
switch mode {
case core.Zone:
cmd = append(cmd, "--zone", k.CPName)
fallthrough
case core.Global:
if !Config.UseLoadBalancer {
cmd = append(cmd, "--use-node-port")
}
}

cmd = append(cmd, args...)

return k.RunKumactlAndGetOutputV(
Expand Down
6 changes: 5 additions & 1 deletion test/framework/universal_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (c *UniversalCluster) DeployKuma(mode core.CpMode, opt ...KumaDeploymentOpt

cmd := []string{"kuma-cp", "run", "--config-file", "/kuma/kuma-cp.conf"}
if mode == core.Zone {
env["KUMA_MULTIZONE_ZONE_NAME"] = c.name
zoneName := c.opts.zoneName
if zoneName == "" {
zoneName = c.name
}
env["KUMA_MULTIZONE_ZONE_NAME"] = zoneName
}

app, err := NewUniversalApp(c.t, c.name, AppModeCP, "", AppModeCP, c.opts.isipv6, true, []string{}, dockerVolumes, "")
Expand Down

0 comments on commit c636334

Please sign in to comment.