Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(envoyadmin): support passing kds envoy operations via http proxy #6915

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
361 changes: 234 additions & 127 deletions api/system/v1alpha1/zone_insight.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions api/system/v1alpha1/zone_insight.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ message ZoneInsight {

// List of KDS subscriptions created by a given Zone Kuma CP.
repeated KDSSubscription subscriptions = 1;

// Statistics about Envoy Admin Streams
EnvoyAdminStreams envoy_admin_streams = 2;
}

message EnvoyAdminStreams {
// Global instance ID that handles XDS Config Dump streams.
string config_dump_global_instance_id = 1;
// Global instance ID that handles Stats streams.
string stats_global_instance_id = 2;
// Global instance ID that handles Clusters streams.
string clusters_global_instance_id = 3;
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
}

// KDSSubscription describes a single KDS subscription
Expand Down
31 changes: 24 additions & 7 deletions pkg/intercp/envoyadmin/forwarding_kds_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package envoyadmin
import (
"context"

"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_system "github.com/kumahq/kuma/pkg/core/resources/apis/system"
Expand All @@ -13,6 +14,7 @@ import (
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/envoy/admin"
"github.com/kumahq/kuma/pkg/intercp/catalog"
"github.com/kumahq/kuma/pkg/kds/service"
"github.com/kumahq/kuma/pkg/multitenant"
)

Expand Down Expand Up @@ -60,7 +62,7 @@ func (f *forwardingKdsEnvoyAdminClient) PostQuit(context.Context, *core_mesh.Dat

func (f *forwardingKdsEnvoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
ctx = appendTenantMetadata(ctx)
instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy))
instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ConfigDumpRPC)
if err != nil {
return nil, err
}
Expand All @@ -86,7 +88,7 @@ func (f *forwardingKdsEnvoyAdminClient) ConfigDump(ctx context.Context, proxy co

func (f *forwardingKdsEnvoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
ctx = appendTenantMetadata(ctx)
instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy))
instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.StatsRPC)
if err != nil {
return nil, err
}
Expand All @@ -112,7 +114,7 @@ func (f *forwardingKdsEnvoyAdminClient) Stats(ctx context.Context, proxy core_mo

func (f *forwardingKdsEnvoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
ctx = appendTenantMetadata(ctx)
instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy))
instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ClustersRPC)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -150,13 +152,28 @@ func (f *forwardingKdsEnvoyAdminClient) logIntendedAction(proxy core_model.Resou
}
}

