Skip to content

Commit

Permalink
feat: MeshAccessLog implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Beaumont <mjboamail@gmail.com>
  • Loading branch information
michaelbeaumont committed Sep 21, 2022
1 parent 29248a6 commit e8b1d01
Show file tree
Hide file tree
Showing 5 changed files with 655 additions and 3 deletions.
147 changes: 147 additions & 0 deletions pkg/plugins/policies/meshaccesslog/plugin/v1alpha1/access_log_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package v1alpha1

import (
"fmt"
"net"

envoy_accesslog "github.com/envoyproxy/go-control-plane/envoy/config/accesslog/v3"
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
access_loggers_file "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3"
envoy_hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
envoy_tcp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"

core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
accesslog "github.com/kumahq/kuma/pkg/envoy/accesslog/v3"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshaccesslog/api/v1alpha1"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
"github.com/kumahq/kuma/pkg/xds/envoy"
listeners_v3 "github.com/kumahq/kuma/pkg/xds/envoy/listeners/v3"
)

const (
defaultNetworkAccessLogFormat = `[%START_TIME%] %RESPONSE_FLAGS% %KUMA_MESH% %KUMA_SOURCE_ADDRESS_WITHOUT_PORT%(%KUMA_SOURCE_SERVICE%)->%UPSTREAM_HOST%(%KUMA_DESTINATION_SERVICE%) took %DURATION%ms, sent %BYTES_SENT% bytes, received: %BYTES_RECEIVED% bytes`
defaultHttpAccessLogFormat = `[%START_TIME%] %KUMA_MESH% "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-B3-TRACEID?X-DATADOG-TRACEID)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%KUMA_SOURCE_SERVICE%" "%KUMA_DESTINATION_SERVICE%" "%KUMA_SOURCE_ADDRESS_WITHOUT_PORT%" "%UPSTREAM_HOST%"`
)

type Configurer struct {
Mesh string
TrafficDirection envoy.TrafficDirection
SourceService string
DestinationService string
Backend *api.MeshAccessLog_Backend
Dataplane *core_mesh.DataplaneResource
}

func (c *Configurer) handlePlain(formatString string) (*accesslog.AccessLogFormat, error) {
envoyFormat, err := accesslog.ParseFormat(formatString + "\n")

if err != nil {
return nil, errors.Wrapf(err, "invalid access log format string: %s", formatString)
}

variables := accesslog.InterpolationVariables{
accesslog.CMD_KUMA_SOURCE_ADDRESS: net.JoinHostPort(c.Dataplane.GetIP(), "0"), // deprecated variable
accesslog.CMD_KUMA_SOURCE_ADDRESS_WITHOUT_PORT: c.Dataplane.GetIP(), // replacement variable
accesslog.CMD_KUMA_SOURCE_SERVICE: c.SourceService,
accesslog.CMD_KUMA_DESTINATION_SERVICE: c.DestinationService,
accesslog.CMD_KUMA_MESH: c.Mesh,
accesslog.CMD_KUMA_TRAFFIC_DIRECTION: string(c.TrafficDirection),
}

envoyFormat, err = envoyFormat.Interpolate(variables)
if err != nil {
return nil, errors.Wrapf(err, "failed to interpolate access log format string with Kuma-specific variables: %s", formatString)
}

return envoyFormat, nil
}

func (c *Configurer) envoyAccessLog(defaultFormat string) (*envoy_accesslog.AccessLog, error) {
var format *api.MeshAccessLog_Format
if f := c.Backend.GetFile().GetFormat(); f != nil {
format = f
} else if f := c.Backend.GetTcp().GetFormat(); f != nil {
format = f
}

// TODO json

if len(format.GetJson()) == 0 {
formatString := format.GetPlain()
if formatString == "" {
formatString = defaultFormat
}
envoyFormat, err := c.handlePlain(formatString)
if err != nil {
return nil, err
}

if file := c.Backend.GetFile(); file != nil {
return fileAccessLog(envoyFormat.String(), file.Path)
} else if tcp := c.Backend.GetTcp(); tcp != nil {
path := envoy.AccessLogSocketName(c.Dataplane.Meta.GetName(), c.Mesh)
return fileAccessLog(fmt.Sprintf("%s;%s", tcp.Address, envoyFormat.String()), path)
}
}

panic("impossible backend type")
}

