Skip to content

Commit

Permalink
fix(kuma-cp) do not override other dataplane with dp lifecycle (#3507)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored Dec 16, 2021
1 parent aa57952 commit f464b5b
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 20 deletions.
4 changes: 2 additions & 2 deletions mk/e2e.new.mk
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ test/e2e/test:
test/e2e/debug: build/kumactl images test/e2e/k8s/start
K8SCLUSTERS="$(K8SCLUSTERS)" \
KUMACTLBIN=${BUILD_ARTIFACTS_DIR}/kumactl/kumactl \
API_VERSION="$(API_VERSION)" \
$(ENV_VARS) \
GINKGO_EDITOR_INTEGRATION=true \
ginkgo --failFast $(GOFLAGS) $(LD_FLAGS) $(E2E_PKG_LIST)

Expand All @@ -140,7 +140,7 @@ test/e2e/debug: build/kumactl images test/e2e/k8s/start
test/e2e/debug-universal: build/kumactl images/test
K8SCLUSTERS="$(K8SCLUSTERS)" \
KUMACTLBIN=${BUILD_ARTIFACTS_DIR}/kumactl/kumactl \
API_VERSION="$(API_VERSION)" \
$(ENV_VARS) \
GINKGO_EDITOR_INTEGRATION=true \
ginkgo --failFast $(GOFLAGS) $(LD_FLAGS) $(E2E_PKG_LIST)

Expand Down
4 changes: 4 additions & 0 deletions pkg/util/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ func MustToStruct(message proto.Message) *structpb.Struct {
}
return str
}