func (f *forwardingKdsEnvoyAdminClient) globalInstanceID(ctx context.Context, zone string) (string, error) {
func (f *forwardingKdsEnvoyAdminClient) globalInstanceID(ctx context.Context, zone string, rpcName string) (string, error) {
zoneInsightRes := core_system.NewZoneInsightResource()
if err := f.resManager.Get(ctx, zoneInsightRes, core_store.GetByKey(zone, core_model.NoMesh)); err != nil {
return "", err
}
sub := zoneInsightRes.Spec.GetLastSubscription().(*system_proto.KDSSubscription)
return sub.GlobalInstanceId, nil
zoneInsightRes.Spec.GetEnvoyAdminStreams().GetConfigDumpGlobalInstanceId()
streams := zoneInsightRes.Spec.GetEnvoyAdminStreams()
var globalInstanceID string
switch rpcName {
case service.ConfigDumpRPC:
globalInstanceID = streams.GetConfigDumpGlobalInstanceId()
case service.StatsRPC:
globalInstanceID = streams.GetStatsGlobalInstanceId()
case service.ClustersRPC:
globalInstanceID = streams.GetClustersGlobalInstanceId()
default:
return "", errors.Errorf("invalid operation %s", rpcName)
}
if globalInstanceID == "" {
return "", errors.Errorf("stream to execute %s operations is not yet connected", rpcName)
}
return globalInstanceID, nil
}

func (f *forwardingKdsEnvoyAdminClient) clientForInstanceID(ctx context.Context, instanceID string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/intercp/envoyadmin/forwarding_kds_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"google.golang.org/grpc"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/api/system/v1alpha1"
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/apis/system"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
Expand Down Expand Up @@ -63,9 +63,11 @@ var _ = Describe("Forwarding KDS Client", func() {

createZoneInsightConnectedToGlobal := func(insight string, globalInstanceID string) {
zoneInsight := system.NewZoneInsightResource()
zoneInsight.Spec.Subscriptions = append(zoneInsight.Spec.Subscriptions, &v1alpha1.KDSSubscription{
GlobalInstanceId: globalInstanceID,
})
zoneInsight.Spec.EnvoyAdminStreams = &system_proto.EnvoyAdminStreams{
ConfigDumpGlobalInstanceId: globalInstanceID,
StatsGlobalInstanceId: globalInstanceID,
ClustersGlobalInstanceId: globalInstanceID,
}
err := resManager.Create(context.Background(), zoneInsight, core_store.CreateByKey(insight, model.NoMesh))
Expect(err).ToNot(HaveOccurred())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/global/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func Setup(rt runtime.Runtime) error {
rt.KDSContext().ServerStreamInterceptors,
*rt.Config().Multizone.Global.KDS,
rt.Metrics(),
service.NewGlobalKDSServiceServer(rt.KDSContext().EnvoyAdminRPCs),
service.NewGlobalKDSServiceServer(rt.KDSContext().EnvoyAdminRPCs, rt.ResourceManager(), rt.GetInstanceId()),
mux.NewKDSSyncServiceServer(
onGlobalToZoneSyncConnect,
onZoneToGlobalSyncConnect,
Expand Down
8 changes: 2 additions & 6 deletions pkg/kds/server/status_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,8 @@ func (s *zoneInsightSink) Start(ctx context.Context, stop <-chan struct{}) {
var lastStoredState *system_proto.KDSSubscription
var generation uint32

gracefulCtx, cancel := context.WithCancel(context.Background())
gracefulCtx, cancel := context.WithCancel(multitenant.CopyIntoCtx(ctx, context.Background()))
defer cancel()
tenantId, ok := multitenant.TenantFromCtx(ctx)
if ok {
gracefulCtx = multitenant.WithTenant(gracefulCtx, tenantId)
}

flush := func() {
zone, currentState := s.accessor.GetStatus()
Expand Down Expand Up @@ -117,5 +113,5 @@ func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription
zoneInsight := system.NewZoneInsightResource()
return manager.Upsert(ctx, s.resManager, key, zoneInsight, func(resource core_model.Resource) error {
return zoneInsight.Spec.UpdateSubscription(subscription)
})
}, manager.WithConflictRetry(100*time.Millisecond, 10)) // we need retry because Envoy Admin RPC may also update the insight.
}
6 changes: 6 additions & 0 deletions pkg/kds/service/envoy_admin_rpcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import (
util_grpc "github.com/kumahq/kuma/pkg/util/grpc"
)

const (
ConfigDumpRPC = "XDS Config Dump"
StatsRPC = "Stats"
ClustersRPC = "Clusters"
)

type EnvoyAdminRPCs struct {
XDSConfigDump util_grpc.ReverseUnaryRPCs
Stats util_grpc.ReverseUnaryRPCs
Expand Down
51 changes: 46 additions & 5 deletions pkg/kds/service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,56 @@ package service
import (
"context"
"fmt"
"time"

"google.golang.org/grpc"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/api/system/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/resources/apis/system"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/kds/util"
"github.com/kumahq/kuma/pkg/multitenant"
util_grpc "github.com/kumahq/kuma/pkg/util/grpc"
)

type GlobalKDSServiceServer struct {
envoyAdminRPCs EnvoyAdminRPCs
resManager manager.ResourceManager
instanceID string
mesh_proto.UnimplementedGlobalKDSServiceServer
}

func NewGlobalKDSServiceServer(envoyAdminRPCs EnvoyAdminRPCs) *GlobalKDSServiceServer {
func NewGlobalKDSServiceServer(
envoyAdminRPCs EnvoyAdminRPCs,
resManager manager.ResourceManager,
instanceID string,
) *GlobalKDSServiceServer {
return &GlobalKDSServiceServer{
envoyAdminRPCs: envoyAdminRPCs,
resManager: resManager,
instanceID: instanceID,
}
}

var _ mesh_proto.GlobalKDSServiceServer = &GlobalKDSServiceServer{}

func (g *GlobalKDSServiceServer) StreamXDSConfigs(stream mesh_proto.GlobalKDSService_StreamXDSConfigsServer) error {
return g.streamEnvoyAdminRPC("XDS Config Dump", g.envoyAdminRPCs.XDSConfigDump, stream, func() (util_grpc.ReverseUnaryMessage, error) {
return g.streamEnvoyAdminRPC(ConfigDumpRPC, g.envoyAdminRPCs.XDSConfigDump, stream, func() (util_grpc.ReverseUnaryMessage, error) {
return stream.Recv()
})
}

func (g *GlobalKDSServiceServer) StreamStats(stream mesh_proto.GlobalKDSService_StreamStatsServer) error {
return g.streamEnvoyAdminRPC("Stats", g.envoyAdminRPCs.Stats, stream, func() (util_grpc.ReverseUnaryMessage, error) {
return g.streamEnvoyAdminRPC(StatsRPC, g.envoyAdminRPCs.Stats, stream, func() (util_grpc.ReverseUnaryMessage, error) {
return stream.Recv()
})
}

func (g *GlobalKDSServiceServer) StreamClusters(stream mesh_proto.GlobalKDSService_StreamClustersServer) error {
return g.streamEnvoyAdminRPC("Clusters", g.envoyAdminRPCs.Clusters, stream, func() (util_grpc.ReverseUnaryMessage, error) {
return g.streamEnvoyAdminRPC(ClustersRPC, g.envoyAdminRPCs.Clusters, stream, func() (util_grpc.ReverseUnaryMessage, error) {
return stream.Recv()
})
}
Expand All @@ -57,7 +70,17 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC(
clientID := ClientID(stream.Context(), zone)
core.Log.Info("Envoy Admin RPC stream started", "rpc", rpcName, "clientID", clientID)
rpc.ClientConnected(clientID, stream)
defer rpc.ClientDisconnected(clientID)
if err := g.storeStreamConnection(stream.Context(), zone, rpcName, g.instanceID); err != nil {
return err
}
defer func() {
rpc.ClientDisconnected(clientID)
// stream.Context() is cancelled here, we need to use another ctx
ctx := multitenant.CopyIntoCtx(stream.Context(), context.Background())
if err := g.storeStreamConnection(ctx, zone, rpcName, ""); err != nil {
core.Log.Error(err, "could not clear stream connection information in ZoneInsight", "rpc", rpcName, "clientID", clientID, "rpc", rpcName)
}
}()
for {
resp, err := recv()
if err != nil {
Expand All @@ -70,6 +93,24 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC(
}
}

func (g *GlobalKDSServiceServer) storeStreamConnection(ctx context.Context, zone string, rpcName string, instance string) error {
zoneInsight := system.NewZoneInsightResource()
return manager.Upsert(ctx, g.resManager, model.ResourceKey{Name: zone}, zoneInsight, func(resource model.Resource) error {
if zoneInsight.Spec.EnvoyAdminStreams == nil {
zoneInsight.Spec.EnvoyAdminStreams = &v1alpha1.EnvoyAdminStreams{}
}
switch rpcName {
case ConfigDumpRPC:
zoneInsight.Spec.EnvoyAdminStreams.ConfigDumpGlobalInstanceId = instance
case StatsRPC:
zoneInsight.Spec.EnvoyAdminStreams.StatsGlobalInstanceId = instance
case ClustersRPC:
zoneInsight.Spec.EnvoyAdminStreams.ClustersGlobalInstanceId = instance
}
return nil
}, manager.WithConflictRetry(100*time.Millisecond, 10)) // we need retry because zone sink or other RPC may also update the insight.
}

func ClientID(ctx context.Context, zone string) string {
tenantID, ok := multitenant.TenantFromCtx(ctx)
if !ok {
Expand Down
9 changes: 9 additions & 0 deletions pkg/multitenant/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,13 @@ func TenantFromCtx(ctx context.Context) (string, bool) {
return value, ok
}

// CopyIntoCtx copies tenant information from src context to dst context
func CopyIntoCtx(src context.Context, dst context.Context) context.Context {
tenantId, ok := TenantFromCtx(src)
if !ok {
return dst
}
return WithTenant(dst, tenantId)
}

var TenantMissingErr = errors.New("tenant is missing")