Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp) do not override other dataplane with dp lifecycle #3507

Merged
merged 3 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions mk/e2e.new.mk
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ test/e2e/test:
test/e2e/debug: build/kumactl images test/e2e/k8s/start
K8SCLUSTERS="$(K8SCLUSTERS)" \
KUMACTLBIN=${BUILD_ARTIFACTS_DIR}/kumactl/kumactl \
API_VERSION="$(API_VERSION)" \
$(ENV_VARS) \
GINKGO_EDITOR_INTEGRATION=true \
ginkgo --failFast $(GOFLAGS) $(LD_FLAGS) $(E2E_PKG_LIST)

Expand All @@ -140,7 +140,7 @@ test/e2e/debug: build/kumactl images test/e2e/k8s/start
test/e2e/debug-universal: build/kumactl images/test
K8SCLUSTERS="$(K8SCLUSTERS)" \
KUMACTLBIN=${BUILD_ARTIFACTS_DIR}/kumactl/kumactl \
API_VERSION="$(API_VERSION)" \
$(ENV_VARS) \
GINKGO_EDITOR_INTEGRATION=true \
ginkgo --failFast $(GOFLAGS) $(LD_FLAGS) $(E2E_PKG_LIST)

Expand Down
4 changes: 4 additions & 0 deletions pkg/util/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ func MustToStruct(message proto.Message) *structpb.Struct {
}
return str
}