func fileAccessLog(format string, path string) (*envoy_accesslog.AccessLog, error) {
fileAccessLog := &access_loggers_file.FileAccessLog{
AccessLogFormat: &access_loggers_file.FileAccessLog_LogFormat{
LogFormat: &envoy_core.SubstitutionFormatString{
Format: &envoy_core.SubstitutionFormatString_TextFormatSource{
TextFormatSource: &envoy_core.DataSource{
Specifier: &envoy_core.DataSource_InlineString{
InlineString: format,
},
},
},
},
},
Path: path,
}

marshaled, err := util_proto.MarshalAnyDeterministic(fileAccessLog)
if err != nil {
return nil, errors.Wrapf(err, "could not marshall %T", fileAccessLog)
}
return &envoy_accesslog.AccessLog{
Name: "envoy.access_loggers.file",
ConfigType: &envoy_accesslog.AccessLog_TypedConfig{
TypedConfig: marshaled,
},
}, nil
}

func (c *Configurer) Configure(filterChain *envoy_listener.FilterChain) error {
return listeners_v3.UpdateFilterConfig(filterChain, "envoy.filters.network.http_connection_manager", func(filterConfig proto.Message) error {
hcm, ok := filterConfig.(*envoy_hcm.HttpConnectionManager)
if ok {
accessLog, err := c.envoyAccessLog(defaultHttpAccessLogFormat)
if err != nil {
return err
}

hcm.AccessLog = append(hcm.AccessLog, accessLog)
return nil
}

tcpProxy, ok := filterConfig.(*envoy_tcp.TcpProxy)
if ok {
accessLog, err := c.envoyAccessLog(defaultNetworkAccessLogFormat)
if err != nil {
return err
}

tcpProxy.AccessLog = append(tcpProxy.AccessLog, accessLog)
return nil
}

return nil
})
}
150 changes: 147 additions & 3 deletions pkg/plugins/policies/meshaccesslog/plugin/v1alpha1/plugin.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package v1alpha1

import (
"github.com/kumahq/kuma/pkg/core"
envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
core_plugins "github.com/kumahq/kuma/pkg/core/plugins"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/xds"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
"github.com/kumahq/kuma/pkg/plugins/policies/matchers"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshaccesslog/api/v1alpha1"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
"github.com/kumahq/kuma/pkg/xds/envoy"
"github.com/kumahq/kuma/pkg/xds/generator"
)

var _ core_plugins.PolicyPlugin = &plugin{}
var log = core.Log.WithName("MeshAccessLog")

type plugin struct {
}
Expand All @@ -24,7 +29,146 @@ func (p plugin) MatchedPolicies(dataplane *core_mesh.DataplaneResource, resource
return matchers.MatchedPolicies(api.MeshAccessLogType, dataplane, resources)
}

type listeners struct {
inbound map[xds.InboundListener]*envoy_listener.Listener
outbound map[mesh_proto.OutboundInterface]*envoy_listener.Listener
}

func gatherListeners(rs *core_xds.ResourceSet) listeners {
inboundListeners := map[xds.InboundListener]*envoy_listener.Listener{}
outboundListeners := map[mesh_proto.OutboundInterface]*envoy_listener.Listener{}

for _, res := range rs.Resources(envoy_resource.ListenerType) {
listener := res.Resource.(*envoy_listener.Listener)
address := listener.GetAddress().GetSocketAddress()

switch res.Origin {
case generator.OriginOutbound:
outboundListeners[mesh_proto.OutboundInterface{
DataplaneIP: address.GetAddress(),
DataplanePort: address.GetPortValue(),
}] = listener
case generator.OriginInbound:
inboundListeners[xds.InboundListener{
Address: address.GetAddress(),
Port: address.GetPortValue(),
}] = listener
default:
continue
}
}

return listeners{
outbound: outboundListeners,
inbound: inboundListeners,
}
}

