Skip to content

Commit

Permalink
Refactor metadata generator to support adding metadata across resourc…
Browse files Browse the repository at this point in the history
…es (#14875)

* Refactor metagen to allow multiple resources to be enriched
  • Loading branch information
vjsamuel authored and Carlos Pérez-Aradros Herce committed Jan 14, 2020
1 parent 0fd2250 commit dba8f74
Show file tree
Hide file tree
Showing 33 changed files with 2,079 additions and 743 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Libbeat: Do not overwrite agent.*, ecs.version, and host.name. {pull}14407[14407]
- Libbeat: Cleanup the x-pack licenser code to use the new license endpoint and the new format. {pull}15091[15091]
- Users can now specify `monitoring.cloud.*` to override `monitoring.elasticsearch.*` settings. {issue}14399[14399] {pull}15254[15254]
- Refactor metadata generator to support adding metadata across resources {pull}14875[14875]
- Update to ECS 1.4.0. {pull}14844[14844]

*Auditbeat*
Expand Down
4 changes: 4 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
Expand All @@ -48,6 +50,8 @@ type Config struct {
Builders []*common.Config `config:"builders"`
Appenders []*common.Config `config:"appenders"`
Templates template.MapperSettings `config:"templates"`

AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
}

func defaultConfig() *Config {
Expand Down
20 changes: 6 additions & 14 deletions libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,26 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/libbeat/common/safemapstr"
"github.com/elastic/beats/libbeat/logp"
)

type node struct {
uuid uuid.UUID
config *Config
metagen kubernetes.MetaGenerator
metagen metadata.MetaGen
logger *logp.Logger
publish func(bus.Event)
watcher kubernetes.Watcher
}

// NewNodeEventer creates an eventer that can discover and process node objects
func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) {
metagen, err := kubernetes.NewMetaGenerator(cfg)
if err != nil {
return nil, err
}

logger := logp.NewLogger("autodiscover.node")

config := defaultConfig()
err = cfg.Unpack(&config)
err := cfg.Unpack(&config)
if err != nil {
return nil, err
}
Expand All @@ -70,7 +66,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
})
}, nil)

if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
Expand All @@ -80,7 +76,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
config: config,
uuid: uuid,
publish: publish,
metagen: metagen,
metagen: metadata.NewNodeMetadataGenerator(cfg, watcher.Store()),
logger: logger,
watcher: watcher,
}
Expand Down Expand Up @@ -172,11 +168,7 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
}

eventID := fmt.Sprint(node.GetObjectMeta().GetUID())
meta := n.metagen.ResourceMetadata(node)

// TODO: Refactor metagen to make sure that this is seamless
meta.Put("node.name", node.Name)
meta.Put("node.uid", string(node.GetObjectMeta().GetUID()))
meta := n.metagen.Generate(node)

kubemeta := meta.Clone()
// Pass annotations to all events so that it can be used in templating and by annotation builders.
Expand Down
18 changes: 12 additions & 6 deletions libbeat/autodiscover/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -114,6 +116,11 @@ func TestEmitEvent_Node(t *testing.T) {
nodeIP := "192.168.0.1"
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
UUID, err := uuid.NewV4()

typeMeta := metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
}
if err != nil {
t.Fatal(err)
}
Expand All @@ -134,6 +141,7 @@ func TestEmitEvent_Node(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Expand Down Expand Up @@ -180,7 +188,8 @@ func TestEmitEvent_Node(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: v1.NodeStatus{},
TypeMeta: typeMeta,
Status: v1.NodeStatus{},
},
Expected: nil,
},
Expand All @@ -194,6 +203,7 @@ func TestEmitEvent_Node(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{},
Conditions: []v1.NodeCondition{
Expand Down Expand Up @@ -236,11 +246,7 @@ func TestEmitEvent_Node(t *testing.T) {
t.Fatal(err)
}

metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
if err != nil {
t.Fatal(err)
}

metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
Expand Down
95 changes: 74 additions & 21 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,31 @@ import (
k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/libbeat/autodiscover/builder"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/libbeat/common/safemapstr"
"github.com/elastic/beats/libbeat/logp"
)

type pod struct {
uuid uuid.UUID
config *Config
metagen kubernetes.MetaGenerator
logger *logp.Logger
publish func(bus.Event)
watcher kubernetes.Watcher
uuid uuid.UUID
config *Config
metagen metadata.MetaGen
logger *logp.Logger
publish func(bus.Event)
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
}

// NewPodEventer creates an eventer that can discover and process pod objects
func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) {
metagen, err := kubernetes.NewMetaGenerator(cfg)
if err != nil {
return nil, err
}

logger := logp.NewLogger("autodiscover.pod")

config := defaultConfig()
err = cfg.Unpack(&config)
err := cfg.Unpack(&config)
if err != nil {
return nil, err
}
Expand All @@ -71,18 +68,52 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
SyncTimeout: config.SyncPeriod,
Node: config.Node,
Namespace: config.Namespace,
})
}, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Pod{}, err)
}