func IsEmpty(message proto.Message) bool {
return proto.Size(message) == 0
}
4 changes: 2 additions & 2 deletions pkg/xds/auth/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (a *authCallbacks) credential(streamID core_xds.StreamID) (Credential, erro
if !exists {
return "", errors.Errorf("there is no context for stream ID %d", streamID)
}
credential, err := extractCredential(ctx)
credential, err := ExtractCredential(ctx)
if err != nil {
return "", errors.Wrap(err, "could not extract credential from DiscoveryRequest")
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func (a *authCallbacks) authenticate(credential Credential, req util_xds.Discove
"authentication failed")
}

func extractCredential(ctx context.Context) (Credential, error) {
func ExtractCredential(ctx context.Context) (Credential, error) {
metadata, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", errors.Errorf("request has no metadata")
Expand Down
60 changes: 47 additions & 13 deletions pkg/xds/server/callbacks/dataplane_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
xds_auth "github.com/kumahq/kuma/pkg/xds/auth"
)

var lifecycleLog = core.Log.WithName("xds").WithName("dp-lifecycle")
Expand All @@ -27,33 +29,38 @@ var lifecycleLog = core.Log.WithName("xds").WithName("dp-lifecycle")
//
// This flow is optional, you may still want to go with 1. an example of this is Kubernetes deployment.
type DataplaneLifecycle struct {
resManager manager.ResourceManager
resManager manager.ResourceManager
authenticator xds_auth.Authenticator

sync.RWMutex // protects createdDpByCallbacks
createdDpByCallbacks map[model.ResourceKey]mesh_proto.ProxyType
appCtx context.Context
}

func (d *DataplaneLifecycle) OnProxyConnected(streamID core_xds.StreamID, dpKey model.ResourceKey, _ context.Context, md core_xds.DataplaneMetadata) error {
return d.register(streamID, dpKey, md)
func (d *DataplaneLifecycle) OnProxyConnected(streamID core_xds.StreamID, dpKey model.ResourceKey, ctx context.Context, md core_xds.DataplaneMetadata) error {
return d.register(ctx, streamID, dpKey, md)
}

func (d *DataplaneLifecycle) OnProxyReconnected(streamID core_xds.StreamID, dpKey model.ResourceKey, _ context.Context, md core_xds.DataplaneMetadata) error {
return d.register(streamID, dpKey, md)
func (d *DataplaneLifecycle) OnProxyReconnected(streamID core_xds.StreamID, dpKey model.ResourceKey, ctx context.Context, md core_xds.DataplaneMetadata) error {
return d.register(ctx, streamID, dpKey, md)
}

func (d *DataplaneLifecycle) register(streamID core_xds.StreamID, dpKey model.ResourceKey, md core_xds.DataplaneMetadata) error {
func (d *DataplaneLifecycle) register(ctx context.Context, streamID core_xds.StreamID, dpKey model.ResourceKey, md core_xds.DataplaneMetadata) error {
switch {
case md.GetProxyType() == mesh_proto.DataplaneProxyType && md.GetDataplaneResource() != nil:
dp := md.GetDataplaneResource()
lifecycleLog.Info("registering dataplane", "dataplane", dp, "dataplaneKey", dpKey, "streamID", streamID)
if err := d.registerDataplane(dp); err != nil {
log := lifecycleLog.WithValues("dataplane", dp, "dataplaneKey", dpKey, "streamID", streamID)
log.Info("registering dataplane")
if err := d.registerDataplane(ctx, dp); err != nil {
log.Info("cannot register dataplane", "reason", err.Error())
return errors.Wrap(err, "could not register dataplane passed in kuma-dp run")
}
case md.GetProxyType() == mesh_proto.IngressProxyType && md.GetZoneIngressResource() != nil:
zi := md.GetZoneIngressResource()
lifecycleLog.Info("registering zone ingress", "zoneIngress", zi, "zoneIngressKey", dpKey, "streamID", streamID)
if err := d.registerZoneIngress(zi); err != nil {
log := lifecycleLog.WithValues("zoneIngress", zi, "zoneIngressKey", dpKey, "streamID", streamID)
log.Info("registering zone ingress")
if err := d.registerZoneIngress(ctx, zi); err != nil {
log.Info("cannot register zone ingress", "reason", err.Error())
return errors.Wrap(err, "could not register zone ingress passed in kuma-dp run")
}
default:
Expand Down Expand Up @@ -106,26 +113,53 @@ func (d *DataplaneLifecycle) OnProxyDisconnected(streamID core_xds.StreamID, dpK

var _ DataplaneCallbacks = &DataplaneLifecycle{}

func NewDataplaneLifecycle(appCtx context.Context, resManager manager.ResourceManager) *DataplaneLifecycle {
func NewDataplaneLifecycle(appCtx context.Context, resManager manager.ResourceManager, authenticator xds_auth.Authenticator) *DataplaneLifecycle {
return &DataplaneLifecycle{
resManager: resManager,
authenticator: authenticator,
createdDpByCallbacks: map[model.ResourceKey]mesh_proto.ProxyType{},
appCtx: appCtx,
}
}

func (d *DataplaneLifecycle) registerDataplane(dp *core_mesh.DataplaneResource) error {
func (d *DataplaneLifecycle) registerDataplane(ctx context.Context, dp *core_mesh.DataplaneResource) error {
key := model.MetaToResourceKey(dp.GetMeta())
existing := core_mesh.NewDataplaneResource()
return manager.Upsert(d.resManager, key, existing, func(resource model.Resource) error {
if err := d.validateUpsert(ctx, existing); err != nil {
return errors.Wrap(err, "you are trying to override existing dataplane to which you don't have an access.")
}
return existing.SetSpec(dp.GetSpec())
})
}

func (d *DataplaneLifecycle) registerZoneIngress(zi *core_mesh.ZoneIngressResource) error {
// validateUpsert checks if a new data plane proxy can replace the old one.
// We cannot just upsert a new data plane proxy over the old one, because if you generate token bound to mesh + kuma.io/service
// then you would be able to just replace any other data plane proxy in the system.
// Ideally, when starting a new data plane proxy, the old one should be deleted, so we could do Create instead of Upsert, but this may not be the case.
// For example, if you spin down CP, then DP, then start CP, the old DP is still there for a couple of minutes (see pkg/gc).
// We could check if Dataplane is identical, but CP may alter Dataplane resource after is connected.
// What we do instead is that we use current data plane proxy credential to check if we can manage already registered Dataplane.
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
// The assumption is that if you have a token that can manage the old Dataplane.
// You can delete it and create a new one, so we can simplify this manual process by just replacing it.
func (d *DataplaneLifecycle) validateUpsert(ctx context.Context, existing model.Resource) error {
if util_proto.IsEmpty(existing.GetSpec()) { // existing DP is empty, resource does not exist
return nil
}
credential, err := xds_auth.ExtractCredential(ctx)
if err != nil {
return err
}
return d.authenticator.Authenticate(ctx, existing, credential)
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
}

func (d *DataplaneLifecycle) registerZoneIngress(ctx context.Context, zi *core_mesh.ZoneIngressResource) error {
key := model.MetaToResourceKey(zi.GetMeta())
existing := core_mesh.NewZoneIngressResource()
return manager.Upsert(d.resManager, key, existing, func(resource model.Resource) error {
if err := d.validateUpsert(ctx, existing); err != nil {
return errors.Wrap(err, "you are trying to override existing zone ingress to which you don't have an access.")
}
return existing.SetSpec(zi.GetSpec())
})
}
Expand Down
84 changes: 83 additions & 1 deletion pkg/xds/server/callbacks/dataplane_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/structpb"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
Expand All @@ -20,21 +21,35 @@ import (
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
"github.com/kumahq/kuma/pkg/test/resources/model"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
xds_auth "github.com/kumahq/kuma/pkg/xds/auth"
. "github.com/kumahq/kuma/pkg/xds/server/callbacks"
)

type staticAuthenticator struct {
err error
}

func (s *staticAuthenticator) Authenticate(ctx context.Context, resource core_model.Resource, credential xds_auth.Credential) error {
return s.err
}

var _ xds_auth.Authenticator = &staticAuthenticator{}

var _ = Describe("Dataplane Lifecycle", func() {

var authenticator *staticAuthenticator
var resManager core_manager.ResourceManager
var callbacks envoy_server.Callbacks
var cancel func()
var ctx context.Context

BeforeEach(func() {
authenticator = &staticAuthenticator{}
store := memory.NewStore()
resManager = core_manager.NewResourceManager(store)
ctx, cancel = context.WithCancel(context.Background())
callbacks = util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(NewDataplaneLifecycle(ctx, resManager)))

callbacks = util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(NewDataplaneLifecycle(ctx, resManager, authenticator)))

err := resManager.Create(context.Background(), core_mesh.NewMeshResource(), core_store.CreateByKey(core_model.DefaultMesh, core_model.NoMesh))
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -92,6 +107,73 @@ var _ = Describe("Dataplane Lifecycle", func() {
Expect(core_store.IsResourceNotFound(err)).To(BeTrue())
})

It("should not override extisting DP with different service", func() {
// given already created DP
dp := &core_mesh.DataplaneResource{
Meta: &model.ResourceMeta{
Mesh: "default",
Name: "dp-01",
},
Spec: &mesh_proto.Dataplane{
Networking: &mesh_proto.Dataplane_Networking{
Address: "192.168.0.1",
Inbound: []*mesh_proto.Dataplane_Networking_Inbound{
{
Port: 8080,
ServicePort: 8081,
Tags: map[string]string{
"kuma.io/service": "backend",
},
},
},
},
},
}
err := resManager.Create(context.Background(), dp, core_store.CreateByKey("dp-01", "default"))
Expect(err).ToNot(HaveOccurred())

// when
authenticator.err = errors.New("rejected")
req := envoy_sd.DiscoveryRequest{
Node: &envoy_core.Node{
Id: "default.backend-01",
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"dataplane.resource": {
Kind: &structpb.Value_StringValue{
StringValue: `
{
"type": "Dataplane",
"mesh": "default",
"name": "dp-01",
"networking": {
"address": "127.0.0.1",
"inbound": [
{
"port": 22022,
"servicePort": 8443,
"tags": {
"kuma.io/service": "web"
}
},
]
}
}
`,
},
},
},
},
},
}
const streamId = 123
Expect(callbacks.OnStreamOpen(context.Background(), streamId, "")).To(Succeed())
err = callbacks.OnStreamRequest(streamId, &req)

// then
Expect(err).To(HaveOccurred())
})

It("should not delete DP when it is not carried in metadata", func() {
// given already created DP
dp := &core_mesh.DataplaneResource{
Expand Down
2 changes: 1 addition & 1 deletion pkg/xds/server/v3/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func RegisterXDS(
util_xds_v3.AdaptCallbacks(authCallbacks),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(watchdogFactory.New))),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(metadataTracker)),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneLifecycle(rt.AppContext(), rt.ResourceManager()))),
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneLifecycle(rt.AppContext(), rt.ResourceManager(), authenticator))),
util_xds_v3.AdaptCallbacks(DefaultDataplaneStatusTracker(rt, envoyCpCtx.Secrets)),
util_xds_v3.AdaptCallbacks(xds_callbacks.NewNackBackoff(rt.Config().XdsServer.NACKBackoff)),
newResourceWarmingForcer(xdsContext.Cache(), xdsContext.Hasher()),
Expand Down
21 changes: 21 additions & 0 deletions test/e2e/auth/dp/auth_dp_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package auth_test

import (
"testing"

. "github.com/onsi/ginkgo"

"github.com/kumahq/kuma/pkg/test"
auth "github.com/kumahq/kuma/test/e2e/auth/dp"
"github.com/kumahq/kuma/test/framework"
)

func TestE2EDpAuth(t *testing.T) {
if framework.IsK8sClustersStarted() {
test.RunSpecs(t, "E2E Auth DP Suite")
} else {
t.SkipNow()
}
}

var _ = Describe("Test Universal", auth.DpAuthUniversal)
85 changes: 85 additions & 0 deletions test/e2e/auth/dp/dp_auth_universal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package auth

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/kumahq/kuma/pkg/config/core"
. "github.com/kumahq/kuma/test/framework"
)

func DpAuthUniversal() {
var cluster Cluster
var deployOptsFuncs []KumaDeploymentOption

E2EBeforeSuite(func() {
cluster = NewUniversalCluster(NewTestingT(), Kuma3, Silent)
deployOptsFuncs = KumaUniversalDeployOpts

err := NewClusterSetup().
Install(Kuma(core.Standalone, deployOptsFuncs...)).
Setup(cluster)
Expect(err).ToNot(HaveOccurred())
err = cluster.VerifyKuma()
Expect(err).ToNot(HaveOccurred())
})

E2EAfterSuite(func() {
Expect(cluster.DeleteKuma(deployOptsFuncs...)).To(Succeed())
Expect(cluster.DismissCluster()).To(Succeed())
})

It("should not be able to override someone else Dataplane", func() {
// given other dataplane
dp := `
type: Dataplane
mesh: default
name: dp-01
networking:
address: 192.168.0.1
inbound:
- port: 8080
tags:
kuma.io/service: not-test-server
`
Expect(YamlUniversal(dp)(cluster)).To(Succeed())

// when trying to spin up dataplane with same name but token bound to a different service
dpToken, err := cluster.GetKuma().GenerateDpToken("default", "test-server")
Expect(err).ToNot(HaveOccurred())
err = TestServerUniversal("dp-01", "default", dpToken, WithServiceName("test-server"))(cluster)
Expect(err).ToNot(HaveOccurred())

// then
Eventually(func() (string, error) {
return cluster.GetKuma().GetKumaCPLogs()
}, "30s", "1s").Should(ContainSubstring("you are trying to override existing dataplane to which you don't have an access"))
})

It("should be able to override old Dataplane of same service", func() {
// given
dp := `
type: Dataplane
mesh: default
name: dp-02
networking:
address: 192.168.0.2
inbound:
- port: 8080
tags:
kuma.io/service: test-server
`
Expect(YamlUniversal(dp)(cluster)).To(Succeed())

// when
dpToken, err := cluster.GetKuma().GenerateDpToken("default", "test-server")
Expect(err).ToNot(HaveOccurred())
err = TestServerUniversal("dp-02", "default", dpToken, WithServiceName("test-server"))(cluster)
Expect(err).ToNot(HaveOccurred())

// then
Eventually(func() (string, error) {
return cluster.GetKumactlOptions().RunKumactlAndGetOutput("get", "dataplanes", "-oyaml")
}, "30s", "1s").ShouldNot(ContainSubstring("192.168.0.2"))
})
}
Loading