func IsEmpty(message proto.Message) bool {
return proto.Size(message) == 0
}
4 changes: 2 additions & 2 deletions pkg/xds/auth/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (a *authCallbacks) credential(streamID core_xds.StreamID) (Credential, erro
if !exists {
return "", errors.Errorf("there is no context for stream ID %d", streamID)
}
credential, err := extractCredential(ctx)
credential, err := ExtractCredential(ctx)
if err != nil {
return "", errors.Wrap(err, "could not extract credential from DiscoveryRequest")
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func (a *authCallbacks) authenticate(credential Credential, req util_xds.Discove
"authentication failed")
}

func extractCredential(ctx context.Context) (Credential, error) {
func ExtractCredential(ctx context.Context) (Credential, error) {
metadata, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", errors.Errorf("request has no metadata")
Expand Down
60 changes: 47 additions & 13 deletions pkg/xds/server/callbacks/dataplane_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
xds_auth "github.com/kumahq/kuma/pkg/xds/auth"
)

var lifecycleLog = core.Log.WithName("xds").WithName("dp-lifecycle")
Expand All @@ -27,33 +29,38 @@ var lifecycleLog = core.Log.WithName("xds").WithName("dp-lifecycle")
//
// This flow is optional, you may still want to go with 1. an example of this is Kubernetes deployment.
type DataplaneLifecycle struct {
resManager manager.ResourceManager
resManager manager.ResourceManager
authenticator xds_auth.Authenticator

sync.RWMutex // protects createdDpByCallbacks
createdDpByCallbacks map[model.ResourceKey]mesh_proto.ProxyType
appCtx context.Context
}

func (d *DataplaneLifecycle) OnProxyConnected(streamID core_xds.StreamID, dpKey model.ResourceKey, _ context.Context, md core_xds.DataplaneMetadata) error {
return d.register(streamID, dpKey, md)
func (d *DataplaneLifecycle) OnProxyConnected(streamID core_xds.StreamID, dpKey model.ResourceKey, ctx context.Context, md core_xds.DataplaneMetadata) error {
return d.register(ctx, streamID, dpKey, md)
}

func (d *DataplaneLifecycle) OnProxyReconnected(streamID core_xds.StreamID, dpKey model.ResourceKey, _ context.Context, md core_xds.DataplaneMetadata) error {
return d.register(streamID, dpKey, md)
func (d *DataplaneLifecycle) OnProxyReconnected(streamID core_xds.StreamID, dpKey model.ResourceKey, ctx context.Context, md core_xds.DataplaneMetadata) error {
return d.register(ctx, streamID, dpKey, md)
}

func (d *DataplaneLifecycle) register(streamID core_xds.StreamID, dpKey model.ResourceKey, md core_xds.DataplaneMetadata) error {
func (d *DataplaneLifecycle) register(ctx context.Context, streamID core_xds.StreamID, dpKey model.ResourceKey, md core_xds.DataplaneMetadata) error {
switch {
case md.GetProxyType() == mesh_proto.DataplaneProxyType && md.GetDataplaneResource() != nil:
dp := md.GetDataplaneResource()
lifecycleLog.Info("registering dataplane", "dataplane", dp, "dataplaneKey", dpKey, "streamID", streamID)
if err := d.registerDataplane(dp); err != nil {
log := lifecycleLog.WithValues("dataplane", dp, "dataplaneKey", dpKey, "streamID", streamID)
log.Info("registering dataplane")
if err := d.registerDataplane(ctx, dp); err != nil {
log.Info("cannot register dataplane", "reason", err.Error())
return errors.Wrap(err, "could not register dataplane passed in kuma-dp run")
}
case md.GetProxyType() == mesh_proto.IngressProxyType && md.GetZoneIngressResource() != nil:
zi := md.GetZoneIngressResource()
lifecycleLog.Info("registering zone ingress", "zoneIngress", zi, "zoneIngressKey", dpKey, "streamID", streamID)
if err := d.registerZoneIngress(zi); err != nil {
log := lifecycleLog.WithValues("zoneIngress", zi, "zoneIngressKey", dpKey, "streamID", streamID)
log.Info("registering zone ingress")
if err := d.registerZoneIngress(ctx, zi); err != nil {
log.Info("cannot register zone ingress", "reason", err.Error())
return errors.Wrap(err, "could not register zone ingress passed in kuma-dp run")
}
default:
Expand Down Expand Up @@ -106,26 +113,53 @@ func (d *DataplaneLifecycle) OnProxyDisconnected(streamID core_xds.StreamID, dpK

var _ DataplaneCallbacks = &DataplaneLifecycle{}

func NewDataplaneLifecycle(appCtx context.Context, resManager manager.ResourceManager) *DataplaneLifecycle {
func NewDataplaneLifecycle(appCtx context.Context, resManager manager.ResourceManager, authenticator xds_auth.Authenticator) *DataplaneLifecycle {
return &DataplaneLifecycle{
resManager: resManager,
authenticator: authenticator,
createdDpByCallbacks: map[model.ResourceKey]mesh_proto.ProxyType{},
appCtx: appCtx,
}
}

func (d *DataplaneLifecycle) registerDataplane(dp *core_mesh.DataplaneResource) error {
func (d *DataplaneLifecycle) registerDataplane(ctx context.Context, dp *core_mesh.DataplaneResource) error {
key := model.MetaToResourceKey(dp.GetMeta())
existing := core_mesh.NewDataplaneResource()
return manager.Upsert(d.resManager, key, existing, func(resource model.Resource) error {
if err := d.validateUpsert(ctx, existing); err != nil {
return errors.Wrap(err, "you are trying to override existing dataplane to which you don't have an access.")
}
return existing.SetSpec(dp.GetSpec())
})
}

func (d *DataplaneLifecycle) registerZoneIngress(zi *core_mesh.ZoneIngressResource) error {
// validateUpsert checks if a new data plane proxy can replace the old one.
// We cannot just upsert a new data plane proxy over the old one, because if you generate token bound to mesh + kuma.io/service
// then you would be able to just replace any other data plane proxy in the system.
// Ideally, when starting a new data plane proxy, the old one should be deleted, so we could do Create instead of Upsert, but this may not be the case.
// For example, if you spin down CP, then DP, then start CP, the old DP is still there for a couple of minutes (see pkg/gc).
// We could check if Dataplane is identical, but CP may alter Dataplane resource after is connected.
// What we do instead is that we use current data plane proxy credential to check if we can manage already registered Dataplane.
// The assumption is that if you have a token that can manage the old Dataplane.
// You can delete it and create a new one, so we can simplify this manual process by just replacing it.
func (d *DataplaneLifecycle) validateUpsert(ctx context.Context, existing model.Resource) error {
if util_proto.IsEmpty(existing.GetSpec()) { // existing DP is empty, resource does not exist
return nil
}
credential, err := xds_auth.ExtractCredential(ctx)
if err != nil {
return err
}
return d.authenticator.Authenticate(ctx, existing, credential)
}

func (d *DataplaneLifecycle) registerZoneIngress(ctx context.Context, zi *core_mesh.ZoneIngressResource) error {
key := model.MetaToResourceKey(zi.GetMeta())
existing := core_mesh.NewZoneIngressResource()
return manager.Upsert(d.resManager, key, existing, func(resource model.Resource) error {
if err := d.validateUpsert(ctx, existing); err != nil {
return errors.Wrap(err, "you are trying to override existing zone ingress to which you don't have an access.")
}
return existing.SetSpec(zi.GetSpec())
})
}
Expand Down
84 changes: 83 additions & 1 deletion pkg/xds/server/callbacks/dataplane_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/structpb"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
Expand All @@ -20,21 +21,35 @@ import (
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
"github.com/kumahq/kuma/pkg/test/resources/model"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
xds_auth "github.com/kumahq/kuma/pkg/xds/auth"
. "github.com/kumahq/kuma/pkg/xds/server/callbacks"
)

type staticAuthenticator struct {
err error
}

func (s *staticAuthenticator) Authenticate(ctx context.Context, resource core_model.Resource, credential xds_auth.Credential) error {
return s.err
}

var _ xds_auth.Authenticator = &staticAuthenticator{}

var _ = Describe("Dataplane Lifecycle", func() {

var authenticator *staticAuthenticator
var resManager core_manager.ResourceManager
var callbacks envoy_server.Callbacks
var cancel func()
var ctx context.Context

BeforeEach(func() {
authenticator = &staticAuthenticator{}
store := memory.NewStore()
resManager = core_manager.NewResourceManager(store)
ctx, cancel = context.WithCancel(context.Background())
callbacks = util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(NewDataplaneLifecycle(ctx, resManager)))

callbacks = util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(NewDataplaneLifecycle(ctx, resManager, authenticator)))

err := resManager.Create(context.Background(), core_mesh.NewMeshResource(), core_store.CreateByKey(core_model.DefaultMesh, core_model.NoMesh))
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -92,6 +107,73 @@ var _ = Describe("Dataplane Lifecycle", func() {
Expect(core_store.IsResourceNotFound(err)).To(BeTrue())
})

It("should not override extisting DP with different service", func() {
// given already created DP
dp := &core_mesh.DataplaneResource{
Meta: &model.ResourceMeta{
Mesh: "default",
Name: "dp-01",
},
Spec: &mesh_proto.Dataplane{
Networking: &mesh_proto.Dataplane_Networking{
Address: "192.168.0.1",
Inbound: []*mesh_proto.Dataplane_Networking_Inbound{
{
Port: 8080,
ServicePort: 8081,
Tags: map[string]string{
"kuma.io/service": "backend",
},
},
},
},
},
}
err := resManager.Create(context.Background(), dp, core_store.CreateByKey("dp-01", "default"))
Expect(err).ToNot(HaveOccurred())

// when
authenticator.err = errors.New("rejected")
req := envoy_sd.DiscoveryRequest{
Node: &envoy_core.Node{
Id: "default.backend-01",
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"dataplane.resource": {
Kind: &structpb.Value_StringValue{
StringValue: `
{
"type": "Dataplane",
"mesh": "default",
"name": "dp-01",
"networking": {
"address": "127.0.0.1",
"inbound": [
{
"port": 22022,
"servicePort": 8443,
"tags": {
"kuma.io/service": "web"
}
},
]
}
}
`,
},
},
},
},
},
}
const streamId = 123
Expect(callbacks.OnStreamOpen(context.Background(), streamId, "")).To(Succeed())
err = callbacks.OnStreamRequest(streamId, &req)

// then
Expect(err).To(HaveOccurred())
})

It("should not delete DP when it is not carried in metadata", func() {
// given already created DP
dp := &core_mesh.DataplaneResource{
Expand Down
2 changes: 1 addition & 1 deletion pkg/xds/server/v3/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func RegisterXDS(
util_xds_v3.AdaptCallbacks(authCallbacks),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(watchdogFactory.New))),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(metadataTracker)),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneLifecycle(rt.AppContext(), rt.ResourceManager()))),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneLifecycle(rt.AppContext(), rt.ResourceManager(), authenticator))),
util_xds_v3.AdaptCallbacks(DefaultDataplaneStatusTracker(rt, envoyCpCtx.Secrets)),
util_xds_v3.AdaptCallbacks(xds_callbacks.NewNackBackoff(rt.Config().XdsServer.NACKBackoff)),
newResourceWarmingForcer(xdsContext.Cache(), xdsContext.Hasher()),
Expand Down
21 changes: 21 additions & 0 deletions test/e2e/auth/dp/auth_dp_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package auth_test

import (
"testing"

. "github.com/onsi/ginkgo"

"github.com/kumahq/kuma/pkg/test"
auth "github.com/kumahq/kuma/test/e2e/auth/dp"
"github.com/kumahq/kuma/test/framework"
)

func TestE2EDpAuth(t *testing.T) {
if framework.IsK8sClustersStarted() {
test.RunSpecs(t, "E2E Auth DP Suite")
} else {
t.SkipNow()
}
}

var _ = Describe("Test Universal", auth.DpAuthUniversal)
85 changes: 85 additions & 0 deletions test/e2e/auth/dp/dp_auth_universal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package auth

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/kumahq/kuma/pkg/config/core"
. "github.com/kumahq/kuma/test/framework"
)

func DpAuthUniversal() {
var cluster Cluster
var deployOptsFuncs []KumaDeploymentOption

E2EBeforeSuite(func() {
cluster = NewUniversalCluster(NewTestingT(), Kuma3, Silent)
deployOptsFuncs = KumaUniversalDeployOpts

err := NewClusterSetup().
Install(Kuma(core.Standalone, deployOptsFuncs...)).
Setup(cluster)
Expect(err).ToNot(HaveOccurred())
err = cluster.VerifyKuma()
Expect(err).ToNot(HaveOccurred())
})

E2EAfterSuite(func() {
Expect(cluster.DeleteKuma(deployOptsFuncs...)).To(Succeed())
Expect(cluster.DismissCluster()).To(Succeed())
})

It("should not be able to override someone else Dataplane", func() {
// given other dataplane
dp := `
type: Dataplane
mesh: default
name: dp-01
networking:
address: 192.168.0.1
inbound:
- port: 8080
tags:
kuma.io/service: not-test-server
`
Expect(YamlUniversal(dp)(cluster)).To(Succeed())

// when trying to spin up dataplane with same name but token bound to a different service
dpToken, err := cluster.GetKuma().GenerateDpToken("default", "test-server")
Expect(err).ToNot(HaveOccurred())
err = TestServerUniversal("dp-01", "default", dpToken, WithServiceName("test-server"))(cluster)
Expect(err).ToNot(HaveOccurred())

// then
Eventually(func() (string, error) {
return cluster.GetKuma().GetKumaCPLogs()
}, "30s", "1s").Should(ContainSubstring("you are trying to override existing dataplane to which you don't have an access"))
})

It("should be able to override old Dataplane of same service", func() {
// given
dp := `
type: Dataplane
mesh: default
name: dp-02
networking:
address: 192.168.0.2
inbound:
- port: 8080
tags:
kuma.io/service: test-server
`
Expect(YamlUniversal(dp)(cluster)).To(Succeed())

// when
dpToken, err := cluster.GetKuma().GenerateDpToken("default", "test-server")
Expect(err).ToNot(HaveOccurred())
err = TestServerUniversal("dp-02", "default", dpToken, WithServiceName("test-server"))(cluster)
Expect(err).ToNot(HaveOccurred())

// then
Eventually(func() (string, error) {
return cluster.GetKumactlOptions().RunKumactlAndGetOutput("get", "dataplanes", "-oyaml")
}, "30s", "1s").ShouldNot(ContainSubstring("192.168.0.2"))
})
}
Loading

0 comments on commit f464b5b

Please sign in to comment.