var nodeMeta, namespaceMeta metadata.MetaGen
var nodeWatcher, namespaceWatcher kubernetes.Watcher
metaConf := config.AddResourceMetadata
if metaConf != nil {
if metaConf.Node != nil && metaConf.Node.Enabled() {
options := kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
}
if config.Namespace != "" {
options.Namespace = config.Namespace
}
nodeWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}

nodeMeta = metadata.NewNodeMetadataGenerator(metaConf.Node, nodeWatcher.Store())
}

if metaConf.Namespace != nil && metaConf.Namespace.Enabled() {
namespaceWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}

namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store())
}
}

p := &pod{
config: config,
uuid: uuid,
publish: publish,
metagen: metagen,
logger: logger,
watcher: watcher,
config: config,
uuid: uuid,
publish: publish,
metagen: metadata.NewPodMetadataGenerator(cfg, watcher.Store(), nodeMeta, namespaceMeta),
logger: logger,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
}

watcher.AddEventHandler(p)
Expand Down Expand Up @@ -168,12 +199,33 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {

// Start starts the eventer
func (p *pod) Start() error {
if p.nodeWatcher != nil {
err := p.nodeWatcher.Start()
if err != nil {
return err
}
}

if p.namespaceWatcher != nil {
if err := p.namespaceWatcher.Start(); err != nil {
return err
}
}

return p.watcher.Start()
}

// Stop stops the eventer
func (p *pod) Stop() {
p.watcher.Stop()

if p.namespaceWatcher != nil {
p.namespaceWatcher.Stop()
}

if p.nodeWatcher != nil {
p.nodeWatcher.Stop()
}
}

func (p *pod) emit(pod *kubernetes.Pod, flag string) {
Expand Down Expand Up @@ -231,7 +283,8 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
"image": c.Image,
"runtime": runtimes[c.Name],
}
meta := p.metagen.ContainerMetadata(pod, c.Name, c.Image)
meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Name),
metadata.WithFields("container.image", c.Image))

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
Expand Down
18 changes: 13 additions & 5 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -175,6 +177,11 @@ func TestEmitEvent(t *testing.T) {
t.Fatal(err)
}

typeMeta := metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
}

tests := []struct {
Message string
Flag string
Expand All @@ -192,6 +199,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
PodIP: podIP,
ContainerStatuses: []kubernetes.PodContainerStatus{
Expand Down Expand Up @@ -264,6 +272,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Expand Down Expand Up @@ -295,6 +304,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
PodIP: podIP,
ContainerStatuses: []kubernetes.PodContainerStatus{
Expand Down Expand Up @@ -326,6 +336,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Expand Down Expand Up @@ -393,6 +404,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
PodIP: podIP,
ContainerStatuses: []kubernetes.PodContainerStatus{
Expand Down Expand Up @@ -459,11 +471,7 @@ func TestEmitEvent(t *testing.T) {
t.Fatal(err)
}

metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
if err != nil {
t.Fatal(err)
}

metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
Expand Down
Loading

0 comments on commit dba8f74

Please sign in to comment.