Skip to content

Commit

Permalink
feat: MeshAccessLog implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Beaumont <mjboamail@gmail.com>
  • Loading branch information
michaelbeaumont committed Sep 21, 2022
1 parent 29248a6 commit ab22ce3
Show file tree
Hide file tree
Showing 4 changed files with 504 additions and 1 deletion.
161 changes: 161 additions & 0 deletions pkg/plugins/policies/meshaccesslog/plugin/v1alpha1/access_log_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package v1alpha1

import (
"fmt"
"net"

envoy_accesslog "github.com/envoyproxy/go-control-plane/envoy/config/accesslog/v3"
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
access_loggers_file "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3"
envoy_hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
envoy_tcp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"

core_xds "github.com/kumahq/kuma/pkg/core/xds"
accesslog "github.com/kumahq/kuma/pkg/envoy/accesslog/v3"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshaccesslog/api/v1alpha1"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
"github.com/kumahq/kuma/pkg/xds/envoy"
listeners_v3 "github.com/kumahq/kuma/pkg/xds/envoy/listeners/v3"
)

const (
defaultNetworkAccessLogFormat = `[%START_TIME%] %RESPONSE_FLAGS% %KUMA_MESH% %KUMA_SOURCE_ADDRESS_WITHOUT_PORT%(%KUMA_SOURCE_SERVICE%)->%UPSTREAM_HOST%(%KUMA_DESTINATION_SERVICE%) took %DURATION%ms, sent %BYTES_SENT% bytes, received: %BYTES_RECEIVED% bytes`
defaultHttpAccessLogFormat = `[%START_TIME%] %KUMA_MESH% "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-B3-TRACEID?X-DATADOG-TRACEID)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%KUMA_SOURCE_SERVICE%" "%KUMA_DESTINATION_SERVICE%" "%KUMA_SOURCE_ADDRESS_WITHOUT_PORT%" "%UPSTREAM_HOST%"`
)

type Configurer struct {
Mesh string
TrafficDirection envoy.TrafficDirection
SourceService string
DestinationService string
Backend *api.MeshAccessLog_Backend
Proxy *core_xds.Proxy
}

func (c *Configurer) handlePlain(formatString string) (*accesslog.AccessLogFormat, error) {
envoyFormat, err := accesslog.ParseFormat(formatString + "\n")

if err != nil {
return nil, errors.Wrapf(err, "invalid access log format string: %s", formatString)
}

variables := accesslog.InterpolationVariables{
accesslog.CMD_KUMA_SOURCE_ADDRESS: net.JoinHostPort(c.Proxy.Dataplane.GetIP(), "0"), // deprecated variable
accesslog.CMD_KUMA_SOURCE_ADDRESS_WITHOUT_PORT: c.Proxy.Dataplane.GetIP(), // replacement variable
accesslog.CMD_KUMA_SOURCE_SERVICE: c.SourceService,
accesslog.CMD_KUMA_DESTINATION_SERVICE: c.DestinationService,
accesslog.CMD_KUMA_MESH: c.Mesh,
accesslog.CMD_KUMA_TRAFFIC_DIRECTION: string(c.TrafficDirection),
}

envoyFormat, err = envoyFormat.Interpolate(variables)
if err != nil {
return nil, errors.Wrapf(err, "failed to interpolate access log format string with Kuma-specific variables: %s", formatString)
}

return envoyFormat, nil
}

func (c *Configurer) envoyAccessLog(defaultFormat string) (*envoy_accesslog.AccessLog, error) {
var format *api.MeshAccessLog_Format
if f := c.Backend.GetFile().GetFormat(); f != nil {
format = f
} else if f := c.Backend.GetTcp().GetFormat(); f != nil {
format = f
}

// TODO json

formatString := format.GetPlain()
if formatString == "" {
formatString = defaultFormat
}

envoyFormat, err := accesslog.ParseFormat(formatString + "\n")

if err != nil {
return nil, errors.Wrapf(err, "invalid access log format string: %s", formatString)
}

variables := accesslog.InterpolationVariables{
accesslog.CMD_KUMA_SOURCE_ADDRESS: net.JoinHostPort(c.Proxy.Dataplane.GetIP(), "0"), // deprecated variable
accesslog.CMD_KUMA_SOURCE_ADDRESS_WITHOUT_PORT: c.Proxy.Dataplane.GetIP(), // replacement variable
accesslog.CMD_KUMA_SOURCE_SERVICE: c.SourceService,
accesslog.CMD_KUMA_DESTINATION_SERVICE: c.DestinationService,
accesslog.CMD_KUMA_MESH: c.Mesh,
accesslog.CMD_KUMA_TRAFFIC_DIRECTION: string(c.TrafficDirection),
}

envoyFormat, err = envoyFormat.Interpolate(variables)
if err != nil {
return nil, errors.Wrapf(err, "failed to interpolate access log format string with Kuma-specific variables: %s", formatString)
}

if file := c.Backend.GetFile(); file != nil {
return fileAccessLog(envoyFormat.String(), file.Path)
} else if tcp := c.Backend.GetTcp(); tcp != nil {
path := envoy.AccessLogSocketName(c.Proxy.Dataplane.Meta.GetName(), c.Mesh)
return fileAccessLog(fmt.Sprintf("%s;%s", tcp.Address, format.String()), path)
}

panic("impossible backend type")
}