func (p plugin) applyToInbounds(
policies xds.TypedMatchingPolicies, inboundListeners map[xds.InboundListener]*envoy_listener.Listener, dataplane *core_mesh.DataplaneResource,
) error {
for _, inbound := range dataplane.Spec.GetNetworking().GetInbound() {
iface := dataplane.Spec.Networking.ToInboundInterface(inbound)

listenerKey := xds.InboundListener{
Address: iface.DataplaneIP,
Port: iface.DataplanePort,
}
listener, ok := inboundListeners[listenerKey]
if !ok {
continue
}

serviceName := inbound.GetTags()[mesh_proto.ServiceTag]

var conf *api.MeshAccessLog_Conf
if computed := policies.FromRules.Rules[listenerKey].Compute(xds.Subset{{
Key: mesh_proto.ServiceTag, Value: serviceName,
}}); computed != nil {
conf = computed.(*api.MeshAccessLog_Conf)
} else {
continue
}

for _, backend := range conf.Backends {
configurer := Configurer{
Mesh: dataplane.GetMeta().GetMesh(),
TrafficDirection: envoy.TrafficDirectionInbound,
SourceService: mesh_proto.ServiceUnknown,
DestinationService: serviceName,
Backend: backend,
Dataplane: dataplane,
}

for _, chain := range listener.FilterChains {
if err := configurer.Configure(chain); err != nil {
return err
}
}
}
}
return nil
}

func (p plugin) applyToOutbounds(
policies xds.TypedMatchingPolicies, outboundListeners map[mesh_proto.OutboundInterface]*envoy_listener.Listener, dataplane *core_mesh.DataplaneResource,
) error {
sourceService := dataplane.Spec.GetIdentifyingService()

for _, outbound := range dataplane.Spec.Networking.GetOutbound() {
oface := dataplane.Spec.Networking.ToOutboundInterface(outbound)

listener, ok := outboundListeners[oface]
if !ok {
continue
}

serviceName := outbound.GetTagsIncludingLegacy()[mesh_proto.ServiceTag]

var conf *api.MeshAccessLog_Conf
if computed := policies.ToRules.Rules.Compute(xds.Subset{{
Key: mesh_proto.ServiceTag, Value: serviceName,
}}); computed != nil {
conf = computed.(*api.MeshAccessLog_Conf)
} else {
continue
}

for _, backend := range conf.Backends {
configurer := Configurer{
Mesh: dataplane.GetMeta().GetMesh(),
TrafficDirection: envoy.TrafficDirectionOutbound,
SourceService: sourceService,
DestinationService: serviceName,
Backend: backend,
Dataplane: dataplane,
}

for _, chain := range listener.FilterChains {
if err := configurer.Configure(chain); err != nil {
return err
}
}
}
}

return nil
}

func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy *core_xds.Proxy) error {
log.Info("apply is not implemented")
policies, ok := proxy.Policies.Dynamic[api.MeshAccessLogType]
if !ok {
return nil
}

listeners := gatherListeners(rs)

if err := p.applyToInbounds(policies, listeners.inbound, proxy.Dataplane); err != nil {
return err
}
if err := p.applyToOutbounds(policies, listeners.outbound, proxy.Dataplane); err != nil {
return err
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package v1alpha1_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

func TestPlugin(t *testing.T) {
test.RunSpecs(t, "MeshAccessLog")
}
Loading

0 comments on commit e8b1d01

Please sign in to comment.