Skip to content

Commit

Permalink
fix(kuma-cp) broken SDS auth and XDS generation on rapid DP restarts (#…
Browse files Browse the repository at this point in the history
…2342)

Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored Jul 13, 2021
1 parent 3a950af commit c4a37e4
Show file tree
Hide file tree
Showing 17 changed files with 444 additions and 252 deletions.
12 changes: 7 additions & 5 deletions pkg/sds/server/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-logr/logr"

"github.com/kumahq/kuma/pkg/core"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_runtime "github.com/kumahq/kuma/pkg/core/runtime"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
sds_ca "github.com/kumahq/kuma/pkg/sds/ca"
Expand Down Expand Up @@ -72,7 +73,8 @@ func RegisterSDS(rt core_runtime.Runtime, sdsMetrics *sds_metrics.Metrics) error
}

func syncTracker(reconciler *DataplaneReconciler, refresh time.Duration, sdsMetrics *sds_metrics.Metrics) (util_xds.Callbacks, error) {
return xds_callbacks.NewDataplaneSyncTracker(func(proxyId *core_xds.ProxyId, streamId int64) util_watchdog.Watchdog {
return xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(func(key core_model.ResourceKey) util_watchdog.Watchdog {
proxyId := core_xds.FromResourceKey(key)
return &util_watchdog.SimpleWatchdog{
NewTicker: func() *time.Ticker {
sdsMetrics.IncrementActiveWatchdogs(envoy_common.APIV3)
Expand All @@ -83,20 +85,20 @@ func syncTracker(reconciler *DataplaneReconciler, refresh time.Duration, sdsMetr
defer func() {
sdsMetrics.SdsGeneration(envoy_common.APIV3).Observe(float64(core.Now().Sub(start).Milliseconds()))
}()
return reconciler.Reconcile(proxyId)
return reconciler.Reconcile(&proxyId)
},
OnError: func(err error) {
sdsMetrics.SdsGenerationsErrors(envoy_common.APIV3).Inc()
sdsServerLog.Error(err, "OnTick() failed")
},
OnStop: func() {
if err := reconciler.Cleanup(proxyId); err != nil {
sdsServerLog.Error(err, "could not cleanup sync", "dataplane", proxyId.ToResourceKey())
if err := reconciler.Cleanup(&proxyId); err != nil {
sdsServerLog.Error(err, "could not cleanup sync", "dataplane", key)
}
sdsMetrics.DecrementActiveWatchdogs(envoy_common.APIV3)
},
}
}), nil
})), nil
}

type hasher struct {
Expand Down
29 changes: 18 additions & 11 deletions pkg/xds/server/callbacks/connection_info_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,42 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc/metadata"

core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
util_xds "github.com/kumahq/kuma/pkg/util/xds"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
)

const authorityHeader = ":authority"

// ConnectionInfoTracker tracks the information about the connection itself from the data plane to the control plane
type ConnectionInfoTracker struct {
util_xds.NoopCallbacks
sync.RWMutex
connectionInfos map[core_xds.StreamID]xds_context.ConnectionInfo
connectionInfos map[core_model.ResourceKey]*xds_context.ConnectionInfo
}

var _ util_xds.Callbacks = &ConnectionInfoTracker{}
var _ DataplaneCallbacks = &ConnectionInfoTracker{}

func NewConnectionInfoTracker() *ConnectionInfoTracker {
return &ConnectionInfoTracker{
connectionInfos: map[core_xds.StreamID]xds_context.ConnectionInfo{},
connectionInfos: map[core_model.ResourceKey]*xds_context.ConnectionInfo{},
}
}

func (c *ConnectionInfoTracker) ConnectionInfo(streamID core_xds.StreamID) xds_context.ConnectionInfo {
func (c *ConnectionInfoTracker) ConnectionInfo(dpKey core_model.ResourceKey) *xds_context.ConnectionInfo {
c.RLock()
defer c.RUnlock()
return c.connectionInfos[streamID]
return c.connectionInfos[dpKey]
}

func (c *ConnectionInfoTracker) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error {
func (c *ConnectionInfoTracker) OnProxyReconnected(_ core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, _ core_xds.DataplaneMetadata) error {
return c.processConnectionInfo(dpKey, ctx)
}

func (c *ConnectionInfoTracker) OnProxyConnected(_ core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, _ core_xds.DataplaneMetadata) error {
return c.processConnectionInfo(dpKey, ctx)
}

func (c *ConnectionInfoTracker) processConnectionInfo(dpKey core_model.ResourceKey, ctx context.Context) error {
metadata, ok := metadata.FromIncomingContext(ctx)
if !ok {
return errors.New("request has no metadata")
Expand All @@ -48,13 +55,13 @@ func (c *ConnectionInfoTracker) OnStreamOpen(ctx context.Context, streamID core_
connInfo := xds_context.ConnectionInfo{
Authority: values[0],
}
c.connectionInfos[streamID] = connInfo
c.connectionInfos[dpKey] = &connInfo
c.Unlock()
return nil
}

func (c *ConnectionInfoTracker) OnStreamClosed(streamID core_xds.StreamID) {
func (c *ConnectionInfoTracker) OnProxyDisconnected(_ core_xds.StreamID, dpKey core_model.ResourceKey) {
c.Lock()
delete(c.connectionInfos, streamID)
delete(c.connectionInfos, dpKey)
c.Unlock()
}
154 changes: 154 additions & 0 deletions pkg/xds/server/callbacks/dataplane_callbacks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package callbacks

import (
"context"
"sync"

"github.com/pkg/errors"

core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
util_xds "github.com/kumahq/kuma/pkg/util/xds"
)

// DataplaneCallbacks are XDS callbacks that keep the context of Kuma Dataplane.
// In the ideal world we could assume that one Dataplane has one xDS stream.
// Due to race network latencies etc. there might be a situation when one Dataplane has many xDS streams for the short period of time.
// Those callbacks helps us to deal with such situation.
//
// Keep in mind that it does not solve many xDS streams across many instances of the Control Plane.
// If there are many instances of the Control Plane and Dataplane reconnects, there might be an old stream
// in one instance of CP and a new stream in a new instance of CP.
//
// Those callbacks may be also used with SDS. In case of SDS, at this moment Envoy creates many SDS streams to the Control Plane.
type DataplaneCallbacks interface {
// OnProxyConnected is executed when proxy is connected after it was disconnected before.
OnProxyConnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error
// OnProxyReconnected is executed when proxy is already connected, but there is another stream.
// This can happen when there is a delay with closing the old connection from the proxy to the control plane.
OnProxyReconnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error
// OnProxyDisconnected is executed only when the last stream of the proxy disconnects.
OnProxyDisconnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey)
}

type xdsCallbacks struct {
callbacks DataplaneCallbacks
util_xds.NoopCallbacks

sync.RWMutex
dpStreams map[core_xds.StreamID]dpStream
activeStreams map[core_model.ResourceKey]int
}

func DataplaneCallbacksToXdsCallbacks(callbacks DataplaneCallbacks) util_xds.Callbacks {
return &xdsCallbacks{
callbacks: callbacks,
dpStreams: map[core_xds.StreamID]dpStream{},
activeStreams: map[core_model.ResourceKey]int{},
}
}

type dpStream struct {
dp *core_model.ResourceKey
ctx context.Context
}

var _ util_xds.Callbacks = &xdsCallbacks{}

func (d *xdsCallbacks) OnStreamClosed(streamID core_xds.StreamID) {
d.Lock()
defer d.Unlock()
dpStream := d.dpStreams[streamID]
if dpKey := dpStream.dp; dpKey != nil {
d.activeStreams[*dpKey]--
if d.activeStreams[*dpKey] == 0 {
d.callbacks.OnProxyDisconnected(streamID, *dpKey)
delete(d.activeStreams, *dpKey)
}
}
delete(d.dpStreams, streamID)
}

func (d *xdsCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_xds.DiscoveryRequest) error {
if request.NodeId() == "" {
// from https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack-nack-and-versioning:
// Only the first request on a stream is guaranteed to carry the node identifier.
// The subsequent discovery requests on the same stream may carry an empty node identifier.
// This holds true regardless of the acceptance of the discovery responses on the same stream.
// The node identifier should always be identical if present more than once on the stream.
// It is sufficient to only check the first message for the node identifier as a result.
return nil
}

d.RLock()
alreadyProcessed := d.dpStreams[streamID].dp != nil
d.RUnlock()
if alreadyProcessed {
return nil
}

proxyId, err := core_xds.ParseProxyIdFromString(request.NodeId())
if err != nil {
return errors.Wrap(err, "invalid node ID")
}
dpKey := proxyId.ToResourceKey()
metadata := core_xds.DataplaneMetadataFromXdsMetadata(request.Metadata())
if metadata == nil {
return errors.New("metadata in xDS Node cannot be nil")
}

d.Lock()
// in case client will open 2 concurrent request for the same streamID then
// we don't to increment the counter twice, so checking once again that stream
// wasn't processed
alreadyProcessed = d.dpStreams[streamID].dp != nil
if alreadyProcessed {
return nil
}

dpStream := d.dpStreams[streamID]
dpStream.dp = &dpKey
d.dpStreams[streamID] = dpStream

activeStreams := d.activeStreams[dpKey]
d.activeStreams[dpKey]++
d.Unlock()

if activeStreams == 0 {
if err := d.callbacks.OnProxyConnected(streamID, dpKey, dpStream.ctx, *metadata); err != nil {
return err
}
} else {
if err := d.callbacks.OnProxyReconnected(streamID, dpKey, dpStream.ctx, *metadata); err != nil {
return err
}
}
return nil
}

func (d *xdsCallbacks) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error {
d.Lock()
defer d.Unlock()
dps := dpStream{
ctx: ctx,
}
d.dpStreams[streamID] = dps
return nil
}

// NoopDataplaneCallbacks are empty callbacks that helps to implement DataplaneCallbacks without need to implement every function.
type NoopDataplaneCallbacks struct {
}

func (n *NoopDataplaneCallbacks) OnProxyReconnected(core_xds.StreamID, core_model.ResourceKey, context.Context, core_xds.DataplaneMetadata) error {
return nil
}

func (n *NoopDataplaneCallbacks) OnProxyConnected(core_xds.StreamID, core_model.ResourceKey, context.Context, core_xds.DataplaneMetadata) error {
return nil
}

func (n *NoopDataplaneCallbacks) OnProxyDisconnected(core_xds.StreamID, core_model.ResourceKey) {
}

var _ DataplaneCallbacks = &NoopDataplaneCallbacks{}
108 changes: 108 additions & 0 deletions pkg/xds/server/callbacks/dataplane_callbacks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package callbacks_test

import (
"context"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"google.golang.org/protobuf/types/known/structpb"

core_xds "github.com/kumahq/kuma/pkg/core/xds"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"

core_model "github.com/kumahq/kuma/pkg/core/resources/model"
. "github.com/kumahq/kuma/pkg/xds/server/callbacks"
)

type countingDpCallbacks struct {
OnProxyConnectedCounter int
OnProxyReconnectedCounter int
OnProxyDisconnectedCounter int
}

func (c *countingDpCallbacks) OnProxyConnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error {
c.OnProxyConnectedCounter++
return nil
}

func (c *countingDpCallbacks) OnProxyReconnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error {
c.OnProxyReconnectedCounter++
return nil
}

func (c *countingDpCallbacks) OnProxyDisconnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey) {
c.OnProxyDisconnectedCounter++
}

var _ DataplaneCallbacks = &countingDpCallbacks{}

var _ = Describe("Dataplane Callbacks", func() {

countingCallbacks := &countingDpCallbacks{}
callbacks := util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(countingCallbacks))

req := envoy_sd.DiscoveryRequest{
Node: &envoy_core.Node{
Id: "default.example",
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"dataplane.token": &structpb.Value{
Kind: &structpb.Value_StringValue{
StringValue: "token",
},
},
},
},
},
}

It("should call DataplaneCallbacks correctly", func() {
// when only OnStreamOpen is called
err := callbacks.OnStreamOpen(context.Background(), 1, "")
Expect(err).ToNot(HaveOccurred())

// then OnProxyConnected and OnProxyReconnected is not yet called
Expect(countingCallbacks.OnProxyConnectedCounter).To(Equal(0))
Expect(countingCallbacks.OnProxyReconnectedCounter).To(Equal(0))

// when OnStreamRequest is sent
err = callbacks.OnStreamRequest(1, &req)
Expect(err).ToNot(HaveOccurred())

// then only OnProxyConnected should be called
Expect(countingCallbacks.OnProxyConnectedCounter).To(Equal(1))
Expect(countingCallbacks.OnProxyReconnectedCounter).To(Equal(0))

// when next OnStreamRequest on the same stream is sent
err = callbacks.OnStreamRequest(1, &req)
Expect(err).ToNot(HaveOccurred())

// then OnProxyReconnected and OnProxyReconnected are not called again, they should be only called on the first DiscoveryRequest
Expect(countingCallbacks.OnProxyConnectedCounter).To(Equal(1))
Expect(countingCallbacks.OnProxyReconnectedCounter).To(Equal(0))

// when next stream for given data plane proxy is connected
err = callbacks.OnStreamOpen(context.Background(), 2, "")
Expect(err).ToNot(HaveOccurred())
err = callbacks.OnStreamRequest(2, &req)
Expect(err).ToNot(HaveOccurred())

// then only OnProxyReconnected should be called
Expect(countingCallbacks.OnProxyConnectedCounter).To(Equal(1))
Expect(countingCallbacks.OnProxyReconnectedCounter).To(Equal(1))

// when first stream is closed
callbacks.OnStreamClosed(1)

// then OnProxyDisconnected should not yet be called
Expect(countingCallbacks.OnProxyDisconnectedCounter).To(Equal(0))

// when last stream is closed
callbacks.OnStreamClosed(2)

// then OnProxyDisconnected should be called
Expect(countingCallbacks.OnProxyDisconnectedCounter).To(Equal(1))
})
})
Loading

0 comments on commit c4a37e4

Please sign in to comment.