func fileAccessLog(format string, path string) (*envoy_accesslog.AccessLog, error) {
fileAccessLog := &access_loggers_file.FileAccessLog{
AccessLogFormat: &access_loggers_file.FileAccessLog_LogFormat{
LogFormat: &envoy_core.SubstitutionFormatString{
Format: &envoy_core.SubstitutionFormatString_TextFormatSource{
TextFormatSource: &envoy_core.DataSource{
Specifier: &envoy_core.DataSource_InlineString{
InlineString: format,
},
},
},
},
},
Path: path,
}

marshaled, err := util_proto.MarshalAnyDeterministic(fileAccessLog)
if err != nil {
return nil, errors.Wrapf(err, "could not marshall %T", fileAccessLog)
}
return &envoy_accesslog.AccessLog{
Name: "envoy.access_loggers.file",
ConfigType: &envoy_accesslog.AccessLog_TypedConfig{
TypedConfig: marshaled,
},
}, nil
}

func (c *Configurer) Configure(filterChain *envoy_listener.FilterChain) error {
return listeners_v3.UpdateFilterConfig(filterChain, "envoy.filters.network.http_connection_manager", func(filterConfig proto.Message) error {
hcm, ok := filterConfig.(*envoy_hcm.HttpConnectionManager)
if ok {
accessLog, err := c.envoyAccessLog(defaultHttpAccessLogFormat)
if err != nil {
return err
}

hcm.AccessLog = append(hcm.AccessLog, accessLog)
return nil
}

tcpProxy, ok := filterConfig.(*envoy_tcp.TcpProxy)
if ok {
accessLog, err := c.envoyAccessLog(defaultNetworkAccessLogFormat)
if err != nil {
return err
}

tcpProxy.AccessLog = append(tcpProxy.AccessLog, accessLog)
return nil
}

return nil
})
}
78 changes: 77 additions & 1 deletion pkg/plugins/policies/meshaccesslog/plugin/v1alpha1/plugin.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package v1alpha1

import (
envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
core_plugins "github.com/kumahq/kuma/pkg/core/plugins"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/xds"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
"github.com/kumahq/kuma/pkg/plugins/policies/matchers"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshaccesslog/api/v1alpha1"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
"github.com/kumahq/kuma/pkg/xds/envoy"
"github.com/kumahq/kuma/pkg/xds/generator"
)

var _ core_plugins.PolicyPlugin = &plugin{}
Expand All @@ -24,7 +31,76 @@ func (p plugin) MatchedPolicies(dataplane *core_mesh.DataplaneResource, resource
return matchers.MatchedPolicies(api.MeshAccessLogType, dataplane, resources)
}

func (p plugin) applyToOutbounds(rs *core_xds.ResourceSet, proxy *core_xds.Proxy) error {
logPolicies, ok := proxy.Policies.Dynamic[api.MeshAccessLogType]
if !ok {
return nil
}

sourceService := proxy.Dataplane.Spec.GetIdentifyingService()

outboundListeners := map[mesh_proto.OutboundInterface]*envoy_listener.Listener{}
for _, res := range rs.Resources(envoy_resource.ListenerType) {
if res.Origin != generator.OriginOutbound {
continue
}
listener := res.Resource.(*envoy_listener.Listener)

address := listener.GetAddress().GetSocketAddress()

outboundListeners[mesh_proto.OutboundInterface{
DataplaneIP: address.GetAddress(),
DataplanePort: address.GetPortValue(),
}] = listener
}

for _, outbound := range proxy.Dataplane.Spec.Networking.GetOutbound() {
oface := proxy.Dataplane.Spec.Networking.ToOutboundInterface(outbound)

listener, ok := outboundListeners[oface]
if !ok {
continue
}

serviceName := outbound.GetTagsIncludingLegacy()[mesh_proto.ServiceTag]

var toPolicy *api.MeshAccessLog_Conf
for _, toRule := range logPolicies.ToRules.Rules {
if toRule.Subset.IsSubset([]xds.Tag{{
Key: mesh_proto.ServiceTag,
Value: serviceName,
}}) {
toPolicy = toRule.Conf.(*api.MeshAccessLog_Conf)
break
}
}
if toPolicy == nil {
continue
}

for _, backend := range toPolicy.Backends {
configurer := Configurer{
Mesh: proxy.Id.ToResourceKey().Mesh,
TrafficDirection: envoy.TrafficDirectionOutbound,
SourceService: sourceService,
DestinationService: serviceName,
Backend: backend,
Proxy: proxy,
}

for _, chain := range listener.FilterChains {
configurer.Configure(chain)
}
}
}

return nil
}

func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy *core_xds.Proxy) error {
log.Info("apply is not implemented")
if err := p.applyToOutbounds(rs, proxy); err != nil {
return err
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package v1alpha1_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

func TestPlugin(t *testing.T) {
test.RunSpecs(t, "MeshAccessLog")
}
Loading

0 comments on commit ab22ce3

Please sign in to comment.